jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/403869 )

Change subject: Generalize tuning pipeline
......................................................................


Generalize tuning pipeline

This pipeline was pretty convoluted. Push most of the complexity
up out of the pipeline into the single ModelSelection object, leaving
the rest of the model selection code (cross validation, tuning,
parameter selection, etc.) clearer and more directly implemented.
This also provides a reusable tuning implementation to share between
xgboost and lightgbm.

Change-Id: I8f2a2f3aeca85fe86cb6d466622a2e83dd249172
---
M mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
M mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
M mjolnir/utilities/training_pipeline.py
6 files changed, 229 insertions(+), 175 deletions(-)

Approvals:
  jenkins-bot: Verified
  DCausse: Looks good to me, approved



diff --git a/mjolnir/test/training/test_hyperopt.py b/mjolnir/test/training/test_hyperopt.py
index c4c4782..1dec547 100644
--- a/mjolnir/test/training/test_hyperopt.py
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -1,27 +1,19 @@
 from __future__ import absolute_import
 import hyperopt
 import mjolnir.training.hyperopt
-from pyspark.ml.linalg import Vectors
-import pytest
 
 
-def _make_q(query, n=4):
-    "Generates single feature queries"
-    return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for f in range(n)]
-
-
-@pytest.fixture
-def df_train(spark_context, hive_context):
-    # TODO: Use some fixture dataset representing real-ish data? But
-    # it needs to be pretty small
-    return spark_context.parallelize(
-        _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
-        + _make_q('mno') + _make_q('pqr') + _make_q('stu')
-    ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
-
-
-def test_minimize(folds_b):
+def test_maximize(folds_b):
     "Not an amazing test...basically sees if the happy path doesnt blow up"
+    def f(params):
+        assert isinstance(params, dict)
+        assert 'max_depth' in params
+        assert params['num_rounds'] == 50
+        return [{
+            'train': [0.80],
+            'test': [0.79],
+        }]
+
     space = {
         'num_rounds': 50,
         'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
@@ -30,33 +22,11 @@
     # mostly hyperopt just calls cross_validate, of which the integration with
     # xgboost is separately tested. Instead of going all the way into xgboost
     # mock it out w/MockModel.
-    best_params, trails = mjolnir.training.hyperopt.minimize(
-        folds_b, MockModel, space, max_evals=5)
+    best_params, trails = mjolnir.training.hyperopt.maximize(
+        f, space, max_evals=5)
     assert isinstance(best_params, dict)
     # num_rounds should have been unchanged
     assert 'num_rounds' in best_params
     assert best_params['num_rounds'] == 50
     # should have max_evals evaluations
     assert len(trails.trials) == 5
-
-
-class MockSummary(object):
-    def train(self):
-        return [1.]
-
-    def test(self):
-        return [1.]
-
-
-class MockModel(object):
-    def __init__(self, df, params, train_matrix=None):
-        # Params that were passed to hyperopt
-        assert isinstance(params, dict)
-        assert 'max_depth' in params
-        assert params['num_rounds'] == 50
-
-    def eval(self, df_test, j_groups=None, feature_col='features', label_col='label'):
-        return 1.0
-
-    def summary(self):
-        return MockSummary()
diff --git a/mjolnir/test/training/test_tuning.py b/mjolnir/test/training/test_tuning.py
index e14982b..22402f1 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -1,8 +1,8 @@
 from __future__ import absolute_import
+import hyperopt
 import mjolnir.training.tuning
 import mjolnir.training.xgboost
 from pyspark.sql import functions as F
-from pyspark.ml.linalg import Vectors
 import pytest
 
 
@@ -32,27 +32,115 @@
     assert len(queries_in_0.intersection(queries_in_1)) == 0
 
 
-def _make_q(query, n=4):
-    "Generates single feature queries"
-    return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for f in range(n)]
+def run_model_selection(tune_stages, f=None, num_cv_jobs=1, **kwargs):
+    stats = {'called': 0}
+    initial_space = {'foo': 10, 'bar': 20, 'baz': 0}
+    folds = [[1, 2, 3], [4, 5, 6]]
+    if not f:
+        def f(fold, params, **kwargs):
+            stats['called'] += 1
+            factor = 1.0 / (6 * params['foo'])
+            return {
+                'test': [v * factor * 0.9 for v in fold],
+                'train': [v * factor for v in fold],
+            }
+
+    tuner = mjolnir.training.tuning.ModelSelection(initial_space, tune_stages)
+    train_func = tuner.make_cv_objective(f, folds, num_cv_jobs, **kwargs)
+    trials_pool = tuner.build_pool(folds, num_cv_jobs)
+    result = tuner(train_func, trials_pool)
+    return result, stats['called']
 
 
-@pytest.fixture
-def df_train(spark_context, hive_context):
-    # TODO: Use some fixture dataset representing real-ish data? But
-    # it needs to be pretty small
-    return spark_context.parallelize(
-        _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
-        + _make_q('mno') + _make_q('pqr') + _make_q('stu')
-    ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
+def test_ModelSelection():
+    num_iterations = 3
+    result, called = run_model_selection([
+        ('a', {
+            'iterations': num_iterations,
+            'space': {
+                'foo': hyperopt.hp.uniform('foo', 1, 9),
+            },
+        }),
+        ('b', {
+            'iterations': num_iterations,
+            'space': {
+                'bar': hyperopt.hp.uniform('bar', 1, 5),
+            },
+        })
+    ])
+    # stages * iterations * folds
+    assert called == 2 * num_iterations * 2
+    # We should still have three parameters
+    assert len(result['params']) == 3
+    # foo should have a new value between 1 and 9
+    assert 1 <= result['params']['foo'] <= 9
+    # bar should have a new value between 1 and 5
+    assert 1 <= result['params']['bar'] <= 5
+    # baz should be untouched
+    assert result['params']['baz'] == 0
 
 
-def test_cross_validate_plain_df(folds_a):
-    scores = mjolnir.training.tuning.cross_validate(
-        folds_a,
-        mjolnir.training.xgboost.train,
-        {'objective': 'rank:ndcg', 'eval_metric': 'ndcg@3', 'num_rounds': 1},
-        pool=None)
-    # one score for each fold
-    for fold, score in zip(folds_a, scores):
-        assert fold[0].keys() == score.keys()
+def test_ModelSelection_stage_condition():
+    num_iterations = 3
+    result, called = run_model_selection([
+        ('a', {
+            'condition': lambda: False,
+            'iterations': num_iterations,
+            'space': {
+                'foo': hyperopt.hp.uniform('foo', 1, 9),
+            }
+        }),
+        ('b', {
+            'iterations': num_iterations,
+            'space': {
+                'bar': hyperopt.hp.uniform('bar', 1, 9),
+            }
+        }),
+    ])
+    # iterations * folds
+    assert called == num_iterations * 2
+    assert result['params']['foo'] == 10
+    assert 1 <= result['params']['bar'] <= 9
+    assert result['params']['baz'] == 0
+
+
+def test_ModelSelection_kwargs_pass_thru():
+    tuner = mjolnir.training.tuning.ModelSelection(None, None)
+    expected_kwargs = {'hi': 5, 'there': 'test'}
+
+    def f(fold, params, **kwargs):
+        assert kwargs == expected_kwargs
+        return {'test': [fold[0]], 'train': [fold[0]]}
+
+    obj = tuner.make_cv_objective(f, [[1], [2]], 1, **expected_kwargs)
+
+    res = obj(None)
+    assert res == [
+        {'test': [1], 'train': [1]},
+        {'test': [2], 'train': [2]}
+    ]
+
+
+@pytest.mark.parametrize(
+    "num_folds, num_workers, num_cv_jobs, expect_pool", [
+        (1,           1,           1,       False),
+        (1,           1,           2,       True),
+
+        (3,           1,           1,       False),
+        (3,           1,           5,       False),
+        (3,           1,           6,       True),
+
+        (3,           3,           1,       False),
+        (3,           3,          10,       False),
+        (3,           3,          17,       False),
+        (3,           3,          18,       True),
+
+        (5,           1,           5,       False),
+        (5,           1,           9,       False),
+        (5,           1,          11,       True),
+    ])
+def test_ModelSelection_build_pool(num_folds, num_workers, num_cv_jobs, expect_pool):
+    tuner = mjolnir.training.tuning.ModelSelection(None, None)
+    folds = [[1] * num_workers for i in range(num_folds)]
+    pool = tuner.build_pool(folds, num_cv_jobs)
+    assert (pool is not None) == expect_pool
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
index 9976bb3..c769a90 100644
--- a/mjolnir/training/hyperopt.py
+++ b/mjolnir/training/hyperopt.py
@@ -5,8 +5,6 @@
 import hyperopt
 import hyperopt.pyll.base
 from hyperopt.utils import coarse_utcnow
-import math
-import mjolnir.training.tuning
 import numpy as np
 
 
@@ -80,26 +78,22 @@
         return state
 
 
-def minimize(folds, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
-             cv_pool=None, trials_pool=None):
-    """Perform cross validated hyperparameter optimization of train_func
+def maximize(f, space, max_evals=50, algo=hyperopt.tpe.suggest,
+             trials_pool=None):
+    """Maximize the loss of f over the provided space
 
     Parameters
     ----------
-    folds : list of dict each with two keys, train and test
-        Features and Labels to optimize over
-    train_func : callable
-        Function to use for training individual models
+    f : callable
+        Function to maximize. Will be provided with a dict and expected
+        to return a list of dicts each containing test and train keys
     space : dict
-        Hyperparameter space to search over.
+        Hyperparameter space to search over, from hyperopt.hp.*.
     max_evals : int
         Maximum iterations of hyperparameter tuning to perform.
     algo : callable
         The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
         details.
-    cv_pool : multiprocessing.dummy.Pool or None
-        Controls the number of models to run in parallel. If None models
-        are trained sequentially.
     trials_pool : multiprocessing.dummy.Pool or None
         Controls the number of hyperopt trials run in parallel. If None trials
         are run sequentially.
@@ -113,28 +107,24 @@
     """
 
     def objective(params):
-        scores = mjolnir.training.tuning.cross_validate(
-            folds, train_func, params, pool=cv_pool)
+        scores = f(params)
+        if scores is None:
+            return {
+                'status': hyperopt.STATUS_FAIL,
+                'failure': 'Complete failure, no score returned'
+            }
         # For now the only metric is NDCG, and hyperopt is a minimizer
         # so return the negative NDCG. Also makes the bold assumption
         # we had at least two pieces of the fold named 'test' and 'train'
         loss = [-s['test'][-1] for s in scores]
         true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
-        num_failures = sum([math.isnan(s) for s in loss])
-        if num_failures > 1:
-            return {
-                'status': hyperopt.STATUS_FAIL,
-                'failure': 'Too many failures: %d' % (num_failures)
-            }
-        else:
-            loss = [s for s in loss if not math.isnan(s)]
-            true_loss = [s for s in true_loss if not math.isnan(s)]
         return {
             'status': hyperopt.STATUS_OK,
             'loss': sum(loss) / float(len(loss)),
             'loss_variance': np.var(loss),
             'true_loss': sum(true_loss) / float(len(true_loss)),
             'true_loss_variance': np.var(true_loss),
+            'scores': scores,
         }
 
     if trials_pool is None:
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index f106f88..7d2df68 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -1,11 +1,16 @@
 """
-Support for making test/train or k-fold splits
+Support for choosing model parameters.
+
+Includes dataset splitting, cross validation and model selection
 """
 from __future__ import absolute_import
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
 import mjolnir.spark
 import py4j.protocol
 from pyspark.sql import functions as F
+import math
+from multiprocessing.dummy import Pool
+import numpy as np
 
 
 def split(df, splits, output_column='fold'):
@@ -95,9 +100,11 @@
     num_folds : int
     output_column : str, optional
 
-    Yields
+    Returns
     ------
-    dict
+    pyspark.sql.DataFrame
+        Input data frame with a 'fold' column indicating fold membership.
+        Normalized queries are equally distributed to each fold.
     """
     return (
         split(df, [1. / num_folds] * num_folds, output_column)
@@ -126,34 +133,76 @@
     return with_retry
 
 
-def cross_validate(folds, train_func, params, pool):
-    """Perform cross validation of the provided folds
+class ModelSelection(object):
+    def __init__(self, initial_space, tune_stages, transformer=None):
+        self.initial_space = initial_space
+        self.tune_stages = tune_stages
+        self.transformer = transformer
 
-    Parameters
-    ----------
-    folds : list of dict containing train and test keys
-    train_func : callable
-    params : dict
-    pool : multiprocessing.dummy.Pool or None
+    def build_pool(self, folds, num_cv_jobs):
+        num_folds = len(folds)
+        num_workers = len(folds[0])
+        trials_pool_size = int(math.floor(num_cv_jobs / (num_workers * num_folds)))
+        if trials_pool_size > 1:
+            return Pool(trials_pool_size)
+        else:
+            return None
 
-    Returns
-    -------
-    list
-    """
-    def job(fold):
-        model = train_func(fold, params)
-        # TODO: Summary is hardcodeed to train/test
+    def make_cv_objective(self, train_func, folds, num_cv_jobs, **kwargs):
+        train_func = _py4j_retry(train_func, None)
+        if num_cv_jobs > 1:
+            cv_pool = Pool(num_cv_jobs)
+            cv_mapper = cv_pool.map
+        else:
+            cv_mapper = map
+
+        def f(params):
+            def inner(fold):
+                return train_func(fold, params, **kwargs)
+
+            return cv_mapper(inner, folds)
+
+        if not self.transformer:
+            return f
+
+        def g(params):
+            return [self.transformer(scores, params) for scores in f(params)]
+
+        return g
+
+    def eval_stage(self, train_func, stage, space, pool):
+        if 'condition' in stage and not stage['condition']():
+            return space, None
+        # Override current space with new space
+        merged = dict(space, **stage['space'])
+        best, trials = mjolnir.training.hyperopt.maximize(
+            train_func, merged, max_evals=stage['iterations'], trials_pool=pool)
+        # Override space with best parameters
+        # We don't have a guarantee that the name in tune_space and the
+        # name in best are the same. best gets named from the name
+        # parameter of the hyperopt.hp.* call. Would be nice to assert but
+        # couldn't figure out how.
+        merged.update(best)
+        return merged, trials
+
+    def __call__(self, train_func, pool):
+        space = self.initial_space
+        stages = []
+        for stage_name, stage in self.tune_stages:
+            space, trials = self.eval_stage(train_func, stage, space, pool)
+            if trials is not None:
+                stages.append((stage_name, trials))
+
+        trials_final = stages[-1][1]
+        best_trial = np.argmin(trials_final.losses())
+        loss = trials_final.losses()[best_trial]
+        true_loss = trials_final.results[best_trial].get('true_loss')
+
         return {
-            "train": model.summary().train(),
-            "test": model.summary().test(),
+            'trials': OrderedDict(stages),
+            'params': space,
+            'metrics': {
+                'cv-test': -loss,
+                'cv-train': -loss + true_loss
+            }
         }
-
-    job_w_retry = _py4j_retry(job, {
-        "train": [float('nan')],
-        "test": [float('nan')],
-    })
-
-    if pool is None:
-        return map(job_w_retry, folds)
-    else:
-        return pool.map(job_w_retry, folds)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 984c0e6..1a23a66 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,10 +1,8 @@
 from __future__ import absolute_import
-import functools
 import hyperopt
-import math
 import mjolnir.spark
 import mjolnir.training.hyperopt
-from multiprocessing.dummy import Pool
+from mjolnir.training.tuning import ModelSelection
 import numpy as np
 import pyspark
 import pyspark.sql
@@ -160,6 +158,7 @@
 class XGBoostModel(object):
     def __init__(self, j_xgb_model):
         self._j_xgb_model = j_xgb_model
+        self.summary = XGBoostSummary(self._j_xgb_model.summary())
 
     @staticmethod
     def trainWithFiles(fold, train_matrix, params, num_rounds=100,
@@ -315,6 +314,13 @@
         return XGBoostModel(j_xgb_model)
 
 
+def cv_transformer(model, params):
+    return {
+        "train": model.summary.train(),
+        "test": model.summary.test(),
+    }
+
+
 def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100, final_num_trees=500):
     """Find appropriate hyperparameters for training df
 
@@ -356,32 +362,6 @@
         performed, each containing a hyperopt.Trials object recording what
         happened.
     """
-    cv_pool = None
-    if num_cv_jobs > 1:
-        cv_pool = Pool(num_cv_jobs)
-
-    # Configure the trials pool large enough to keep cv_pool full
-    num_folds = len(folds)
-    num_workers = len(folds[0])
-    trials_pool_size = int(math.floor(num_cv_jobs / (num_folds * num_workers)))
-    if trials_pool_size > 1:
-        print 'Running %d cross validations in parallel' % (trials_pool_size)
-        trials_pool = Pool(trials_pool_size)
-    else:
-        trials_pool = None
-
-    train_func = functools.partial(train, train_matrix=train_matrix)
-
-    def eval_space(space, max_evals):
-        """Eval a space using standard hyperopt"""
-        best, trials = mjolnir.training.hyperopt.minimize(
-            folds, train_func, space, max_evals=max_evals,
-            cv_pool=cv_pool, trials_pool=trials_pool)
-        for k, v in space.items():
-            if not np.isscalar(v):
-                print 'best %s: %f' % (k, best[k])
-        return best, trials
-
     num_obs = stats['num_observations']
 
     if num_obs > 8000000:
@@ -461,30 +441,7 @@
         'colsample_bytree': 0.8,
     }
 
-    stages = []
-    for name, stage_params in tune_spaces:
-        if 'condition' in stage_params and not stage_params['condition']():
-            continue
-        tune_space = stage_params['space']
-        for name, dist in tune_space.items():
-            space[name] = dist
-        best, trials = eval_space(space, stage_params['iterations'])
-        for name in tune_space.keys():
-            space[name] = best[name]
-        stages.append((name, trials))
-
-    trials = stages[-1][1]
-    best_trial = np.argmin(trials.losses())
-    loss = trials.losses()[best_trial]
-    true_loss = trials.results[best_trial].get('true_loss')
-
-    return {
-        'trials': {
-            'initial': trials,
-        },
-        'params': space,
-        'metrics': {
-            'cv-test': -loss,
-            'cv-train': -loss + true_loss
-        }
-    }
+    tuner = ModelSelection(space, tune_spaces, cv_transformer)
+    train_func = tuner.make_cv_objective(train, folds, num_cv_jobs, train_matrix=train_matrix)
+    trials_pool = tuner.build_pool(folds, num_cv_jobs)
+    return tuner(train_func, trials_pool)
diff --git a/mjolnir/utilities/training_pipeline.py b/mjolnir/utilities/training_pipeline.py
index 99a72c8..37d4473 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -89,7 +89,7 @@
             all_paths, best_params, train_matrix="all")
 
         tune_results['metrics'] = {
-            'train': model.summary().train()
+            'train': model.summary.train()
         }
         print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'][-1])
 

-- 
To view, visit https://gerrit.wikimedia.org/r/403869
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8f2a2f3aeca85fe86cb6d466622a2e83dd249172
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to