From the same doc you cite:
"partition() splits flights1 into roughly equal subsets on each worker, ensuring that all rows in a group are transferred to the same worker. The result is a party_df, or partitioned data frame."
So, the shards you see in the result (166,251--170,525 rows) are the rows split across workers, with each original group kept together on a single worker.
Yes, the way to get the data back into the interactive session is through collect():
"Once you have a partitioned data frame, you can operate on it with the usual dplyr verbs. To bring the data back to the interactive process, use collect()."
I'm not totally sure what you mean by this.
From looking at the source code for party_df(), I'm not sure if you can access results of a party_df by shard.
The order is preserved when you collect, so I guess you could literally subset the data by rows once collected:
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(nycflights13)
cluster <- new_cluster(2)
cluster
#> 2 session cluster [..]
flights1 <- flights %>% group_by(dest) %>% partition(cluster)
result <- flights1
result$cluster
#> 2 session cluster [..]
str(result)
#> List of 3
#> $ cluster :List of 2
#> ..$ :Classes 'r_session', 'process', 'R6' PROCESS 'R', running, pid 10764.
#>
#> ..$ :Classes 'r_session', 'process', 'R6' PROCESS 'R', running, pid 10768.
#>
#> ..- attr(*, "cleaner")=Classes 'Cleaner', 'R6' <Cleaner>
#> Public:
#> add: function (x)
#> clone: function (deep = FALSE)
#> names:
#> reset: function (x)
#> ..- attr(*, "class")= chr "multidplyr_cluster"
#> $ name : symbol _DF1
#> $ .auto_clean:<environment: 0x7f9f5f2827d0>
#> - attr(*, "class")= chr "multidplyr_party_df"
multidplyr:::tbl_vars.multidplyr_party_df(result)
#> <dplyr:::vars>
#> [1] "year" "month" "day" "dep_time"
#> [5] "sched_dep_time" "dep_delay" "arr_time" "sched_arr_time"
#> [9] "arr_delay" "carrier" "flight" "tailnum"
#> [13] "origin" "dest" "air_time" "distance"
#> [17] "hour" "minute" "time_hour"
#> Groups:
#> [1] "dest"
collected <- collect(result)
collected[1:166251,]
#> # A tibble: 166,251 x 19
#> # Groups: dest [58]
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> <int> <int> <int> <int> <int> <dbl> <int> <int>
#> 1 2013 1 1 557 600 -3 709 723
#> 2 2013 1 1 557 600 -3 838 846
#> 3 2013 1 1 558 600 -2 849 851
#> 4 2013 1 1 558 600 -2 853 856
#> 5 2013 1 1 559 559 0 702 706
#> 6 2013 1 1 559 600 -1 854 902
#> 7 2013 1 1 601 600 1 844 850
#> 8 2013 1 1 602 610 -8 812 820
#> 9 2013 1 1 602 605 -3 821 805
#> 10 2013 1 1 615 615 0 1039 1100
#> # … with 166,241 more rows, and 11 more variables: arr_delay <dbl>,
#> # carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
#> # air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm>
Created on 2021-04-05 by the reprex package (v1.0.0)
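If the goal is one data frame per shard after collecting, the row-subsetting idea above can be generalised a little. The following is only a sketch: split_by_shard() is a hypothetical helper, not part of multidplyr, and it assumes that you already know the number of rows in each shard and that collect() returns the shards in worker order. It uses a toy data frame so that it runs without a cluster.

```r
# Hypothetical helper (not a multidplyr function): split a collected data
# frame into one piece per shard, given the number of rows in each shard.
# Assumes the rows appear in worker order after collect().
split_by_shard <- function(df, shard_sizes) {
  # The shard sizes must account for every row exactly once.
  stopifnot(sum(shard_sizes) == nrow(df))
  # Label every row with the index of the shard it is assumed to come from.
  shard_id <- rep(seq_along(shard_sizes), times = shard_sizes)
  # Drop the names so the result is indexed by position: [[1]], [[2]], ...
  unname(split(df, shard_id))
}

# Toy stand-in for a collected result: 5 rows from shards of 2 and 3 rows.
toy <- data.frame(
  dest  = c("ATL", "ATL", "BOS", "BOS", "BOS"),
  delay = 1:5
)
shards <- split_by_shard(toy, c(2, 3))
nrow(shards[[1]])
nrow(shards[[2]])
```

With the objects from the reprex, the equivalent call would be split_by_shard(collected, c(166251, 170525)), assuming the first worker holds the 166,251-row shard. I haven't verified that ordering beyond what the output above suggests.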