EBernhardson has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/357321 )
Change subject: Parameterize CLI scripts
......................................................................
Parameterize CLI scripts
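Adds command-line parameters to the data and training pipeline scripts,
replacing hard-coded input/output paths, wiki lists and search cluster
URLs, to simplify running them for different circumstances. Training
now runs separately per wiki, and xgboost tuning reports cross-validated
train/test ndcg@10 metrics.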
Change-Id: I8915b764cda9bf9996bc76aacff3743ae5c84be2
---
M mjolnir/cli/data_pipeline.py
M mjolnir/cli/training_pipeline.py
M mjolnir/training/xgboost.py
3 files changed, 153 insertions(+), 48 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/21/357321/1
diff --git a/mjolnir/cli/data_pipeline.py b/mjolnir/cli/data_pipeline.py
index 7508c0c..f3cd29a 100644
--- a/mjolnir/cli/data_pipeline.py
+++ b/mjolnir/cli/data_pipeline.py
@@ -10,6 +10,7 @@
mjolnir/cli/data_pipeline.py
"""
+import argparse
import mjolnir.dbn
import mjolnir.metrics
import mjolnir.norm_query
@@ -19,18 +20,21 @@
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
+SEARCH_CLUSTERS = {
+ 'eqiad': ['http://elastic%d.eqiad.wmnet:9200/_msearch' % (i) for i in
range(1017, 1052)],
+ 'codfw': ['http://elastic%d.codfw.wmnet:9200/_msearch' % (i) for i in
range(2001, 2035)],
+}
-def main(sc, sqlContext):
+
+def main(sc, sqlContext, input_dir, output_dir, wikis, queries_per_wiki,
+ min_sessions_per_query, search_cluster):
+ # TODO: Should this jar have to be provided on the command line instead?
sqlContext.sql("ADD JAR
/mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
- # TODO: Should be CLI option
- wikis = ['enwiki', 'dewiki', 'ruwiki', 'frwiki']
-
# Load click data from HDFS
df_clicks = (
- sqlContext.read.parquet(
-
'hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*')
+ sqlContext.read.parquet(input_dir)
# Limit to the wikis we are working against
.where(mjolnir.sampling._array_contains(F.array(map(F.lit, wikis)),
F.col('wikiid')))
# Clicks and hits contains a bunch of useful debugging data, but we
don't
@@ -52,8 +56,8 @@
df_norm,
wikis=wikis,
seed=54321,
- queries_per_wiki=20000,
- min_sessions_per_query=10)
+ queries_per_wiki=queries_per_wiki,
+ min_sessions_per_query=min_sessions_per_query)
# Explode source into a row per displayed hit
.select('*', F.expr("posexplode(hit_page_ids)").alias('hit_position',
'hit_page_id'))
.drop('hit_page_ids')
@@ -118,8 +122,9 @@
# uses query and NOT norm_query_id. Merge those back into the source hits.
df_features = mjolnir.features.collect(
df_hits,
- # TODO: Should be a CLI option of some sort
- url_list=['http://elastic%d.codfw.wmnet:9200/_msearch' % (i) for i in
range(2001, 2035)],
+ url_list=SEARCH_CLUSTERS[search_cluster],
+ # TODO: While this works for now, at some point we might want to handle
+ # things like multimedia search from commons, and non-main namespace
searches.
indices={wiki: '%s_content' % (wiki) for wiki in wikis},
# TODO: If we are going to do multiple wikis, this probably needs
different features
# per wiki? At a minimum trying to include useful templates as
features will need
@@ -128,13 +133,40 @@
feature_definitions=mjolnir.features.enwiki_features())
df_hits_with_features = df_hits.join(df_features, how='inner',
on=['wikiid', 'query', 'hit_page_id'])
-
df_hits_with_features.write.parquet('hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features')
+ df_hits_with_features.write.parquet(output_dir)
+
+
+def parse_arguments():
+ parser = argparse.ArgumentParser(description='...')
+ parser.add_argument(
+ '-i', '--input', dest='input_dir', type=str,
+
default='hdfs://analytics-hadoop/wmf/data/discovery/query_clicks/daily/year=*/month=*/day=*',
+ help='Input path, prefixed with hdfs://, to query and click data')
+ parser.add_argument(
+ '-q', '--queries-per-wiki', dest='queries_per_wiki', type=int,
default=20000,
+ help='The number of normalized queries, per wiki, to operate on')
+ parser.add_argument(
+ '-s', '--min-sessions', dest='min_sessions_per_query', type=int,
default=10,
+ help='The minimum number of sessions per normalized query')
+ parser.add_argument(
+ '-c', '--search-cluster', dest='search_cluster', type=str,
default='codfw',
+ choices=SEARCH_CLUSTERS.keys(), help='Search cluster to source
features from')
+ parser.add_argument(
+ '-o', '--output-dir', dest='output_dir', type=str, required=True,
+ help='Output path, prefixed with hdfs://, to write resulting dataframe
to')
+ parser.add_argument(
+ 'wikis', metavar='wiki', type=str, nargs='+',
+ help='A wiki to generate features and labels for')
+
+ args = parser.parse_args()
+ return dict(vars(args))
if __name__ == "__main__":
+ args = parse_arguments()
sc = SparkContext(appName="MLR: data collection pipeline")
# spark info logging is incredibly spammy. Use warn to have some hope of
# human decipherable output
sc.setLogLevel('WARN')
sqlContext = HiveContext(sc)
- main(sc, sqlContext)
+ main(sc, sqlContext, **args)
diff --git a/mjolnir/cli/training_pipeline.py b/mjolnir/cli/training_pipeline.py
index 1edf255..c23a00f 100644
--- a/mjolnir/cli/training_pipeline.py
+++ b/mjolnir/cli/training_pipeline.py
@@ -9,53 +9,116 @@
path/to/training_pipeline.py
"""
-import mjolnir.dbn
-import mjolnir.sampling
-import mjolnir.features
-import mjolnir.training.tuning
+import argparse
import mjolnir.training.xgboost
+import os
import pickle
from pyspark import SparkContext
from pyspark.sql import HiveContext
+from pyspark.sql import functions as F
-def main(sc, sqlContext):
- # TODO: cli argument
- in_path =
'hdfs://analytics-hadoop/user/ebernhardson/mjolnir/hits_with_features'
- df_hits_with_features = sqlContext.read.parquet(in_path)
+def main(sc, sqlContext, input_dir, output_dir, wikis, target_node_evaluations,
+ num_workers, num_cv_jobs, num_folds):
- # Explore a hyperparameter space. Skip the most expensive part of tuning,
- # increasing the # of trees, with target_node_evaluations=None
- tune_results = mjolnir.training.xgboost.tune(
- df_hits_with_features, target_node_evaluations=None)
+ for wiki in wikis:
+ print 'Training wiki: %s' % (wiki)
+ df_hits_with_features = (
+ sqlContext.read.parquet(input_dir)
+ .where(F.col('wikiid') == wiki))
- # Save the tune results somewhere for later analysis. Use pickle
- # to maintain the hyperopt.Trials objects as is.
- # TODO: Path should be CLI argument
- with open('/home/ebernhardson/xgboost_training.pickle', 'w') as f:
- f.write(pickle.dumps(tune_results))
+ data_size = df_hits_with_features.count()
+ if data_size == 0:
+ print 'No data found.' % (wiki)
+ print ''
+ continue
- # Train a model over all data with best params
- best_params = tune_results['params']
- df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
- df_hits_with_features, 10)
- best_params['groupData'] = j_groups
- model = mjolnir.training.xgboost.train(df_grouped, best_params)
+ # Make a guess at the number of fold partitions needed based on data
size.
+ # This requires there to be around 40k data points, arbitrarily chosen,
+ # per partition used to calculate the folds, up to 100 partitions.
+ num_fold_partitions = min(100, max(1, data_size / 40000))
- print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
+ # Explore a hyperparameter space. Skip the most expensive part of
tuning,
+ # increasing the # of trees, with target_node_evaluations=None
+ tune_results = mjolnir.training.xgboost.tune(
+ df_hits_with_features, num_folds=num_folds,
num_fold_partitions=num_fold_partitions,
+ num_cv_jobs=num_cv_jobs, num_workers=num_workers,
+ target_node_evaluations=target_node_evaluations)
- # Generate a feature map so xgboost can include feature names in the dump.
- # The final `q` indicates all features are quantitative values (floats).
- features = df_hits_with_features.schema['features'].metadata['features']
- feat_map = ["%d %s q" % (i, fname) for i, fname in enumerate(features)]
- # TODO: this path should be CLI argument as well
- with open('/home/ebernhardson/xgboost_model.json', 'wb') as f:
- f.write(model.dump("\n".join(feat_map)))
+ # Save the tune results somewhere for later analysis. Use pickle
+ # to maintain the hyperopt.Trials objects as is.
+ tune_output = os.path.join(output_dir, 'tune_%s.pickle' % (wiki))
+ with open(tune_output, 'w') as f:
+ f.write(pickle.dumps(tune_results))
+ print 'Wrote tuning results to %s' % (tune_output)
+
+ print 'CV test-ndcg@10: %.4f' % (tune_results['metrics']['test'])
+ print 'CV train-ndcg@10: %.4f' % (tune_results['metrics']['train'])
+
+ # Train a model over all data with best params
+ best_params = tune_results['params']
+ print 'Best parameters:'
+ for param, value in best_params.items():
+ print '\t%20s: %s' % (param, value)
+ df_grouped, j_groups = mjolnir.training.xgboost.prep_training(
+ df_hits_with_features, 10)
+ best_params['groupData'] = j_groups
+ model = mjolnir.training.xgboost.train(df_grouped, best_params)
+
+ print 'train-ndcg@10: %.3f' % (model.eval(df_grouped, j_groups))
+
+ # Generate a feature map so xgboost can include feature names in the
dump.
+ # The final `q` indicates all features are quantitative values
(floats).
+ features =
df_hits_with_features.schema['features'].metadata['features']
+ feat_map = ["%d %s q" % (i, fname) for i, fname in enumerate(features)]
+ model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
+ with open(model_output, 'wb') as f:
+ f.write(model.dump("\n".join(feat_map)))
+ print 'Wrote xgboost json model to %s' % (model_output)
+ print ''
+
+
+def parse_arguments():
+ parser = argparse.ArgumentParser(description='Train XGBoost ranking
models')
+ parser.add_argument(
+ '-i', '--input', dest='input_dir', type=str, required=True,
+ help='Input path, prefixed with hdfs://, to dataframe with labels and
features')
+ parser.add_argument(
+ '-o', '--output', dest='output_dir', type=str, required=True,
+ help='Path, on local filesystem, to directory to store the results of '
+ 'model training to.')
+ parser.add_argument(
+ '-w', '--workers', dest='num_workers', default=10, type=int,
+ help='Number of workers to train each individual model with. The total
number '
+ + 'of executors required is workers * cv-jobs. (Default: 10)')
+ parser.add_argument(
+ '-c', '--cv-jobs', dest='num_cv_jobs', default=None, type=int,
+ help='Number of cross validations to perform in parallel. Defaults to
number '
+ + 'of folds, to run all in parallel.')
+ parser.add_argument(
+ '-f', '--folds', dest='num_folds', default=5, type=int,
+ help='Number of cross validation folds to use. (Default: 5)')
+ parser.add_argument(
+ '-n', '--node-evaluations', dest='target_node_evaluations', type=int,
default=None,
+ help='Approximate number of node evaluations per predication that '
+ + 'the final result will require. This controls the number of '
+ + 'trees used in the final result. Default uses 100 trees rather '
+ + 'than dynamically choosing based on max_depth. (Default: None)')
+ parser.add_argument(
+ 'wikis', metavar='wiki', type=str, nargs='+',
+ help='A wiki to perform model training for.')
+
+ args = parser.parse_args()
+ if args.num_cv_jobs is None:
+ args.num_cv_jobs = args.num_folds
+ return dict(vars(args))
if __name__ == "__main__":
+ args = parse_arguments()
# TODO: Set spark configuration? Some can't actually be set here though,
so best might be to set all of it
# on the command line for consistency.
sc = SparkContext(appName="MLR: training pipeline")
+ sc.setLogLevel('WARN')
sqlContext = HiveContext(sc)
- main(sc, sqlContext)
+ main(sc, sqlContext, **args)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index ccec41a..53c7859 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -517,16 +517,22 @@
# much time as all previous steps combined, and then some, so it can be
disabled
# with target_node_evalations of None.
if target_node_evaluations is None:
- trials_final = None
+ trials_trees = None
+ trials_final = trials_noise
else:
space['num_rounds'] = target_node_evaluations / space['max_depth']
# TODO: Is 15 steps right amount? too many? too few? This generally
# uses a large number of trees which takes 10 to 20 minutes per
evaluation.
# That means evaluating 15 points is 2.5 to 5 hours.
space['eta'] = hyperopt.hp.choice('eta', np.linspace(0.01, 0.3, 15))
- best_final, trials_final = eval_space_grid(space)
- space['eta'] = _estimate_best_eta(trials_final)
+ best_trees, trials_trees = eval_space_grid(space)
+ trials_final = trials_trees
+ space['eta'] = _estimate_best_eta(trials_trees)
pprint.pprint(space)
+
+ best_trial = np.argmin(trials_final.losses())
+ loss = trials_final.losses()[best_trial]
+ true_loss = trials_final.results[best_trial].get('true_loss')
return {
'trials': {
@@ -534,7 +540,11 @@
'complexity': trials_complexity,
'gamma': trials_gamma,
'noise': trials_noise,
- 'final': trials_final,
+ 'trees': trials_trees,
},
'params': space,
+ 'metrics': {
+ 'test': -loss,
+ 'train': -loss + true_loss
+ }
}
--
To view, visit https://gerrit.wikimedia.org/r/357321
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8915b764cda9bf9996bc76aacff3743ae5c84be2
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits