EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/406070 )

Change subject: Add end-to-end integration test
......................................................................

Add end-to-end integration test

A basic end-to-end run-through of the training pipeline. It's of
course a bit slow, but worthwhile to see the whole operation run
from start to finish.

* verifies that the general premise works
* outputs from one stage match the inputs expected by the next
* models end up in the expected places and are loadable
* evaluations run against the saved models match train-time metrics

Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390
---
M jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
M mjolnir/test/conftest.py
A mjolnir/test/fixtures/requests/test_integration.sqlite3
M mjolnir/test/training/test_xgboost.py
M mjolnir/training/xgboost.py
M mjolnir/utilities/data_pipeline.py
M mjolnir/utilities/make_folds.py
M mjolnir/utilities/training_pipeline.py
8 files changed, 63 insertions(+), 20 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR refs/changes/70/406070/1

diff --git a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
index 8c6929e..9200fe9 100644
--- a/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
+++ b/jvm/src/main/scala/org/wikimedia/search/mjolnir/PythonUtils.scala
@@ -1,7 +1,7 @@
 package org.wikimedia.search.mjolnir
 
 import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.{Vector => MLVector}
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vectors, Vector => MLVector}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, Row}
@@ -13,22 +13,38 @@
   * pyspark.
   */
 object PythonUtils {
+  private def shiftVector(vec: MLVector): MLVector = vec match {
+    case y: DenseVector => Vectors.dense(Array(0D) ++ y.toArray)
+    case y: SparseVector => Vectors.sparse(y.size + 1, y.indices.map(_ + 1), y.values)
+  }
+
   /**
    * There is no access to LabeledPoint from pyspark, but various methods such as
    * trainWithRDD and eval require an RDD[MLLabeledPoint]. This offers a bridge to
    * convert a Dataset into the required format.
    *
+   * @deprecated
    * @param ds Input dataframe containing features and label
    * @param featureCol Name of the column containing feature vectors
    * @param labelCol Name of the column containing numeric labels
+   * @param shiftRight Shift all features to index + 1. This is a disappointing hack,
+   *                  but due to the way the data files are created, feature indices start
+   *                  at 1 and the 0 feature is empty. This allows shifting to match
+   *                  when evaluating a dataframe against a model trained that way.
    */
-  def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = {
+  def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String, shiftRight: Boolean): RDD[MLLabeledPoint] = {
     ds.select(col(featureCol), col(labelCol).cast(DoubleType)).rdd.map {
       case Row(feature: MLVector, label: Double) =>
+        val shiftedFeature = if (shiftRight) shiftVector(feature) else feature
-        MLLabeledPoint(label, feature)
+        MLLabeledPoint(label, shiftedFeature)
     }
   }
 
+  def toLabeledPoints(ds: Dataset[_], featureCol: String, labelCol: String): RDD[MLLabeledPoint] = {
+    toLabeledPoints(ds, featureCol, labelCol, shiftRight = false)
+  }
+
+
   /**
    * Training/evaluating a ranking model in XGBoost requires rows for the same
    * query to be provided sequentially, and it needs to know for each partition
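
For reference, the index shift that the new shiftRight flag performs can be sketched in a few lines of Python with pyspark.ml.linalg (the vectors here are made-up examples, not MjoLniR code):

    from pyspark.ml.linalg import Vectors

    # Dense case: prepend a 0.0 so the first real feature lands at index 1.
    dense = Vectors.dense([0.3, 0.7])
    shifted_dense = Vectors.dense([0.0] + dense.toArray().tolist())
    # -> [0.0, 0.3, 0.7]

    # Sparse case: grow the size by one and bump every index by one.
    sparse = Vectors.sparse(2, [0, 1], [0.3, 0.7])
    shifted_sparse = Vectors.sparse(sparse.size + 1,
                                    [int(i) + 1 for i in sparse.indices],
                                    sparse.values.tolist())
    # -> (3, [1, 2], [0.3, 0.7])
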
diff --git a/mjolnir/test/conftest.py b/mjolnir/test/conftest.py
index efc8441..c4c3d77 100644
--- a/mjolnir/test/conftest.py
+++ b/mjolnir/test/conftest.py
@@ -72,7 +72,8 @@
         .set('spark.jars.packages', ','.join([
             'ml.dmlc:xgboost4j-spark:0.8-wmf-1',
             'org.wikimedia.search:mjolnir:0.4-SNAPSHOT',
-            'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0']))
+            'org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0',
+            'org.wikimedia.analytics.refinery.hive:refinery-hive:0.0.57']))
         # By default spark will shuffle to 200 partitions, which is
         # way too many for our small test cases. This cuts execution
         # time of the tests in half.
diff --git a/mjolnir/test/fixtures/requests/test_integration.sqlite3 b/mjolnir/test/fixtures/requests/test_integration.sqlite3
new file mode 100644
index 0000000..44957bf
--- /dev/null
+++ b/mjolnir/test/fixtures/requests/test_integration.sqlite3
Binary files differ
diff --git a/mjolnir/test/training/test_xgboost.py b/mjolnir/test/training/test_xgboost.py
index 100a3e8..ba8dc48 100644
--- a/mjolnir/test/training/test_xgboost.py
+++ b/mjolnir/test/training/test_xgboost.py
@@ -1,7 +1,6 @@
 from __future__ import absolute_import
 import mjolnir.training.xgboost
 from pyspark.ml.linalg import Vectors
-import pyspark.sql
 import pytest
 
 
@@ -107,7 +106,7 @@
     # What else can we practically assert?
     df_transformed = model.transform(df_train)
     assert 'prediction' in df_transformed.columns
-    assert 0.74 == pytest.approx(model.eval(df_train), abs=0.01)
+    assert 0.59 == pytest.approx(model.eval(df_train), abs=0.01)
 
     # make sure train didn't clobber the incoming params
     assert params['num_rounds'] == 1
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 4d6bb9d..82c9d6c 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -59,7 +59,10 @@
         # the pipeline so various tasks can accept a single column to work with.
         df.select('label', 'features', F.concat('wikiid', 'query').alias('queryId'))
         .repartition(num_partitions, 'queryId')
-        .sortWithinPartitions('queryId')
+        # xgboost's ndcg isn't stable when labels of matching predictions arrive
+        # in different input orders, so sort by label too. Sorting labels
+        # ascending is the less generous option, where the worst label comes first.
+        .sortWithinPartitions('queryId', F.col('label').asc())
         .cache())
 
     j_groups = df._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.calcQueryGroups(
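
The instability the comment above describes comes from ties: when two documents share a prediction, the gain the evaluator reports depends on which one it happens to see first. A toy DCG computation (plain Python, made-up labels, not MjoLniR code) shows why the ascending label sort is the pessimistic choice:

    import math

    def dcg(labels):
        # Gain of each label discounted by its 1-based rank.
        return sum((2 ** rel - 1) / math.log(rank + 2, 2)
                   for rank, rel in enumerate(labels))

    # Two documents with identical predictions but different labels.
    print(dcg([0, 3]))  # ~4.42, worst label first (ascending sort)
    print(dcg([3, 0]))  # 7.0, best label first
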
@@ -292,7 +295,7 @@
             df_grouped = df_test
 
         j_rdd = df_test._sc._jvm.org.wikimedia.search.mjolnir.PythonUtils.toLabeledPoints(
-            df_grouped._jdf, feature_col, label_col)
+            df_grouped._jdf, feature_col, label_col, True)
         score = self._j_xgb_model.eval(j_rdd, 'test', None, 0, False, j_groups)
         return float(score.split('=')[1].strip())
 
@@ -327,7 +330,7 @@
     }
 
 
-def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500):
+def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500, iterations=150):
     """Find appropriate hyperparameters for training df
 
     This is far from perfect, hyperparameter tuning is a bit of a black art
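
To make the role of the new iterations argument concrete, here is a standalone hyperopt sketch with a dummy objective (not MjoLniR's tune(); the space loosely mirrors the 'small' profile further down, and mapping iterations to max_evals is an assumption):

    from hyperopt import fmin, hp, tpe
    import numpy as np

    # Search space shaped like the 'small' profile defined below.
    space = {
        'eta': hp.uniform('eta', 0.1, 0.4),
        'max_depth': hp.quniform('max_depth', 3, 6, 1),
        'min_child_weight': hp.qloguniform(
            'min_child_weight', np.log(10), np.log(100), 10),
        'colsample_bytree': hp.quniform('colsample_bytree', 0.8, 1, .01),
    }

    def objective(params):
        # Stand-in for the real cross-validated train + eval; returns a loss.
        return (params['eta'] - 0.2) ** 2 + 0.01 * params['max_depth']

    # Each tuning iteration draws one point from the space and evaluates it.
    best = fmin(objective, space, algo=tpe.suggest, max_evals=15)
    print(best)
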
@@ -376,13 +379,15 @@
         dataset_size = 'large'
     elif num_obs > 500000:
         dataset_size = 'med'
-    else:
+    elif num_obs > 500:
         dataset_size = 'small'
+    else:
+        dataset_size = 'xsmall'
 
     # Setup different tuning profiles for different sizes of datasets.
     tune_spaces = [
         ('initial', {
-            'iterations': 150,
+            'iterations': iterations,
             'space': {
                 'xlarge': {
                     'eta': hyperopt.hp.uniform('eta', 0.3, 0.8),
@@ -393,6 +398,7 @@
                         'min_child_weight', np.log(10), np.log(500), 10),
                     # % of features to use for each tree. helps prevent overfit
                     'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+                    'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
                 },
                 'large': {
                     'eta': hyperopt.hp.uniform('eta', 0.3, 0.6),
@@ -400,6 +406,7 @@
                     'min_child_weight': hyperopt.hp.qloguniform(
                         'min_child_weight', np.log(10), np.log(300), 10),
                     'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+                    'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
                 },
                 'med': {
                     'eta': hyperopt.hp.uniform('eta', 0.1, 0.6),
@@ -407,6 +414,7 @@
                     'min_child_weight': hyperopt.hp.qloguniform(
                         'min_child_weight', np.log(10), np.log(300), 10),
                     'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+                    'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
                 },
                 'small': {
                     'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
@@ -414,6 +422,15 @@
                     'min_child_weight': hyperopt.hp.qloguniform(
                         'min_child_weight', np.log(10), np.log(100), 10),
                     'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+                    'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
+                },
+                'xsmall': {
+                    'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
+                    'max_depth': hyperopt.hp.quniform('max_depth', 3, 6, 1),
+                    # Never use for real data, but convenient for the tiny sets in the test suite
+                    'min_child_weight': 0,
+                    'colsample_bytree': hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+                    'subsample': hyperopt.hp.quniform('subsample', 0.8, 1, .01),
                 }
             }[dataset_size]
         })
@@ -442,6 +459,7 @@
             'large': 6,
             'med': 5,
             'small': 4,
+            'xsmall': 3,
         }[dataset_size],
         'gamma': 0,
         'subsample': 1.0,
diff --git a/mjolnir/utilities/data_pipeline.py b/mjolnir/utilities/data_pipeline.py
index ae63de3..b1bc93d 100644
--- a/mjolnir/utilities/data_pipeline.py
+++ b/mjolnir/utilities/data_pipeline.py
@@ -22,18 +22,18 @@
 from pyspark import SparkContext
 from pyspark.sql import HiveContext
 from pyspark.sql import functions as F
+import requests
 
 SEARCH_CLUSTERS = {
     'eqiad': ['http://elastic%d.eqiad.wmnet:9200' % (i) for i in range(1017, 1052)],
     'codfw': ['http://elastic%d.codfw.wmnet:9200' % (i) for i in range(2001, 2035)],
+    'localhost': ['http://localhost:9200'],
 }
 
 
 def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, samples_per_wiki,
                  min_sessions_per_query, search_cluster, brokers, ltr_feature_definitions,
-                 samples_size_tolerance):
-    # TODO: Should this jar have to be provided on the command line instead?
-    sqlContext.sql("ADD JAR /mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar")
+                 samples_size_tolerance, session_factory=requests.Session):
     sqlContext.sql("CREATE TEMPORARY FUNCTION stemmer AS 'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
 
     # Load click data from HDFS
@@ -61,7 +61,8 @@
         # TODO: While this works for now, at some point we might want to handle
         # things like multimedia search from commons, and non-main namespace searches.
         indices={wiki: '%s_content' % (wiki) for wiki in wikis},
-        min_sessions_per_query=min_sessions_per_query)
+        min_sessions_per_query=min_sessions_per_query,
+        session_factory=session_factory)
 
     # Sample to some subset of queries per wiki
     hit_page_id_counts, df_sampled_raw = mjolnir.sampling.sample(
@@ -90,7 +91,7 @@
     df_rel = (
         mjolnir.dbn.train(df_sampled, num_partitions=dbn_partitions, dbn_config={
             'MAX_ITERATIONS': 40,
-            'MIN_DOCS_PER_QUERY': 10,
+            'MIN_DOCS_PER_QUERY': 5,
             'MAX_DOCS_PER_QUERY': 20,
             'SERP_SIZE': 20,
             'DEFAULT_REL': 0.5})
@@ -163,7 +164,8 @@
                 # things like multimedia search from commons, and non-main namespace searches.
                 indices={wiki: '%s_content' % (wiki) for wiki in wikis},
                 model=ltr_feature_definitions,
-                feature_names_accu=fnames_accu)
+                feature_names_accu=fnames_accu,
+                session_factory=session_factory)
     else:
         if brokers:
             df_features = mjolnir.features.collect_kafka(
@@ -184,7 +186,7 @@
                 # to vary per-wiki. Varied features per wiki would also mean they can't be trained
                 # together, which is perhaps a good thing anyways.
                 feature_definitions=mjolnir.features.enwiki_features(),
-                feature_names_accu=fnames_accu)
+                feature_names_accu=fnames_accu, session_factory=session_factory)
 
     # collect the accumulator
     df_features.cache().count()
diff --git a/mjolnir/utilities/make_folds.py b/mjolnir/utilities/make_folds.py
index 5cbd682..dde25e5 100644
--- a/mjolnir/utilities/make_folds.py
+++ b/mjolnir/utilities/make_folds.py
@@ -131,7 +131,7 @@
     df_fold = (
         mjolnir.training.tuning.group_k_fold(df, num_folds)
         .repartition(200, 'wikiid', 'query')
-        .sortWithinPartitions('wikiid', 'query'))
+        .sortWithinPartitions('wikiid', 'query', 'label'))
 
     try:
         df_fold.cache()
diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
index 37d4473..55072a2 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -24,7 +24,10 @@
 import sys
 
 
-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees, final_num_trees, num_cv_jobs):
+def run_pipeline(
+    sc, sqlContext, input_dir, output_dir, wikis, initial_num_trees,
+    final_num_trees, num_cv_jobs, iterations
+):
     with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
         stats = json.loads(f.read())
 
@@ -68,7 +71,8 @@
             num_cv_jobs=num_cv_jobs,
             train_matrix="train",
             initial_num_trees=initial_num_trees,
-            final_num_trees=final_num_trees)
+            final_num_trees=final_num_trees,
+            iterations=iterations)
 
         print 'CV  test-ndcg@10: %.4f' % (tune_results['metrics']['cv-test'])
         print 'CV train-ndcg@10: %.4f' % (tune_results['metrics']['cv-train'])
@@ -143,6 +147,9 @@
         help='Number of trees in the final ensemble. If not provided the value from '
              + '--initial-trees will be used.  (Default: None)')
     parser.add_argument(
+        '-t', '--iterations', dest='iterations', default=150, type=int,
+        help='The number of hyperparameter tuning iterations to perform')
+    parser.add_argument(
         '-v', '--verbose', dest='verbose', default=False, action='store_true',
         help='Increase logging to INFO')
     parser.add_argument(

-- 
To view, visit https://gerrit.wikimedia.org/r/406070
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I8ad5fe1dbbbd50b897362b44411cfc19650b0390
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>
