I’ll share my code for testing Spark below, in case someone finds it useful.
library(tidyverse)
library(arrow)
library(sparklyr)
library(kableExtra)
sc <- spark_connect(master = "local")
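If no local Spark installation is available yet, sparklyr can download one first; a minimal sketch (the version number is only an example, pick whatever your setup needs):

# one-time setup: install a local Spark distribution (version is illustrative)
spark_install(version = "2.4.3")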
x <- letters[1:4]
df <- data.frame(
  chr = x,
  fct = factor(x),
  date = seq(
    as.Date("2020-06-01"),
    as.Date("2020-06-04"),
    length.out = 4
  ),
  dttm = seq(
    as.POSIXct("2020-06-01", tz = "UTC"),
    as.POSIXct("2020-06-02", tz = "UTC"),
    length.out = 4
  ),
  stringsAsFactors = FALSE
)
df <- bind_rows(df, df, .id = "id")
Writing a single Parquet file from Spark is not so straightforward.
- We need to load the data in chunks into separate Spark DataFrames.
- These Spark DataFrames need to be coalesced, otherwise Spark will write the data into several partitions on disk.
- Even without any partitioning, Spark writes the Parquet file into a directory (given as `path` in `spark_write_parquet()`), where the actual Parquet file has a random name, something like `part-00000-bfefeade-e8a6-4355-90e8-129b6157a3e2-c000.snappy.parquet`, plus additional metadata in other files (an empty `_SUCCESS` file and checksums); see the sketch after this list. I want a simple Parquet file, so some files have to be shuffled around at the end.
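To see this for yourself, a minimal sketch (the table name `df_tmp` is arbitrary, and the exact part-file name differs on every run):

# copy the data frame into Spark and write it to a temporary directory
tmp <- tempfile()
spark_tbl <- copy_to(sc, df, "df_tmp", overwrite = TRUE)
spark_write_parquet(spark_tbl, path = tmp, mode = "overwrite")

# the directory holds the part-*.snappy.parquet file, an empty _SUCCESS
# marker and .crc checksum files
fs::dir_ls(tmp, all = TRUE)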
The following function automates all these steps.
write_parquet_with_spark <- function(df, path, con = sc) {
  # copy each chunk into Spark as a separate DataFrame
  l <- split(df, df$id)
  names(l) <- paste0("t", names(l))
  sc_tbls <- map2(
    l,
    names(l),
    ~ copy_to(con, .x, .y, overwrite = TRUE)
  )
  # row-bind the chunks and collapse them into a single partition,
  # so Spark writes a single part file
  t <- sdf_bind_rows(sc_tbls) %>%
    sdf_coalesce(partitions = 1) %>%
    compute("t")
  # Spark writes into a directory, so write to a temporary one first
  tmp <- tempfile()
  spark_write_parquet(t, path = tmp, mode = "overwrite")
  # pick out the single Parquet part file and copy it to the target path
  parquet_from_spark <- list.files(path = tmp, pattern = "\\.parquet$", full.names = TRUE)
  if (length(parquet_from_spark) != 1) {
    stop("Expected exactly one Parquet file from Spark!")
  }
  file.copy(parquet_from_spark, path)
}
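Used on its own, the wrapper is a one-liner; a quick usage sketch (the output file name is just an example, the benchmark below generates its own paths):

write_parquet_with_spark(df, "df_spark.parquet")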
Let’s test writing out Parquet files from the example data frame, using both arrow and sparklyr. As the timing below shows, the Spark route carries a lot of overhead.
fun <- c("write_parquet", "write_parquet_with_spark")
df_write <- crossing(fun, df = list(df)) %>%
  mutate(
    path = str_c("df", fun, sep = "_"),
    path = str_c(path, "parquet", sep = ".")
  ) %>%
  select(fun, df, path)
# call each writer function with the data frame and its target path
system.time(pwalk(df_write, ~ exec(..1, ..2, ..3)))
## Warning: `data_frame()` is deprecated as of tibble 1.1.0.
## Please use `tibble()` instead.
## This warning is displayed once every 8 hours.
## Call `lifecycle::last_warnings()` to see where this warning was generated.
## user system elapsed
## 2.32 0.12 37.55
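Nearly all of the elapsed time is the Spark round trip (copying the chunks into Spark, coalescing, writing, and moving the file back); on a data frame this small the arrow writer itself should finish almost instantly. For comparison, a sketch timing arrow alone (the file name is only illustrative):

system.time(write_parquet(df, "df_arrow_only.parquet"))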
spark_disconnect(sc)
And check the results:
fs::dir_info(glob = "df*.parquet") %>%
  kable()
| path | type | size | permissions | modification_time | user | group | device_id | hard_links | special_device_id | inode | block_size | blocks | flags | generation | access_time | change_time | birth_time |
|------|------|------|-------------|-------------------|------|-------|-----------|------------|-------------------|-------|------------|--------|-------|------------|-------------|-------------|------------|
| df_write_parquet.parquet | file | 1.79K | rw- | 2020-06-19 11:02:37 | NA | NA | 114062174 | 1 | 0 | 5.629500e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:09:59 | 2020-06-19 11:02:37 | 2020-06-18 15:09:59 |
| df_write_parquet_with_spark.parquet | file | 1.23K | rw- | 2020-06-18 15:10:27 | NA | NA | 114062174 | 1 | 0 | 6.755399e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 |
df_check <- df_write %>%
  mutate(
    df_read = map(path, read_parquet),
    all_equal = map2_chr(df, df_read, all_equal)
  )
df_check %>%
  select(path, all_equal) %>%
  kable()
| path | all_equal |
|------|-----------|
| df_write_parquet.parquet | TRUE |
| df_write_parquet_with_spark.parquet | - Different types for column `fct`: factor<10564> vs character. - Different types for column `dttm`: datetime<UTC> vs datetime<local> |
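So the round trip through Spark loses the factor levels and the UTC timezone attribute. If you need the original types back after reading the Spark-written file, a minimal sketch (assuming the stored values themselves are unchanged and only the type metadata was dropped; lubridate is an extra dependency not loaded above):

library(lubridate)

df_spark <- read_parquet("df_write_parquet_with_spark.parquet") %>%
  mutate(
    # re-create the factor from the character column
    fct = factor(fct),
    # re-attach the UTC timezone; the underlying instants stay the same
    dttm = with_tz(dttm, tzone = "UTC")
  )

all_equal(df, df_spark)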