EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/349286 )

Change subject: Example script of full training pipeline
......................................................................

Example script of full training pipeline

This script gives a general overview of how the pieces of the
mjolnir library work together. It might also provide some room
to bikeshed about what the APIs should look like.

In the long run there will be a more advanced version of this script
with various parameters for doing feature engineering. There will also
be python scripts in the discovery analytics repository, called by
oozie, that run pieces of the pipeline, saving intermediate stages that
are potentially useful for feature engineering and automating the
process of building a model from beginning to end.

Change-Id: If5e06ad51408687ccac9491edc7ea91ca2d57cf7
---
A mjolnir/cli/training_pipeline.py
1 file changed, 107 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/86/349286/1

diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
new file mode 100644
index 0000000..ad80f7a
--- /dev/null
+++ b/mjolnir/cli/training_pipeline.py
@@ -0,0 +1,107 @@
+"""
+Example script demonstrating the full LTR pipeline. It may
+not be desirable to run this all at once, but rather saving
+intermediate stages to HDFS for later analysis. This is mostly
+to demonstrate how everything ties together
+
+To run:
+    PYSPARK_PYTHON=MJOLNIR/venv/bin/python spark-submit \
+        --jars hdfs://analytics-hadoop/wmf/refinery/current/artifacts/refinery-hive.jar \
+        --archives 'mjolnir_venv.zip#MJOLNIR' \
+        path/to/training_pipeline.py
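+
+The MJOLNIR prefix in PYSPARK_PYTHON refers to the 'mjolnir_venv.zip#MJOLNIR'
+archive alias, which YARN unpacks on each executor.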
+"""
+
+import mjolnir.dbn
+import mjolnir.sampling
+import mjolnir.features
+import mjolnir.training
+import mjolnir.training.xgboost
+from pyspark import SparkContext
+from pyspark.sql import HiveContext
+from pyspark.sql import functions as F
+
+
+def main(sc, sqlContext):
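+    # Register the refinery stemmer UDF (provided by refinery-hive.jar,
+    # shipped via --jars above) so queries can be normalized per language.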
+    sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
+
+    # Load click data from HDFS
+    df_clicks = (
+        sqlContext.read.parquet(
+            'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*')
+        # Clicks and hits contain a bunch of useful debugging data, but we
+        # don't need any of that here. Save a bunch of memory by only
+        # working with lists of page ids.
+        .withColumn('hit_page_ids', F.col('hits.pageid'))
+        .drop('hits')
+        .withColumn('click_page_ids', F.col('clicks.pageid'))
+        .drop('clicks')
+        # Normalize queries using the lucene stemmer
+        .withColumn('norm_query', F.expr('stemmer(query, substring(wikiid, 1, 2))')))
+
+    # Sample to some subset of queries per wiki
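+    # (Roughly: keep up to queries_per_wiki queries for each wiki, and only
+    # queries issued in at least min_sessions_per_query distinct sessions.)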
+    df_sampled = mjolnir.sampling.sample(
+        df_clicks,
+        wikis=['enwiki', 'dewiki', 'ruwiki', 'frwiki'],
+        seed=54321,
+        queries_per_wiki=10000,
+        min_sessions_per_query=10)
+    df_sampled = df_sampled.cache()
+
+    # Learn relevance labels by training a DBN click model over the
+    # sampled sessions.
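+    # The settings dict below configures the DBN inference; for example,
+    # MIN/MAX_DOCS_PER_QUERY bound how many results are considered per query.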
+    df_rel = mjolnir.dbn.train(df_sampled, {
+            'MAX_ITERATIONS': 40,
+            'DEBUG': False,
+            'PRETTY_LOG': True,
+            'MIN_DOCS_PER_QUERY': 10,
+            'MAX_DOCS_PER_QUERY': 20,
+            'SERP_SIZE': 20,
+            'QUERY_INDEPENDENT_PAGER': False,
+            'DEFAULT_REL': 0.5})
+
+    df_hits = (
+        df_sampled
+        # Explode source into a row per displayed hit, reduce to a row per
+        # unique (wikiid, query, page_id) and add a count of the number of
+        # duplicates removed.
+        .withColumn('hit_page_id', F.explode('hit_page_ids'))
+        .drop('hit_page_ids')
+        .groupBy('wikiid', 'query', 'norm_query', 'hit_page_id')
+        .agg(F.count(F.lit(1)).alias('weight'))
+        # Join in the relevance labels
+        .join(df_rel, how='inner', on=['wikiid', 'norm_query', 'hit_page_id']))
+
+    # TODO: Nothing below here is implemented; it is only included as a
+    # rough estimate of what will be implemented.
+
+    # Collect features for all known queries. Note that this intentionally
+    # uses query and NOT norm_query. Merge those back into the source hits.
+    df_hits_with_features = df_hits.join(
+        mjolnir.features.collect(
+            df_hits,
+            url='https://search.svc.eqiad.wmnet:9243/_msearch',
+            indices={'enwiki': 'enwiki_content'},
+            feature_definitions=mjolnir.features.enwiki_features()),
+        how='inner', on=['wikiid', 'query', 'hit_page_id'])
+
+    # Generate test/train/validate splits
+    split_names = ['x_train', 'x_test', 'x_vali', 'y_train', 'y_test', 'y_vali']
+    splits = mjolnir.training.split(
+        df_hits_with_features, splits=(0.6, 0.2, 0.2))
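+    # (Assumes mjolnir.training.split returns one dataframe per name in
+    # split_names, i.e. separate feature and label frames for each split.)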
+
+    # Write out splits in a format suitable for a training library
+    for name, split in zip(split_names, splits):
+        mjolnir.training.xgboost.write(
+            'hdfs://analytics-hadoop/user/ebernhardson/foo/%s' % (name),
+            split)
+
+    # Train the model
+    best_model = mjolnir.training.xgboost.train('????')
+
+    # Write out the model in a format suitable for loading into
+    # the elasticsearch plugin.
+    mjolnir.training.xgboost.write_model(
+        best_model, 'file:///home/ebernhardson/xgboost_model.xml')
+
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="LTR: training pipeline")
+    sqlContext = HiveContext(sc)
+    main(sc, sqlContext)

-- 
To view, visit https://gerrit.wikimedia.org/r/349286
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If5e06ad51408687ccac9491edc7ea91ca2d57cf7
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
