I’ll share my code for testing Spark below, in case someone finds it useful.
library(tidyverse)
library(arrow)
library(sparklyr)
library(kableExtra)
sc <- spark_connect(master = "local")
x <- letters[1:4]
df <- data.frame(
  chr = x,
  fct = factor(x),
  date = seq(
    as.Date("2020-06-01"),
    as.Date("2020-06-04"),
    length.out = 4
  ),
  dttm = seq(
    as.POSIXct("2020-06-01", tz = "UTC"),
    as.POSIXct("2020-06-02", tz = "UTC"),
    length.out = 4
  ),
  stringsAsFactors = FALSE
)
df <- bind_rows(df, df, .id = "id")
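Before sending anything to Spark, it helps to confirm the column types we are trying to round-trip. A minimal check, given the construction above (the expected classes are noted in the comment):
# Column classes of the test data: id and chr are character, fct is a factor,
# date is Date, dttm is POSIXct (the id column comes from bind_rows(.id = "id")).
map_chr(df, ~ class(.x)[1])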
Writing a single Parquet file from Spark is not so straightforward.
- We need to load the data in chunks into separate Spark DataFrames.
- These Spark DataFrames need to be coalesced; otherwise, Spark will write the data into several partitions on disk.
- Even without any partitioning, Spark writes the Parquet file into a directory (given as `path` in `spark_write_parquet()`), where the actual Parquet file has a random name, something like `part-00000-bfefeade-e8a6-4355-90e8-129b6157a3e2-c000.snappy.parquet`, with additional metadata in other files (an empty `_SUCCESS` file, and checksums). I want a simple Parquet file, so some files have to be shuffled around at the end.
The following function automates all these steps.
write_parquet_with_spark <- function(df, path, con = sc) {
  # Split the data frame by the id column and copy each chunk to Spark
  l <- split(df, df$id)
  names(l) <- paste0("t", names(l))
  sc_tbls <- map2(
    l,
    names(l),
    ~ copy_to(con, .x, .y, overwrite = TRUE)
  )
  # Row-bind the chunks in Spark and coalesce them into a single partition
  t <- sdf_bind_rows(sc_tbls) %>%
    sdf_coalesce(partitions = 1) %>%
    compute("t")
  # Spark writes a directory; pick out the single part file and copy it to the target path
  tmp <- tempfile()
  spark_write_parquet(t, path = tmp, mode = "overwrite")
  parquet_from_spark <- list.files(path = tmp, pattern = "*.parquet", full.names = TRUE)
  if (length(parquet_from_spark) != 1) {
    stop("More than one parquet file generated!")
  }
  file.copy(parquet_from_spark, path)
}
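As a usage sketch (outside the benchmarking setup below), the function can be called directly on the test data; the output file name here is just an illustrative choice, not used elsewhere in this post:
# Illustrative call, not part of the timing below: writes a single Parquet file.
write_parquet_with_spark(df, "df_manual_spark.parquet")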
Let’s test writing out the example data frame to Parquet, using both arrow and sparklyr. Spark adds a lot of overhead, as the timing below shows.
fun <- c("write_parquet", "write_parquet_with_spark")
df_write <- crossing(fun, df = list(df)) %>%
  mutate(
    path = str_c("df", fun, sep = "_"),
    path = str_c(path, "parquet", sep = ".")
  ) %>%
  select(fun, df, path)
system.time(pwalk(df_write, ~ exec(..1, ..2, ..3)))
## Warning: `data_frame()` is deprecated as of tibble 1.1.0.
## Please use `tibble()` instead.
## This warning is displayed once every 8 hours.
## Call `lifecycle::last_warnings()` to see where this warning was generated.
## user system elapsed
## 2.32 0.12 37.55
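The `pwalk()` call dispatches each row of `df_write` to the function named in the `fun` column, passing the data frame and the output path. A rough, explicit equivalent of that one-liner:
# Rough equivalent of pwalk(df_write, ~ exec(..1, ..2, ..3)):
# each row names the writer function and supplies its arguments.
for (i in seq_len(nrow(df_write))) {
  exec(df_write$fun[[i]], df_write$df[[i]], df_write$path[[i]])
}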
spark_disconnect(sc)
And check the results:
fs::dir_info(glob = "df*.parquet") %>%
  kable()
| path | type | size | permissions | modification_time | user | group | device_id | hard_links | special_device_id | inode | block_size | blocks | flags | generation | access_time | change_time | birth_time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| df_write_parquet.parquet | file | 1.79K | rw- | 2020-06-19 11:02:37 | NA | NA | 114062174 | 1 | 0 | 5.629500e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:09:59 | 2020-06-19 11:02:37 | 2020-06-18 15:09:59 |
| df_write_parquet_with_spark.parquet | file | 1.23K | rw- | 2020-06-18 15:10:27 | NA | NA | 114062174 | 1 | 0 | 6.755399e+15 | 4096 | 8 | 0 | 0 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 | 2020-06-18 15:10:27 |
df_check <- df_write %>%
  mutate(
    df_read = map(path, read_parquet),
    all_equal = map2_chr(df, df_read, all_equal)
  )
df_check %>%
  select(path, all_equal) %>%
  kable()
| path | all_equal |
|---|---|
| df_write_parquet.parquet | TRUE |
| df_write_parquet_with_spark.parquet | |
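To dig into any differences after the Spark round trip, one option is to compare the column classes of the data read back against the original; a small sketch:
# Sketch: class of every column as read back from each file; any column whose
# class differs from the original df points to where the round trip changed it.
map(
  set_names(df_check$df_read, df_check$path),
  ~ map_chr(.x, ~ class(.x)[1])
)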