Managing intermediate results when using R/sparklyr
Win-Vector Blog 2017-06-22
In our latest “R and big data” article we show how to manage intermediate results in non-trivial Apache Spark workflows using R, sparklyr, dplyr, and replyr.
Handle management
Many Sparklyr
tasks involve creation of intermediate or temporary tables. This can be through dplyr::copy_to()
and through dplyr::compute()
. These handles can represent a reference leak and eat up resources.
To help control handle lifetime the replyr
supplies record-retaining temporary name generators (and uses the same internally).
The actual function is pretty simple:
print(replyr::makeTempNameGenerator)
## function(prefix,## suffix= NULL) {## force(prefix)## if((length(prefix)!=1)||(!is.character(prefix))) {## stop("repyr::makeTempNameGenerator prefix must be a string")## }## if(is.null(suffix)) {## alphabet <- c(letters, toupper(letters), as.character(0:9))## suffix <- paste(base::sample(alphabet, size=20, replace= TRUE),## collapse = '')## }## count <- 0## nameList <- c()## function(dumpList=FALSE) {## if(dumpList) {## v <- nameList## nameList <<- c()## return(v)## }## nm <- paste(prefix, suffix, sprintf('%010d',count), sep='_')## nameList <<- c(nameList, nm)## count <<- count + 1## nm## }## }## <bytecode: 0x7f8659110708>## <environment: namespace:replyr>
For instance to join a few tables it can be a good idea to call compute after each join (else the generated SQL
can become large and unmanageable). This sort of code looks like the following:
# create example datanames <- paste('table', 1:5, sep='_')tables <- lapply(names, function(ni) { di <- data.frame(key= 1:3) di[[paste('val',ni,sep='_')]] <- runif(nrow(di)) copy_to(sc, di, ni) })# build our temp name generatortmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')# left join the tables in sequencejoined <- tables[[1]]for(i in seq(2,length(tables))) { ti <- tables[[i]] if(i<length(tables)) { joined <- compute(left_join(joined, ti, by='key'), name= tmpNamGen()) } else { # use non-temp name. joined <- compute(left_join(joined, ti, by='key'), name= 'joinres') }}# clean up tempstemps <- tmpNamGen(dumpList = TRUE)print(temps)
## [1] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000000"## [2] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000001"## [3] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000002"
for(ti in temps) { db_drop_table(sc, ti)}# show resultprint(joined)
## Source: query [3 x 6]## Database: spark connection master=local[4] app=sparklyr local=TRUE## ## # A tibble: 3 x 6## key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5## <int> <dbl> <dbl> <dbl> <dbl> <dbl>## 1 1 0.7594355 0.8082776 0.696254059 0.3777300 0.30015615## 2 2 0.4082232 0.8101691 0.005687125 0.9382002 0.04502867## 3 3 0.5941884 0.7990701 0.874374779 0.7936563 0.19940400
Careful introduction and management of materialized intermediates can conserve resources (both time and space) and greatly improve outcomes. We feel it is a good practice to set up an explicit temp name manager, pass it through all your Sparklyr
transforms, and then clear temps in batches after the results no longer depend on the intermediates.
Edit 7-22-2017: after some investigation of an issue (please see sparklyr
issue 721 and sparklyr
830326c) it turns out sparklyr
does not in fact save a data snapshot when compute()
is called. It appears to cache something like a query. This is despite the dplyr::compute()
documentation (the close one gets to a public specification in this “verse
“) stating:
compute() stores results in a remote temporary table.
This is inconsistent with my earlier claim that “Careful introduction and management of materialized intermediates can conserve resources (both time and space) and greatly improve outcomes” on sparklyr
.
I apologize for this.
We have been using dplyr::compute()
in production, at production scale, with clients, on a number of remote data sources (PostgreSQL
, MySQL
, and Sparklyr
). On many of these sources we have seen the introduction of compute()
make the difference between success and failure in long calculation (such as binding rows). We have also been using the claims that dplyr
is somewhat uniform across implementations to try and collect and build best-practices (and work-arounds) into our replyr
package.
What I had not done is deliberately turned off compute()
with sparklyr
to see if it breaks. Frankly so much breaks with dplyr
0.7.0
that there just isn’t time to do enough lesion studies to see what combination of patches is absolutely necessary on each different database realization. One is going to need something like dplyr::compute()
to do significant work, and something like the temp manager describe above to prevent such a system from leaking references. For significant remote data sources the analyst is going to have to signal intent, as simple client-side garbage collecting is not going to be enough to control result lifetimes.
I hope this error has not lowered our reader’s trust of this series. The sparklyr
/Spark
combination is very powerful, and does have some rough edges. To my mind this means defensive coding practices (such as we are trying to describe in this series) are in fact very much needed. It just turns out we need even more of them.
There is a patch coming- but I think it is going to be problematic as I believe the “don’t save the data in memory” behavior of compute()
has been hiding some limitations of the “fire and forget compute()
” concept (for one thing deleting intermediates early will become critical).
To work around the issue one would have to realize some intermediate computations into a more durable an isolated form, such as writing an Apache Parquet file. This would require an even stronger intermediate manager discipline as we would need to store possibly different “how to delete” annotations for different types of stored intermediate results. We will be looking into this.
sdf_checkpoint()
has been suggested as a work around, but that function is not exported in the CRAN
release of sparklyr
(0.5.6
as of June 22nd, 2017) and the documentation is lacking:
#' Checkpoint a Spark DataFrame#'#' @param x an object coercible to a Spark DataFrame#' @param eager whether to truncate the lineage of the DataFrame#' @export
Obviously given experience with dplyr::compute()
I am less willing to apply syllogisms of the form “well, it must have some way to handle this difficult case otherwise what would be the point?”
Again, I apologize for my part in this shamble. I have been trying to bring organization and clarity to the reader, and it looks like for that goal I overly trusted the project claims and documentation (plus related experience on other dplyr
systems). My intent is to help, but the above is below my standards.