I have been able to confirm that my (clunky) solution, which I originally described for local Spark, does indeed work on a cluster. It works in my situation because I have very long data (millions of rows) that is not very wide (thousands of columns), so distributing the dataframe by rows in Spark makes sense.
In short:

1. Paste the two matrices side-by-side in a single dataframe.
2. Use sparklyr::spark_apply to run Matrix::crossprod on each partition, operating on the subsets of columns corresponding to the original two matrices.
3. Append a column containing the vector of ID values as ordered in the dataframe, repeated once per partition.
4. Summarize all of the columns by the ID value to return a single row per ID.
Below is a small example of how the solution works on distributed data, using a matrix of Items x Attributes and a matrix of Items x Users whose cross-product yields a matrix of Users x Attributes profiles.
library(sparklyr)
library(Matrix)
library(dplyr)
library(readr)
# connect to Spark (local master for this example; point at your cluster in practice)
sc <- sparklyr::spark_connect(master = "local")
# Items x Attributes matrix (Item_ID plus eight attribute columns)
itemAttribute <- readr::read_csv('"Item_ID","Attr_1","Attr_2","Attr_3","Attr_4","Attr_5","Attr_6","Attr_7","Attr_8"
111,0,0.5,0.5,0,0.5,0.5,0,0
222,0.5,0,0,0.5,0.5,0,0.5,0
333,0,0.577350269189626,0,0.577350269189626,0.577350269189626,0,0,0
444,0,0,0,0,0.577350269189626,0.577350269189626,0,0.577350269189626
555,0.707106781186547,0,0,0.707106781186547,0,0,0,0')
# Items x Users matrix (Item_ID plus two user preference columns)
itemUser <- readr::read_csv('"Item_ID","User_1","User_2"
111,0,1
222,-1,1
333,1,0
444,0,1
555,1,-1')
# paste the two matrices side-by-side, matched on Item_ID
singleDf <- itemAttribute %>% dplyr::left_join(itemUser, by = "Item_ID")
# distribute the combined dataframe (2 partitions here stand in for a real cluster)
singleSparkDf <- sparklyr::sdf_copy_to(sc,
                                       singleDf,
                                       "singleDfNamed",
                                       overwrite = TRUE,
                                       repartition = 2)
# record which columns hold the attribute block and which hold the user block;
# with 8 attribute and 2 user columns this gives attributes in 2:9 and users in 10:11
attrColNames <- base::colnames(itemAttribute %>% dplyr::select(-Item_ID))
userColNames <- base::colnames(itemUser %>% dplyr::select(-Item_ID))
userColIndexLast <- base::ncol(singleDf)
userColIndexFirst <- base::ncol(singleDf) - base::length(userColNames) + 1
attrColIndexLast <- userColIndexFirst - 1
attrColIndexFirst <- attrColIndexLast - base::length(attrColNames) + 1
# on each partition, cross the user columns with the attribute columns;
# each partition returns one (Users x Attributes) block of partial sums
userAttribute_partitions <- singleSparkDf %>% sparklyr::spark_apply(function(df, context) {
  # make the column indices passed in via context visible inside the worker
  for (name in names(context)) assign(name, context[[name]], envir = .GlobalEnv)
  profiles <- Matrix::crossprod(Matrix::as.matrix(base::as.data.frame(df)[, userColIndexFirst:userColIndexLast]),
                                Matrix::as.matrix(base::as.data.frame(df)[, attrColIndexFirst:attrColIndexLast]))
  return(base::as.data.frame(profiles))
}, context = list(
  userColIndexLast = userColIndexLast,
  userColIndexFirst = userColIndexFirst,
  attrColIndexLast = attrColIndexLast,
  attrColIndexFirst = attrColIndexFirst
))
# spark_apply stacks one block of rows per partition, so repeat the user IDs
# once per partition (here: "User_1","User_2","User_1","User_2" for 2 partitions)
userColNames_partitions <- base::rep(userColNames, sparklyr::sdf_num_partitions(singleSparkDf))
User_ID <- userColNames_partitions  # named so the Spark column becomes User_ID
userColNames_partitionsSpark <- sparklyr::sdf_copy_to(sc,
                                                      dplyr::tibble(User_ID),
                                                      "userColNames_partitionsNamed",
                                                      overwrite = TRUE)
# attach the User_ID column to the stacked per-partition results
userAttribute_partitionsUserID <- userAttribute_partitions %>%
  sparklyr::sdf_bind_cols(userColNames_partitionsSpark)
# sum the per-partition partial results by user to complete the cross-product
userAttribute <- userAttribute_partitionsUserID %>%
  dplyr::group_by(User_ID) %>%
  dplyr::summarise_all(sum) %>%
  dplyr::arrange(User_ID)
userAttribute
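Note that userAttribute is still a Spark dataframe at this point; once it has been summarized down to one row per user it is usually small enough to pull back into the local R session, if you need it there:

userAttribute_local <- userAttribute %>% dplyr::collect()  # returns a local tibble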
## for comparison, crossprod of original matrices in R session (no Spark)
Matrix::crossprod(Matrix::as.matrix(itemUser %>% dplyr::select(-Item_ID)),
                  Matrix::as.matrix(itemAttribute %>% dplyr::select(-Item_ID)))
In this example, the cross-product gives us the User x Attribute weight, which is the sum over Items of (User preference x Attribute flag). Partitioning the data by rows is fine because each row still contains an Item's Attribute flags and User preferences, so the products can be computed within any partition. Matrix::crossprod inside sparklyr::spark_apply already sums those products within each partition, so the partitioned results only need to be summed across partitions to complete the calculation and return a single vector of Attribute weights for each User.
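To see why partitioning by rows is safe, here is a minimal sketch in plain R (no Spark; the small matrices below are made up purely for illustration): the cross-product of two matrices equals the sum of the cross-products of their row partitions, which is exactly the identity the group_by/summarise step exploits.

# hypothetical 3-item example: a "user" block X and an "attribute" block Y
X <- matrix(c(0, -1, 1, 1, 1, 0), nrow = 3)       # stands in for the user columns
Y <- matrix(c(0, 0.5, 0, 0.5, 0, 0.6), nrow = 3)  # stands in for the attribute columns

full <- Matrix::crossprod(X, Y)  # t(X) %*% Y, summed over all rows at once

# the same quantity computed per row-partition, then summed across partitions
parts <- Matrix::crossprod(X[1:2, , drop = FALSE], Y[1:2, , drop = FALSE]) +
  Matrix::crossprod(X[3, , drop = FALSE], Y[3, , drop = FALSE])

base::all.equal(full, parts)  # TRUE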