EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/395063 )

Change subject: Allow spark to keep the full data pipeline in memory
......................................................................

Allow spark to keep the full data pipeline in memory

Something about the upgrade to Spark 2.1.2 has caused us to
recompute lots of data over and over again in the data pipeline.
This is particularly egregious for the normalization and feature
collection steps, which take an hour each on a full run of data.

I tested simply not unpersisting our data and everything seems to
work fine. We have ~100G of memory available for caching and only
end up using 50G when nothing is unpersisted. Figuring out what data
is available is also much easier when we don't have to think about
when to unpersist which dataframe.

Change-Id: Iedf259e481055444f369c528a56bee372e57595e
---
M mjolnir/sampling.py
M mjolnir/utilities/data_pipeline.py
2 files changed, 1 insertion(+), 19 deletions(-)
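For illustration only (not part of this change): a minimal PySpark sketch of the
pattern described above, keeping intermediate DataFrames cached and relying on
Spark's own block eviction rather than explicit unpersist() calls between
stages. The names (spark, df_raw, df_norm, df_sampled) are simplified
stand-ins, not the pipeline's actual data.

  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.appName('cache-sketch').getOrCreate()

  # Hypothetical input; the real pipeline reads query click data.
  df_raw = spark.range(1000).withColumn('wikiid', F.lit('enwiki'))

  # Cache each intermediate result; downstream joins and aggregations
  # reuse the in-memory copy instead of recomputing the full lineage.
  df_norm = df_raw.withColumn('norm_query_id', F.col('id') % 10).cache()
  df_sampled = df_norm.where(F.col('norm_query_id') < 5).cache()

  # Materialize once so the cache is actually populated.
  df_sampled.count()

  # Previously the pipeline would call df_norm.unpersist() here; leaving
  # everything cached lets Spark evict blocks itself if storage memory
  # ever runs short.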


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/63/395063/1

diff --git a/mjolnir/sampling.py b/mjolnir/sampling.py
index d7d1f2b..88a93c7 100644
--- a/mjolnir/sampling.py
+++ b/mjolnir/sampling.py
@@ -196,8 +196,5 @@
         df
         .join(df_queries_sampled, how='inner', on=['wikiid', 'norm_query_id'])
         .cache())
-    df_sampled.count()
-    df.unpersist()
-    df_queries_unique.unpersist()
 
     return hit_page_id_counts, df_sampled
diff --git a/mjolnir/utilities/data_pipeline.py b/mjolnir/utilities/data_pipeline.py
index 20b06f0..5d26065 100644
--- a/mjolnir/utilities/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -69,11 +69,6 @@
         seed=54321,
         samples_per_wiki=samples_per_wiki)
 
-    # This should already be cached from sample, but lets be explicit
-    # to prevent future problems with refactoring.
-    df_sampled_raw.cache().count()
-    df_norm.unpersist()
-
     # Transform our dataframe into the shape expected by the DBN
     df_sampled = (
         df_sampled_raw
@@ -85,13 +80,11 @@
         .drop('click_page_ids')
         .cache())
 
-    # materialize df_sampled and unpersist df_norm
-    nb_samples = df_sampled.count()
-    df_sampled_raw.unpersist()
 
     # Target around 125k rows per partition. Note that this isn't
     # how many the dbn will see, because it gets collected up. Just
     # a rough guess.
+    nb_samples = df_sampled.count()
     dbn_partitions = int(max(200, min(2000, nb_samples / 125000)))
 
     # Learn relevances
@@ -114,10 +107,6 @@
         .join(df_rel, how='inner', on=['wikiid', 'norm_query_id', 'hit_page_id'])
         .cache())
 
-    # materialize df_all_hits and drop df_sampled, df_norm
-    df_all_hits.count()
-    df_sampled.unpersist()
-
     # TODO: Training is per-wiki, should this be as well?
     weightedNdcgAt10 = mjolnir.metrics.ndcg(df_all_hits, 10, query_cols=['wikiid', 'query', 'session_id'])
     print 'weighted ndcg@10: %.4f' % (weightedNdcgAt10)
@@ -133,10 +122,6 @@
              F.first('label').alias('label'),
              F.first('relevance').alias('relevance'))
         .cache())
-
-    # materialize df_hits and drop df_all_hits
-    df_hits.count()
-    df_all_hits.unpersist()
 
     actual_samples_per_wiki = df_hits.groupby('wikiid').agg(F.count(F.lit(1)).alias('n_obs')).collect()
     actual_samples_per_wiki = {row.wikiid: row.n_obs for row in actual_samples_per_wiki}

-- 
To view, visit https://gerrit.wikimedia.org/r/395063
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iedf259e481055444f369c528a56bee372e57595e
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
