EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/357321 )

Change subject: Parameterize CLI scripts
......................................................................

Parameterize CLI scripts

Adds CLI parameters to the CLI scripts to simplify running them
under different circumstances.

Change-Id: I8915b764cda9bf9996bc76aacff3743ae5c84be2
---
M mjolnir/cli/data_pipeline.py
M mjolnir/cli/training_pipeline.py
M mjolnir/training/xgboost.py
3 files changed, 153 insertions(+), 48 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/21/357321/1

diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 7508c0c..f3cd29a 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -10,6 +10,7 @@
         mjolnir/cli/data_pipeline.py
 """
 
+import argparse
 import mjolnir.dbn
 import mjolnir.metrics
 import mjolnir.norm_query
@@ -19,18 +20,21 @@
 from pyspark.sql import HiveContext
 from pyspark.sql import functions as F
 
+SEARCH_CLUSTERS = {
+    'eqiad': ['http://elastic%d.eqiad.wmnet:9200/_msearch' % (i) for i in 
range(1017, 1052)],
+    'codfw': ['http://elastic%d.codfw.wmnet:9200/_msearch' % (i) for i in 
range(2001, 2035)],
+}
 
-def main(sc, sqlContext):
+
+def main(sc, sqlContext, input_dir, output_dir, wikis, queries_per_wiki,
+         min_sessions_per_query, search_cluster):
+    # TODO: Should this jar have to be provided on the command line instead?
     sqlContext.sql("ADD JAR 
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
     sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
 
-    # TODO: Should be CLI option
-    wikis = ['enwiki', 'dewiki', 'ruwiki', 'frwiki']
-
     # Load click data from HDFS
     df_clicks = (
-        sqlContext.read.parquet(
-            
'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*')
+        sqlContext.read.parquet(input_dir)
         # Limit to the wikis we are working against
         .where(mjolnir.sampling._array_contains(F.array(map(F.lit, wikis)), 
F.col('wikiid')))
         # Clicks and hits contains a bunch of useful debugging data, but we 
don't
@@ -52,8 +56,8 @@
             df_norm,
             wikis=wikis,
             seed=54321,
-            queries_per_wiki=20000,
-            min_sessions_per_query=10)
+            queries_per_wiki=queries_per_wiki,
+            min_sessions_per_query=min_sessions_per_query)
         # Explode source into a row per displayed hit
         .select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position', 
'hit_page_id'))
         .drop('hit_page_ids')
@@ -118,8 +122,9 @@
     # uses query and NOT norm_query_id. Merge those back into the source hits.
     df_features = mjolnir.features.collect(
         df_hits,
-        # TODO: Should be a CLI option of some sort
-        url_list=['http://elastic%d.codfw.wmnet:9200/_msearch' % (i) for i in 
range(2001, 2035)],
+        url_list=SEARCH_CLUSTERS[search_cluster],
+        # TODO: While this works for now, at some point we might want to handle
+        # things like multimedia search from commons, and non-main namespace 
searches.
         indices={wiki: '%s_content' % (wiki) for wiki in wikis},
         # TODO: If we are going to do multiple wikis, this probably needs 
different features
         # per wiki? At a minimum trying to include useful templates as 
features will need
@@ -128,13 +133,40 @@
         feature_definitions=mjolnir.features.enwiki_features())
     df_hits_with_features = df_hits.join(df_features, how='inner', 
on=['wikiid', 'query', 'hit_page_id'])
 
-    
df_hits_with_features.write.parquet('hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features')
+    df_hits_with_features.write.parquet(output_dir)
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser(description='...')
+    parser.add_argument(
+        '-i', '--input', dest='input_dir', type=str,
+        
default='hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*',
+        help='Input path, prefixed with hdfs://, to query and click data')
+    parser.add_argument(
+        '-q', '--queries-per-wiki', dest='queries_per_wiki', type=int, 
default=20000,
+        help='The number of normalized queries, per wiki, to operate on')
+    parser.add_argument(
+        '-s', '--min-sessions', dest='min_sessions_per_query', type=int, 
default=10,
+        help='The minimum number of sessions per normalized query')
+    parser.add_argument(
+        '-c', '--search-cluster', dest='search_cluster', type=str, 
default='codfw',
+        choices=SEARCH_CLUSTERS.keys(), help='Search cluster to source 
features from')
+    parser.add_argument(
+        '-o', '--output-dir', dest='output_dir', type=str, required=True,
+        help='Output path, prefixed with hdfs://, to write resulting dataframe 
to')
+    parser.add_argument(
+        'wikis', metavar='wiki', type=str, nargs='+',
+        help='A wiki to generate features and labels for')
+
+    args = parser.parse_args()
+    return dict(vars(args))
 
 
 if __name__ == "__main__":
+    args = parse_arguments()
     sc = SparkContext(appName="MLR: data collection pipeline")
     # spark info logging is incredibly spammy. Use warn to have some hope of
     # human decipherable output
     sc.setLogLevel('WARN')
     sqlContext = HiveContext(sc)
-    main(sc, sqlContext)
+    main(sc, sqlContext, **args)
diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 1edf255..c23a00f 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -9,53 +9,116 @@
         path/to/training_pipeline.py
 """
 
-import mjolnir.dbn
-import mjolnir.sampling
-import mjolnir.features
-import mjolnir.training.tuning
+import argparse
 import mjolnir.training.xgboost
+import os
 import pickle
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
+from pyspark.sql import functions as F
 
 
-def main(sc, sqlContext):
-    # TODO: cli argument
-    in_path = 
'hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features'
-    df_hits_with_features = sqlContext.read.parquet(in_path)
+def main(sc, sqlContext, input_dir, output_dir, wikis, target_node_evaluations,
+         num_workers, num_cv_jobs, num_folds):
 
-    # Explore a hyperparameter space. Skip the most expensive part of tuning,
-    # increasing the # of trees, with target_node_evaluations=None
-    tune_results = mjolnir.training.xgboost.tune(
-        df_hits_with_features, target_node_evaluations=None)
+    for wiki in wikis:
+        print 'Training wiki: %s' % (wiki)
+        df_hits_with_features = (
+            sqlContext.read.parquet(input_dir)
+            .where(F.col('wikiid') == wiki))
 
-    # Save the tune results somewhere for later analysis. Use pickle
-    # to maintain the hyperopt.Trials objects as is.
-    # TODO: Path should be CLI argument
-    with open('/home/ebernhardson/xgboost_training.pickle', 'w') as f:
-        f.write(pickle.dumps(tune_results))
+        data_size = df_hits_with_features.count()
+        if data_size == 0:
+            print 'No data found.' % (wiki)
+            print ''
+            continue
 
-    # Train a model over all data with best params
-    best_params = tune_results['params']
-    df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
-        df_hits_with_features, 10)
-    best_params['groupData'] = j_groups
-    model = mjolnir.training.xgboost.train(df_grouped, best_params)
+        # Make a guess at the number of fold partitions needed based on data 
size.
+        # This requires there to be around 40k data points, arbitrarily chosen,
+        # per partition used to calculate the folds, up to 100 partitions.
+        num_fold_partitions = min(100, max(1, data_size / 40000))
 
-    print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
+        # Explore a hyperparameter space. Skip the most expensive part of 
tuning,
+        # increasing the # of trees, with target_node_evaluations=None
+        tune_results = mjolnir.training.xgboost.tune(
+            df_hits_with_features, num_folds=num_folds, 
num_fold_partitions=num_fold_partitions,
+            num_cv_jobs=num_cv_jobs, num_workers=num_workers,
+            target_node_evaluations=target_node_evaluations)
 
-    # Generate a feature map so xgboost can include feature names in the dump.
-    # The final `q` indicates all features are quantitative values (floats).
-    features = df_hits_with_features.schema['features'].metadata['features']
-    feat_map = ["%d %s q" % (i, fname) for i, fname in enumerate(features)]
-    # TODO: this path should be CLI argument as well
-    with open('/home/ebernhardson/xgboost_model.json', 'wb') as f:
-        f.write(model.dump("\n".join(feat_map)))
+        # Save the tune results somewhere for later analysis. Use pickle
+        # to maintain the hyperopt.Trials objects as is.
+        tune_output = os.path.join(output_dir, 'tune_%s.pickle' % (wiki))
+        with open(tune_output, 'w') as f:
+            f.write(pickle.dumps(tune_results))
+            print 'Wrote tuning results to %s' % (tune_output)
+
+        print 'CV  test-ndcg@10: %.4f' % (tune_results['metrics']['test'])
+        print 'CV train-ndcg@10: %.4f' % (tune_results['metrics']['train'])
+
+        # Train a model over all data with best params
+        best_params = tune_results['params']
+        print 'Best parameters:'
+        for param, value in best_params.items():
+            print '\t%20s: %s' % (param, value)
+        df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
+            df_hits_with_features, 10)
+        best_params['groupData'] = j_groups
+        model = mjolnir.training.xgboost.train(df_grouped, best_params)
+
+        print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
+
+        # Generate a feature map so xgboost can include feature names in the 
dump.
+        # The final `q` indicates all features are quantitative values 
(floats).
+        features = 
df_hits_with_features.schema['features'].metadata['features']
+        feat_map = ["%d %s q" % (i, fname) for i, fname in enumerate(features)]
+        model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
+        with open(model_output, 'wb') as f:
+            f.write(model.dump("\n".join(feat_map)))
+            print 'Wrote xgboost json model to %s' % (model_output)
+        print ''
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser(description='Train XGBoost ranking 
models')
+    parser.add_argument(
+        '-i', '--input', dest='input_dir', type=str, required=True,
+        help='Input path, prefixed with hdfs://, to dataframe with labels and 
features')
+    parser.add_argument(
+        '-o', '--output', dest='output_dir', type=str, required=True,
+        help='Path, on local filesystem, to directory to store the results of '
+             'model training to.')
+    parser.add_argument(
+        '-w', '--workers', dest='num_workers', default=10, type=int,
+        help='Number of workers to train each individual model with. The total 
number '
+             + 'of executors required is workers * cv-jobs. (Default: 10)')
+    parser.add_argument(
+        '-c', '--cv-jobs', dest='num_cv_jobs', default=None, type=int,
+        help='Number of cross validations to perform in parallel. Defaults to 
number '
+             + 'of folds, to run all in parallel.')
+    parser.add_argument(
+        '-f', '--folds', dest='num_folds', default=5, type=int,
+        help='Number of cross validation folds to use. (Default: 5)')
+    parser.add_argument(
+        '-n', '--node-evaluations', dest='target_node_evaluations', type=int, 
default=None,
+        help='Approximate number of node evaluations per predication that '
+             + 'the final result will require. This controls the number of '
+             + 'trees used in the final result. Default uses 100 trees rather '
+             + 'than dynamically choosing based on max_depth. (Default: None)')
+    parser.add_argument(
+        'wikis', metavar='wiki', type=str, nargs='+',
+        help='A wiki to perform model training for.')
+
+    args = parser.parse_args()
+    if args.num_cv_jobs is None:
+        args.num_cv_jobs = args.num_folds
+    return dict(vars(args))
 
 
 if __name__ == "__main__":
+    args = parse_arguments()
     # TODO: Set spark configuration? Some can't actually be set here though, 
so best might be to set all of it
     # on the command line for consistency.
     sc = SparkContext(appName="MLR: training pipeline")
+    sc.setLogLevel('WARN')
     sqlContext = HiveContext(sc)
-    main(sc, sqlContext)
+    main(sc, sqlContext, **args)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index ccec41a..53c7859 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -517,16 +517,22 @@
     # much time as all previous steps combined, and then some, so it can be 
disabled
     # with target_node_evalations of None.
     if target_node_evaluations is None:
-        trials_final = None
+        trials_trees = None
+        trials_final = trials_noise
     else:
         space['num_rounds'] = target_node_evaluations / space['max_depth']
         # TODO: Is 15 steps right amount? too many? too few? This generally
         # uses a large number of trees which takes 10 to 20 minutes per 
evaluation.
         # That means evaluating 15 points is 2.5 to 5 hours.
         space['eta'] = hyperopt.hp.choice('eta', np.linspace(0.01, 0.3, 15))
-        best_final, trials_final = eval_space_grid(space)
-        space['eta'] = _estimate_best_eta(trials_final)
+        best_trees, trials_trees = eval_space_grid(space)
+        trials_final = trials_trees
+        space['eta'] = _estimate_best_eta(trials_trees)
         pprint.pprint(space)
+
+    best_trial = np.argmin(trials_final.losses())
+    loss = trials_final.losses()[best_trial]
+    true_loss = trials_final.results[best_trial].get('true_loss')
 
     return {
         'trials': {
@@ -534,7 +540,11 @@
             'complexity': trials_complexity,
             'gamma': trials_gamma,
             'noise': trials_noise,
-            'final': trials_final,
+            'trees': trials_trees,
         },
         'params': space,
+        'metrics': {
+            'test': -loss,
+            'train': -loss + true_loss
+        }
     }

-- 
To view, visit https://gerrit.wikimedia.org/r/357321
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8915b764cda9bf9996bc76aacff3743ae5c84be2
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to