Appending to a Parquet file with {arrow}

I’ll share my code for testing Spark below, in case someone finds it useful.

library(tidyverse)
library(arrow)
library(sparklyr)
library(kableExtra)
# connect to a local Spark instance
sc <- spark_connect(master = "local")
x <- letters[1:4]
df <- data.frame(
  chr = x,
  fct = factor(x),
  date = seq(
    as.Date("2020-06-01"),
    as.Date("2020-06-04"),
    length.out = 4
  ),
  dttm = seq(
    as.POSIXct("2020-06-01", tz = "UTC"),
    as.POSIXct("2020-06-02", tz = "UTC"),
    length.out = 4
  ),
  stringsAsFactors = FALSE
)
# stack two copies of the data frame; the id column marks the chunks to "append"
df <- bind_rows(df, df, .id = "id")

Writing a single Parquet file from Spark is not so straightforward.

  • We need to load the data in chunks into separate Spark DataFrames.
  • These Spark DataFrames need to be coalesced into a single partition, otherwise Spark writes the data as several partition files on disk.
  • Even without any partitioning, Spark writes the Parquet file into a directory (given as path in spark_write_parquet()), where the actual Parquet file has a random name, something like part-00000-bfefeade-e8a6-4355-90e8-129b6157a3e2-c000.snappy.parquet, with additional metadata in other files (an empty _SUCCESS file and checksums); see the sketch below. I want a single, plain Parquet file, so some files have to be shuffled around at the end.
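
As a quick illustration of that last point, here is a rough sketch (not part of the code above; sdf and "df_spark_dir" are placeholders, not objects defined in this post) of what a plain spark_write_parquet() call leaves on disk:

# sdf stands for some Spark DataFrame, "df_spark_dir" for a scratch directory
spark_write_parquet(sdf, path = "df_spark_dir", mode = "overwrite")
list.files("df_spark_dir")
# expect an empty _SUCCESS file plus the data in
# part-00000-<random id>-c000.snappy.parquet; checksums sit in hidden .crc files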

The following function automates all these steps.

write_parquet_with_spark <- function(df, path, con = sc) {
  # split the data into chunks (one per id) and copy each chunk to Spark
  l <- split(df, df$id)
  names(l) <- paste0("t", names(l))

  sc_tbls <- map2(
    l,
    names(l),
    ~ copy_to(con, .x, .y, overwrite = TRUE)
  )

  # row-bind the chunks and coalesce them into a single partition
  t <- sdf_bind_rows(sc_tbls) %>%
    sdf_coalesce(partitions = 1) %>%
    compute("t")

  # Spark writes a directory; copy the single part file to the requested path
  tmp <- tempfile()
  spark_write_parquet(t, path = tmp, mode = "overwrite")
  parquet_from_spark <- list.files(path = tmp, pattern = "\\.parquet$", full.names = TRUE)
  if (length(parquet_from_spark) != 1) {
    stop("Expected exactly one Parquet file, got ", length(parquet_from_spark))
  }
  file.copy(parquet_from_spark, path, overwrite = TRUE)
}
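
For a single data frame the call is then simply (the file name here is just an example):

write_parquet_with_spark(df, "df_single.parquet")  # example output path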

Let’s test writing out the Parquet files with the example data frame, using both
arrow and sparklyr. As the timing below shows, the Spark route has a lot of overhead.

fun <- c("write_parquet", "write_parquet_with_spark")
# one row per writer: function name, data frame, and output path
df_write <- crossing(fun, df = list(df)) %>%
  mutate(
    path = str_c("df", fun, sep = "_"),
    path = str_c(path, "parquet", sep = ".")
  ) %>%
  select(fun, df, path)
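
The exec(..1, ..2, ..3) shorthand in the timing call below simply invokes each writer on its row of df_write; written out, it amounts to:

# the two calls performed by pwalk()/exec() below
write_parquet(df, "df_write_parquet.parquet")
write_parquet_with_spark(df, "df_write_parquet_with_spark.parquet")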

system.time(pwalk(df_write, ~ exec(..1, ..2, ..3)))

##    user  system elapsed 
##    2.32    0.12   37.55
spark_disconnect(sc)

And check the results:

fs::dir_info(glob = "df*.parquet") %>%
  kable()

| path | type | size | permissions | modification_time | user | group | device_id | hard_links | special_device_id | inode | block_size | blocks | flags | generation | access_time | change_time | birth_time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| df_write_parquet.parquet | file | 1.79K | rw- | 2020-06-19 11:02:37 | NA | NA | 114062174 | 1 | 0 | 5.629500e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:09:59 | 2020-06-19 11:02:37 | 2020-06-18 15:09:59 |
| df_write_parquet_with_spark.parquet | file | 1.23K | rw- | 2020-06-18 15:10:27 | NA | NA | 114062174 | 1 | 0 | 6.755399e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 |

# read each file back and compare it to the original data frame
df_check <- df_write %>%
  mutate(
    df_read = map(path, read_parquet),
    all_equal = map2_chr(df, df_read, all_equal)
  )

df_check %>%
  select(path, all_equal) %>%
  kable()

| path | all_equal |
|---|---|
| df_write_parquet.parquet | TRUE |
| df_write_parquet_with_spark.parquet | • Different types for column fct: factor<10564> vs character • Different types for column dttm: datetime<UTC> vs datetime<local> |
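
So arrow round-trips the data frame exactly, while the file written via Spark comes back with fct as character and dttm in the local time zone. A minimal sketch of coercing those columns back after reading (df_fixed is a placeholder name; the coercions follow the differences reported above):

# read the Spark-written file back and restore the column types
df_fixed <- read_parquet("df_write_parquet_with_spark.parquet") %>%
  mutate(
    fct = factor(fct),
    dttm = lubridate::with_tz(dttm, "UTC")
  )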