Hello everyone
I would like to ask a question about exporting DuckDB tables to Apache Parquet format from R. As indicated in the Arrow for R :: CHEAT SHEET, the write_dataset function seems very handy in this regard, in particular thanks to its partitioning argument, which lets me specify the whole set of keys (that is, specific columns of my table) to be used when the data is split into a partitioned Parquet dataset. While checking DuckDB's online documentation, I found the following command in the Parquet chapter:
COPY (SELECT * FROM tbl) TO 'result-snappy.parquet' (FORMAT 'parquet');
This seems quite interesting, because in the environment where I am working, the RAM available to my program will never be enough to first load the entire table into a data.frame and then build the Parquet files from that. What I need is to write the data from the database directly to disk in Parquet format, and if I understand correctly, that is what the above COPY command does when it is called via DBI::dbExecute. Yet, I didn't find any way to specify Hive-style partitioning keys before exporting the data. Is there any way to do that?
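To illustrate, here is roughly what I have in mind (just a sketch; the database file mydata.duckdb and the table tbl are placeholders for my actual data):
```
library(DBI)
library(duckdb)

# Open a connection to the DuckDB database file
con <- dbConnect(duckdb::duckdb(), dbdir = "mydata.duckdb")

# Let DuckDB stream the table to a Parquet file on disk,
# without first materialising it as an R data.frame
dbExecute(con, "COPY (SELECT * FROM tbl) TO 'result-snappy.parquet' (FORMAT 'parquet');")

dbDisconnect(con, shutdown = TRUE)
```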
Thanks in advance
Finally, I found the answer here:
GitHub PR: duckdb:master ← samansmink:copy-into-partition-by (opened 23 Jan 2023)
This PR introduces a first version of the partitioned COPY operator. It builds on previous work from @lnkuiper adding the `PartitionedColumnData` (https://github.com/duckdb/duckdb/pull/4970) and the per-thread output from @hannes (https://github.com/duckdb/duckdb/pull/5412).
To summarize, the Partitioned COPY:
- Supports both CSV and Parquet
- Currently fully materializes data during partitioning (to be improved in follow-up PRs)
- Outputs 1 file per partition per thread (similar to https://github.com/duckdb/duckdb/pull/5412)
The partitioned write is used similarly to the PER_THREAD_OUTPUT feature:
```
COPY table TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b));
```
This command will write files in the following format, which is known as the Hive partitioning scheme:
```
__TEST_DIR__/partitioned/part_col_a=<val>/part_col_b=<val>/data_<thread_number>.parquet
```
Partitioned copy to S3 also works:
```
COPY table TO 's3://mah-bucket/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b));
```
Finally, a check is performed for existing files/directories, which is currently quite conservative (and on S3 will add a bit of latency). To disable this check and force writing, an ALLOW_OVERWRITE flag is added:
```
COPY table TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b), ALLOW_OVERWRITE TRUE);
```
Note that this also works with the PER_THREAD_OUTPUT feature.
### Implementation
To support these features, a new class `HivePartitionedColumnData` is introduced, which implements the `PartitionedColumnData` interface from https://github.com/duckdb/duckdb/pull/4970. The main complexity here is that the `HivePartitionedColumnData` class needs to be able to discover new partitions in parallel; for the RadixPartitioning that was already implemented, the number of partitions is known in advance and does not change. Partition discovery is handled by all threads while writing tuples to their local `HivePartitionedColumnData`. To avoid expensive locking when synchronizing partition discovery, each thread keeps a local partition map for quick lookups during partitioning, alongside a shared global state to which new partitions are added. This means a lock is required only when adding new partitions to the global state. Since this partitioned write is not expected to scale to very large numbers of partitions anyway, this should work well. Thanks to this shared state, the partition indices of the thread-local `HivePartitionedColumnData` objects remain in sync.
### Benchmarks
Here are some rough benchmarks to give an indication of the performance overhead on an M1 MacBook:
| COPY lineitem_sf1 to file.parquet w/ 8 threads | Avg time (s) | Overhead relative to fastest |
| ---------------------------------------------- | ----- | ------------------- |
| Regular copy | 7.50 | 443.98% |
| Disable preserve order (https://github.com/duckdb/duckdb/pull/5756) | 1.40 | 1.64% |
| Threads (https://github.com/duckdb/duckdb/pull/5412) | 1.38 | 0.00% |
| Threads + disable order preserving | 1.42 | 2.73% |
| Hive Partitioned (4 partitions over 2 cols) | 1.62 | 17.71% |
| Hive Partitioned (28 partitions over 3 cols) | 1.92 | 38.93% |
| Hive Partitioned (84 partitions over 2 cols) | 1.89 | 36.93% |
| Hive Partitioned (432 partitions over 3 cols) | 2.58 | 86.83% |
| Hive Partitioned (1160 partitions over 5 cols) | 5.67 | 310.78% |
| Hive Partitioned (2526 partitions over 1 col) | 11.15 | 708.10% |
Note that performance for low numbers of partitions is very good. Higher numbers of partitions get pretty bad, but this is most likely due to the fact that at this partition count the resulting files contain only very few tuples, leading to large IO overhead. I would expect the relative overhead to be lower for larger files, but more benchmarking is required here.
### Transform partition columns before writing
Note that partition values are converted to strings. There are probably many edge cases where this won't work nicely. However, using SQL in your COPY statement you can transform the partition-by columns however you want. For example, to add a prefix to an existing column called part_col you could do:
```
COPY (SELECT * EXCLUDE (part_col), 'prefix-'::VARCHAR || part_col::VARCHAR as part_col FROM test)
TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col));
```
### Limitations
The partitioned write fully materializes the data, so it will quickly fill the available memory for large tables. This is not necessarily a blocker, as the buffer manager can offload to disk. It does, however, mean that enough local disk space is required, and for large files the data is:
- stored in memory
- offloaded to disk
- read from disk
- written to the final partition file
This is not ideal, but will still be good enough for many use cases.
### Future work
The ideal partitioning COPY would produce 1 file per partition while providing mechanisms to limit memory usage and not require full materialization.
The first step towards this goal is to make a streaming variant that can flush partitions as partition tuple limits (or possibly global operator limits) are reached. This would produce a single file per partition, per flush of that partition.
The second step would be to not close files after flushing, allowing multiple flushes to a single file. With this we would achieve the desired behaviour where we can produce a single file per partition in a streaming fashion.
We have some nice ideas on implementing the above, so hopefully two more PRs are coming up soon-ish :)
Apparently this functionality will be included in the next release of DuckDB.
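For future reference, calling it from R should then look roughly like this (an untested sketch; con is a DuckDB connection as before, out_dir is a target directory, and year and month stand for whatever columns of tbl you want to partition by):
```
library(DBI)
library(duckdb)

con <- dbConnect(duckdb::duckdb(), dbdir = "mydata.duckdb")

# Ask DuckDB to write a Hive-partitioned Parquet dataset directly to disk:
# out_dir/year=<val>/month=<val>/data_<thread_number>.parquet
dbExecute(con, "
  COPY tbl
  TO 'out_dir'
  (FORMAT PARQUET, PARTITION_BY (year, month));
")

dbDisconnect(con, shutdown = TRUE)
```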