EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/395063 )
Change subject: Allow spark to keep the full data pipeline in memory
......................................................................

Allow spark to keep the full data pipeline in memory

Something with the upgrade to spark 2.1.2 has caused us to recompute
lots of data over and over again in the data pipeline. This is
particularly egregious for the normalization and feature collection
steps, which take an hour each on a full run of data. I tested simply
not unpersisting our data and everything seems to work fine. We have
~100G of memory available for caching and only end up using 50G by not
unpersisting. Figuring out what data is available is also much easier
if we don't have to think about when to unpersist what data.

Change-Id: Iedf259e481055444f369c528a56bee372e57595e
---
M mjolnir/sampling.py
M mjolnir/utilities/data_pipeline.py
2 files changed, 1 insertion(+), 19 deletions(-)

  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/63/395063/1

diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index d7d1f2b..88a93c7 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -196,8 +196,5 @@
         df
         .join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query_id'])
         .cache())
-    df_sampled.count()
-    df.unpersist()
-    df_queries_unique.unpersist()
 
     return hit_page_id_counts, df_sampled
diff --git a/mjolnir/utilities/data_pipeline.py b/mjolnir/utilities/data_pipeline.py
index 20b06f0..5d26065 100644
--- a/mjolnir/utilities/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -69,11 +69,6 @@
         seed=54321,
         samples_per_wiki=samples_per_wiki)
 
-    # This should already be cached from sample, but lets be explicit
-    # to prevent future problems with refactoring.
-    df_sampled_raw.cache().count()
-    df_norm.unpersist()
-
     # Transform our dataframe into the shape expected by the DBN
     df_sampled = (
         df_sampled_raw
@@ -85,13 +80,11 @@
         .drop('click_page_ids')
         .cache())
 
-    # materialize df_sampled and unpersist df_norm
-    nb_samples = df_sampled.count()
-    df_sampled_raw.unpersist()
     # Target around 125k rows per partition. Note that this isn't
     # how many the dbn will see, because it gets collected up. Just
     # a rough guess.
+    nb_samples = df_sampled.count()
     dbn_partitions = int(max(200, min(2000, nb_samples / 125000)))
 
     # Learn relevances
@@ -114,10 +107,6 @@
         .join(df_rel, how='inner', on=['wikiid', 'norm_query_id', 'hit_page_id'])
         .cache())
 
-    # materialize df_all_hits and drop df_sampled, df_norm
-    df_all_hits.count()
-    df_sampled.unpersist()
-
     # TODO: Training is per-wiki, should this be as well?
     weightedNdcgAt10 = mjolnir.metrics.ndcg(df_all_hits, 10, query_cols=['wikiid', 'query', 'session_id'])
     print 'weighted ndcg@10: %.4f' % (weightedNdcgAt10)
@@ -133,10 +122,6 @@
             F.first('label').alias('label'),
             F.first('relevance').alias('relevance'))
         .cache())
-
-    # materialize df_hits and drop df_all_hits
-    df_hits.count()
-    df_all_hits.unpersist()
 
     actual_samples_per_wiki = df_hits.groupby('wikiid').agg(F.count(F.lit(1)).alias('n_obs')).collect()
     actual_samples_per_wiki = {row.wikiid: row.n_obs for row in actual_samples_per_wiki}

-- 
To view, visit https://gerrit.wikimedia.org/r/395063
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iedf259e481055444f369c528a56bee372e57595e
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>