Hello everyone,
I have some existing (base) R code that uses some nested 'for' loops to iterate over a data frame and this takes quite some time.
To increase performance I created a Hadoop/Spark cluster and transferred the dataframe that needs to be manipulated by my code to a spark dataframe.
The dataframe contains 14 columns, 3 of which (lost_offer, won_offer, last_offer) contain all zeros. Based on other variables, the code decides whether or not these zeros should become ones.
What the current code does is iterate over the customers (also a column in my df), then iterate over each sku (read 'product group'). So for each customer-sku combination the code looks at some other columns (e.g. type, auto_create, etc.) to decide whether or not the zeros in the last 3 columns should be changed to a '1'.
Below you see the old (base) R code which selects a chunk of data that relates to a unique 'customer-sku' combination (stored under 'select_data') and the main dataframe (country_new1) gets changed by analyzing the chunk of data (select_data).
# Classify offers per customer/sku group.
# For every customer, and every sku that customer has, the matching rows are
# collected into a local data frame (select_data) and walked row by row. Each
# 'offer' row is compared with the rows after it to decide whether the offer is
# final, won or lost.
# NOTE(review): every `mutate()` call below is evaluated but its result is never
# assigned, so country_new1 is not modified by this script as written.
# NOTE(review): the flag columns are spelled final_offer / lost_offer /
# Won_offer here; confirm these match the real column names in country_new1.
n <- 0 ## Running row offset: total rows of the customer-sku groups handled so far
# print(paste0("n=", n))
# Start timestamp; it is not read again anywhere in the visible code.
start = Sys.time()
#1###########################################################
#first loop across customers
# NOTE(review): 1:length(customers) yields c(1, 0) when customers is empty;
# seq_along(customers) would avoid that.
for(i in 1:length(customers)){
# print(unique(country_new1$updated_actual_related_customer)[i])
customer_c <- customers[i]
# Distinct skus for this customer, then pulled into a local vector.
sdf <- country_new1 %>% filter(updated_actual_related_customer == customer_c) %>% distinct(sku)
skus <- sdf_read_column(sdf, "sku")
#2###########################################################
#Second loop across sku for each customer
for(j in 1:length(skus)){
# print(paste0("sku = ",skus[j]))
# Fetch the rows of this customer-sku pair into local memory (one query per pair).
# NOTE(review): no ordering is applied here, yet the logic below treats later
# rows as later events - presumably the rows arrive sorted by orderoffer_date;
# TODO confirm.
select_data <- country_new1 %>% filter(updated_actual_related_customer == customer_c, sku == skus[j]) %>% collect()
nrow_sd <- nrow(select_data)
# print(paste0("#rows for given cust-sku=",nrow_sd))
if(nrow_sd==1){
## Single-row group: a lone offer is treated as final and lost.
# print("Only One Order - so Won")
# print(paste0("Row#=",n+k))
if(select_data$type[1]=='offer'){
# NOTE(review): results discarded (no assignment). Also, ifelse(cond, 1, 0)
# yields 0 for every row outside this customer-sku pair.
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
}
}
else if(nrow_sd>1){
#3###########################################################
#third loop for each row
for(k in 1:nrow_sd){ ## A loop for #rows in a sku for a customer
# print(paste0("K loop will go from ",k," to ",nrow_sd))
# print(paste0("k=",k))
if(k==nrow_sd){ ### Last row of the group: an offer here is final and lost; either way the row loop ends
if(select_data$type[k]=='offer'){
# NOTE(review): results discarded (no assignment).
dplyr::mutate(country_new1, final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
dplyr::mutate(country_new1, lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
break
}
else if(select_data$type[k]=='order'){break} ## Do not process the last row, when it's an order
}
if(select_data$type[k] == 'offer'){
Current_date <- select_data$orderoffer_date[k] ###Store date of current row#
given_unitprice <- select_data$unitprice_transaction_currency[k] ###Store unit price of current row#
Is_autocreated <- select_data$auto_create[k]
#4###########################################################
#fourth loop within 30days window
# NOTE(review): `k+1:nrow_sd` parses as k + (1:nrow_sd), so l runs from k+1 up
# to k+nrow_sd; the `l>nrow_sd` guard on the next line is what stops the scan
# at the end of the group.
for(l in k+1:nrow_sd){
if(l>nrow_sd){break}else(
### If next row is within 30 days
# NOTE(review): `Current_date+30` means 30 days only if orderoffer_date is a
# Date - TODO confirm the column class.
if(select_data$orderoffer_date[l]>=Current_date & select_data$orderoffer_date[l] < Current_date+30){
if(select_data$type[l]=='offer'){ ## First is an offer, second is also an offer
# Presumably AC = auto-created (auto_create == 'Y') and MC = manually created
# (auto_create == 'N').
if(Is_autocreated=='Y' & select_data$auto_create[l]=='Y'){
# print("First Offer is AC and second is also AC, move on..")
break} ## If First Offer is AC and second is also AC, stop for this offer
if(Is_autocreated=='Y' & select_data$auto_create[l]=='N'){
# print("first Offer is AC and second is MC, AC is neither won/loss, move on..")
break} ## If first Offer is AC and second is MC, AC is neither won/loss, move on
if(Is_autocreated=='N' & select_data$auto_create[l]=='Y'){ ## If first Offer is MC and second is AC, check prices
# print("first Offer is MC and second is AC. Check prices..")
if(select_data$unitprice_transaction_currency[l] == given_unitprice){
# print("Offer with same price")
break ###If we found that there is a similar offer available in next - 30 days, End further iterations
}
else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
# print("Offer with diff price")
# print(paste0("Row#=",n+k))
# A differently priced follow-up offer with qty_disc == 1 ends the scan without
# flagging; otherwise the current offer is flagged final and lost.
if(select_data$qty_disc[l]==1){break}
else {
# NOTE(review): results discarded (no assignment).
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
break
}
}
}
if(Is_autocreated=='N' & select_data$auto_create[l]=='N'){ ## Both are manual offers, Normal case
if(select_data$unitprice_transaction_currency[l] == given_unitprice){
# print("Offer with same price")
break ###If we found that there is a similar offer available in next - 30 days, End further iterations
}
else if(select_data$unitprice_transaction_currency[l] != given_unitprice){
# print("Offer with diff price")
# print(paste0("Row#=",n+k))
# Same rule as the MC-then-AC case above.
if(select_data$qty_disc[l]==1){break}
else {
# NOTE(review): results discarded (no assignment).
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
break
}
## We established that this is a final-lost offer. break the loop
}
}
}
else if(select_data$type[l]=='order'){
## Offer followed by an order inside the window: compare the order price with the offer price
if(select_data$unitprice_transaction_currency[l]==given_unitprice){
# print("Order with same price - Won Offer")
# print(paste0("Row#=",n+k))
# NOTE(review): results discarded (no assignment). The lost_offer call passes
# (0,1): matching rows would get 0 and every other row would get 1.
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
break
}
else if(select_data$unitprice_transaction_currency[l]>given_unitprice){
# print("Order with higher price - Not a final offer")
break
}
else if(select_data$unitprice_transaction_currency[l]<given_unitprice){
# print("Order with low price")
# Lower-priced order: with qty_disc == 1 the offer is flagged final and won,
# otherwise final and lost.
if(select_data$qty_disc[l]==1){
# NOTE(review): results discarded; lost_offer uses inverted (0,1) arguments.
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
break
}
else {
# NOTE(review): results discarded; Won_offer uses inverted (0,1) arguments.
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(Won_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 0,1))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
break
}
}
}
}
else if(select_data$orderoffer_date[l] >= Current_date+30){
## Next row is 30 or more days later: the offer is flagged final and lost
# print("Current date is more than 30 days")
# NOTE(review): results discarded (no assignment).
country_new1 %>% mutate(final_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
country_new1 %>% mutate(lost_offer = ifelse(updated_actual_related_customer == customer_c & sku == skus[j] , 1,0))
break
}
)}
}
}
}
# NOTE(review): n is only accumulated here and mentioned in commented-out
# prints; no live statement reads it.
n <- n + nrow_sd ##Value of n should be updated once loop has run for selected data
#print(paste0("New n=", n))
}
}
I'm trying to get rid of at least the outer 2 loops by using spark_apply( ..... , group_by = c("customers", "skus")), as you can see in the code below. I want to reuse the 2 inner loops by copy-pasting them into a function. In order to keep my old code unchanged, I need a way to iterate with an index over the chunks of data corresponding to each 'customer-sku' combination. Does anyone know how to do that?
# Copy the local data frame to Spark, then run the per-group rule on the
# workers. With `group_by`, spark_apply() invokes the closure once per
# customer/sku combination and hands it that group's rows as an ordinary R
# data frame, so plain base-R indexing (1..nrow) works inside the closure.
countries_tbl <- copy_to(sc, country_new1)
results_tbl <- spark_apply(
  countries_tbl,
  function(country_new1) {
    # Number of rows in this customer-sku group.
    nrow_sd <- nrow(country_new1)
    # Debug column carrying the group size (kept from the original draft).
    country_new1$test <- nrow_sd

    # Single-row group: a lone offer is final and lost; anything else is
    # marked as won.
    # NOTE(review): every group must return the same columns, so this relies
    # on final_offer, lost_offer and Won_offer already existing in the input.
    # Confirm the names against the real table.
    if (nrow_sd == 1) {
      if (country_new1$type[1] == "offer") {
        country_new1$final_offer <- 1
        country_new1$lost_offer <- 1
      } else {
        country_new1$Won_offer <- 1
      }
    }

    # The closure must return a data frame: return the (possibly modified)
    # group explicitly instead of the value of the `if` above.
    country_new1
  },
  group_by = c("updated_actual_related_customer", "sku")
)
I'm looking forward to your answers!
Kind Regards,
Charles