EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/406070 )
Change subject: Add end-to-end integration test ...................................................................... Add end-to-end integration test A basic end to end run through of the training pipeline. It's of course a bit slow, but worthwhile to see the whole operation run from end to end. * verifies that the general premise works * outputs from one stage are expected by the input from the next * models are in expected places and loadable * evaluations run against the models match train time metrics Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390 --- M jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala M mjolnir/test/conftest.py A mjolnir/test/fixtures/requests/test_integration.sqlite3 M mjolnir/test/training/test_xgboost.py M mjolnir/training/xgboost.py M mjolnir/utilities/data_pipeline.py M mjolnir/utilities/make_folds.py M mjolnir/utilities/training_pipeline.py 8 files changed, 63 insertions(+), 20 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/70/406070/1 diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala index 8c6929e..9200fe9 100644 --- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala +++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala @@ -1,7 +1,7 @@ package org.wikimedia.search.mjolnir import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint} -import org.apache.spark.ml.linalg.{Vector => MLVector} +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vectors, Vector => MLVector} import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Dataset, Row} @@ -13,22 +13,38 @@ * pyspark. 
*/ object PythonUtils { + private def shiftVector(vec: MLVector): MLVector = vec match { + case y: DenseVector => Vectors.dense(Array(0D) ++ y.toArray) + case y: SparseVector => Vectors.sparse(y.size + 1, y.indices.map(_ + 1), y.values) + } + /** * There is no access to LabeledPoint from pyspark, but various methods such as * trainWithRDD and eval require an RDD[MLLabeledPoint]. This offers a bridge to * convert a Dataset into the required format. * + * @deprecated * @param ds Input dataframe containing features and label * @param featureCol Name of the column containing feature vectors * @param labelCol Name of the column containing numeric labels + * @param shiftRight Shift all features to index + 1. This is a disappointing hack, + * but due to the way data files are created feature indices start + * at 1 and the 0 feature is empty. This allows to shift to match + * when evaluating a dataframe against a model trained that way. */ - def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = { + def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String, shiftRight: Boolean): RDD[MLLabeledPoint] = { ds.select(col(featureCol), col(labelCol).cast(DoubleType)).rdd.map { case Row(feature: MLVector, label: Double) => + val shiftedFeature = if (shiftRight) shiftVector(feature) else feature MLLabeledPoint(label, feature) } } + def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = { + toLabeledPoints(ds, featureCol, labelCol, shiftRight = false) + } + + /** * Training/evaluating a ranking model in XGBoost requires rows for the same * query to be provided sequentially, and it needs to know for each partition diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py index efc8441..c4c3d77 100644 --- a/mjolnir/test/conftest.py +++ b/mjolnir/test/conftest.py @@ -72,7 +72,8 @@ .set('spark.jars.packages', ','.join([ 'ml.dmlc:xgboost4j-spark:0.8-wmf-1', 
'org.wikimedia.search:mjolnir:0.4-SNAPSHOT', - 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0'])) + 'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0', + 'org.wikimedia.analytics.refinery.hive:refinery-hive:0.0.57'])) # By default spark will shuffle to 200 partitions, which is # way too many for our small test cases. This cuts execution # time of the tests in half. diff --git a/mjolnir/test/fixtures/requests/test_integration.sqlite3 b/mjolnir/test/fixtures/requests/test_integration.sqlite3 new file mode 100644 index 0000000..44957bf --- /dev/null +++ b/mjolnir/test/fixtures/requests/test_integration.sqlite3 Binary files differ diff --git a/mjolnir/test/training/test_xgboost.py b/mjolnir/test/training/test_xgboost.py index 100a3e8..ba8dc48 100644 --- a/mjolnir/test/training/test_xgboost.py +++ b/mjolnir/test/training/test_xgboost.py @@ -1,7 +1,6 @@ from __future__ import absolute_import import mjolnir.training.xgboost from pyspark.ml.linalg import Vectors -import pyspark.sql import pytest @@ -107,7 +106,7 @@ # What else can we practically assert? df_transformed = model.transform(df_train) assert 'prediction' in df_transformed.columns - assert 0.74 == pytest.approx(model.eval(df_train), abs=0.01) + assert 0.59 == pytest.approx(model.eval(df_train), abs=0.01) # make sure train didn't clobber the incoming params assert params['num_rounds'] == 1 diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py index 4d6bb9d..82c9d6c 100644 --- a/mjolnir/training/xgboost.py +++ b/mjolnir/training/xgboost.py @@ -59,7 +59,10 @@ # the pipeline so various tasks can accept a single column to work with. df.select('label', 'features', F.concat('wikiid', 'query').alias('queryId')) .repartition(num_partitions, 'queryId') - .sortWithinPartitions('queryId') + # xgboost ndcg isn't stable if labels of matching predictions are in + # different input orders so sort by label too. 
Sorting labels + # ascending is the less generous option, where the worst label comes first. + .sortWithinPartitions('queryId', F.col('label').asc()) .cache()) j_groups = df._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.calcQueryGroups( @@ -292,7 +295,7 @@ df_grouped = df_test j_rdd = df_test._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.toLabeledPoints( - df_grouped._jdf, feature_col, label_col) + df_grouped._jdf, feature_col, label_col, True) score = self._j_xgb_model.eval(j_rdd, 'test', None, 0, False, j_groups) return float(score.split('=')[1].strip()) @@ -327,7 +330,7 @@ } -def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500): +def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500, iterations=150): """Find appropriate hyperparameters for training df This is far from perfect, hyperparameter tuning is a bit of a black art @@ -376,13 +379,15 @@ dataset_size = 'large' elif num_obs > 500000: dataset_size = 'med' - else: + elif num_obs > 500: dataset_size = 'small' + else: + dataset_size = 'xsmall' # Setup different tuning profiles for different sizes of datasets. tune_spaces = [ ('initial', { - 'iterations': 150, + 'iterations': iterations, 'space': { 'xlarge': { 'eta': hyperopt.hp.uniform('eta', 0.3, 0.8), @@ -393,6 +398,7 @@ 'min_child_weight', np.log(10), np.log(500), 10), # % of features to use for each tree. 
helps prevent overfit 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01), + 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01), }, 'large': { 'eta': hyperopt.hp.uniform('eta', 0.3, 0.6), @@ -400,6 +406,7 @@ 'min_child_weight': hyperopt.hp.qloguniform( 'min_child_weight', np.log(10), np.log(300), 10), 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01), + 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01), }, 'med': { 'eta': hyperopt.hp.uniform('eta', 0.1, 0.6), @@ -407,6 +414,7 @@ 'min_child_weight': hyperopt.hp.qloguniform( 'min_child_weight', np.log(10), np.log(300), 10), 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01), + 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01), }, 'small': { 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4), @@ -414,6 +422,15 @@ 'min_child_weight': hyperopt.hp.qloguniform( 'min_child_weight', np.log(10), np.log(100), 10), 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01), + 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01), + }, + 'xsmall': { + 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4), + 'max_depth': hyperopt.hp.quniform('max_depth', 3, 6, 1), + # Never use for real data, but convenient for tiny sets in test suite + 'min_child_weight': 0, + 'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01), + 'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01), } }[dataset_size] }) @@ -442,6 +459,7 @@ 'large': 6, 'med': 5, 'small': 4, + 'xsmall': 3, }[dataset_size], 'gamma': 0, 'subsample': 1.0, diff --git a/mjolnir/utilities/data_pipeline.py b/mjolnir/utilities/data_pipeline.py index ae63de3..b1bc93d 100644 --- a/mjolnir/utilities/data_pipeline.py +++ b/mjolnir/utilities/data_pipeline.py @@ -22,18 +22,18 @@ from pyspark import SparkContext from pyspark.sql import HiveContext from pyspark.sql import functions as F +import requests SEARCH_CLUSTERS = { 'eqiad': 
['http://elastic%d.eqiad.wmnet:9200' % (i) for i in range(1017, 1052)], 'codfw': ['http://elastic%d.codfw.wmnet:9200' % (i) for i in range(2001, 2035)], + 'localhost': ['http://localhost:9200'], } def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki, min_sessions_per_query, search_cluster, brokers, ltr_feature_definitions, - samples_size_tolerance): - # TODO: Should this jar have to be provided on the command line instead? - sqlContext.sql("ADD JAR /mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar") + samples_size_tolerance, session_factory=requests.Session): sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 'org.wikimedia.analytics.refinery.hive.StemmerUDF'") # Load click data from HDFS @@ -61,7 +61,8 @@ # TODO: While this works for now, at some point we might want to handle # things like multimedia search from commons, and non-main namespace searches. indices={wiki: '%s_content' % (wiki) for wiki in wikis}, - min_sessions_per_query=min_sessions_per_query) + min_sessions_per_query=min_sessions_per_query, + session_factory=session_factory) # Sample to some subset of queries per wiki hit_page_id_counts, df_sampled_raw = mjolnir.sampling.sample( @@ -90,7 +91,7 @@ df_rel = ( mjolnir.dbn.train(df_sampled, num_partitions=dbn_partitions, dbn_config={ 'MAX_ITERATIONS': 40, - 'MIN_DOCS_PER_QUERY': 10, + 'MIN_DOCS_PER_QUERY': 5, 'MAX_DOCS_PER_QUERY': 20, 'SERP_SIZE': 20, 'DEFAULT_REL': 0.5}) @@ -163,7 +164,8 @@ # things like multimedia search from commons, and non-main namespace searches. indices={wiki: '%s_content' % (wiki) for wiki in wikis}, model=ltr_feature_definitions, - feature_names_accu=fnames_accu) + feature_names_accu=fnames_accu, + session_factory=session_factory) else: if brokers: df_features = mjolnir.features.collect_kafka( @@ -184,7 +186,7 @@ # to vary per-wiki. Varied features per wiki would also mean they can't be trained # together, which is perhaps a good thing anyways. 
feature_definitions=mjolnir.features.enwiki_features(), - feature_names_accu=fnames_accu) + feature_names_accu=fnames_accu, session_factory=session_factory) # collect the accumulator df_features.cache().count() diff --git a/mjolnir/utilities/make_folds.py b/mjolnir/utilities/make_folds.py index 5cbd682..dde25e5 100644 --- a/mjolnir/utilities/make_folds.py +++ b/mjolnir/utilities/make_folds.py @@ -131,7 +131,7 @@ df_fold = ( mjolnir.training.tuning.group_k_fold(df, num_folds) .repartition(200, 'wikiid', 'query') - .sortWithinPartitions('wikiid', 'query')) + .sortWithinPartitions('wikiid', 'query', 'label')) try: df_fold.cache() diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py index 37d4473..55072a2 100644 --- a/mjolnir/utilities/training_pipeline.py +++ b/mjolnir/utilities/training_pipeline.py @@ -24,7 +24,10 @@ import sys -def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees, num_cv_jobs): +def run_pipeline( + sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, + final_num_trees, num_cv_jobs, iterations +): with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f: stats = json.loads(f.read()) @@ -68,7 +71,8 @@ num_cv_jobs=num_cv_jobs, train_matrix="train", initial_num_trees=initial_num_trees, - final_num_trees=final_num_trees) + final_num_trees=final_num_trees, + iterations=iterations) print 'CV test-ndcg@10: %.4f' % (tune_results['metrics']['cv-test']) print 'CV train-ndcg@10: %.4f' % (tune_results['metrics']['cv-train']) @@ -143,6 +147,9 @@ help='Number of trees in the final ensemble. If not provided the value from ' + '--initial-trees will be used. 
(Default: None)') parser.add_argument( + '-t', '--iterations', dest='iterations', default=150, type=int, + help='The number of hyperparameter tuning iterations to perform') + parser.add_argument( '-v', '--verbose', dest='verbose', default=False, action='store_true', help='Increase logging to INFO') parser.add_argument( -- To view, visit https://gerrit.wikimedia.org/r/406070 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390 Gerrit-PatchSet: 1 Gerrit-Project: search/MjoLniR Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits