jenkins-bot has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/403869 )
Change subject: Generalize tuning pipeline
......................................................................
Generalize tuning pipeline
This pipeline was pretty convoluted. Push most of the complexity
up out of the pipeline into the single ModelSelection object, leaving
the rest of the model selection code (cross validation, tuning,
parameter selection, etc.) clearer and more directly implemented.
This also provides a reusable tuning implementation to share between
xgboost and lightgbm.
Change-Id: I8f2a2f3aeca85fe86cb6d466622a2e83dd249172
---
M mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
M mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
M mjolnir/utilities/training_pipeline.py
6 files changed, 229 insertions(+), 175 deletions(-)
Approvals:
jenkins-bot: Verified
DCausse: Looks good to me, approved
diff --git a/mjolnir/test/training/test_hyperopt.py
b/mjolnir/test/training/test_hyperopt.py
index c4c4782..1dec547 100644
--- a/mjolnir/test/training/test_hyperopt.py
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -1,27 +1,19 @@
from __future__ import absolute_import
import hyperopt
import mjolnir.training.hyperopt
-from pyspark.ml.linalg import Vectors
-import pytest
-def _make_q(query, n=4):
- "Generates single feature queries"
- return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for
f in range(n)]
-
-
[email protected]
-def df_train(spark_context, hive_context):
- # TODO: Use some fixture dataset representing real-ish data? But
- # it needs to be pretty small
- return spark_context.parallelize(
- _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
- + _make_q('mno') + _make_q('pqr') + _make_q('stu')
- ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
-
-
-def test_minimize(folds_b):
+def test_maximize(folds_b):
"Not an amazing test...basically sees if the happy path doesnt blow up"
+ def f(params):
+ assert isinstance(params, dict)
+ assert 'max_depth' in params
+ assert params['num_rounds'] == 50
+ return [{
+ 'train': [0.80],
+ 'test': [0.79],
+ }]
+
space = {
'num_rounds': 50,
'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
@@ -30,33 +22,11 @@
# mostly hyperopt just calls cross_validate, of which the integration with
# xgboost is separately tested. Instead of going all the way into xgboost
# mock it out w/MockModel.
- best_params, trails = mjolnir.training.hyperopt.minimize(
- folds_b, MockModel, space, max_evals=5)
+ best_params, trails = mjolnir.training.hyperopt.maximize(
+ f, space, max_evals=5)
assert isinstance(best_params, dict)
# num_rounds should have been unchanged
assert 'num_rounds' in best_params
assert best_params['num_rounds'] == 50
# should have max_evals evaluations
assert len(trails.trials) == 5
-
-
-class MockSummary(object):
- def train(self):
- return [1.]
-
- def test(self):
- return [1.]
-
-
-class MockModel(object):
- def __init__(self, df, params, train_matrix=None):
- # Params that were passed to hyperopt
- assert isinstance(params, dict)
- assert 'max_depth' in params
- assert params['num_rounds'] == 50
-
- def eval(self, df_test, j_groups=None, feature_col='features',
label_col='label'):
- return 1.0
-
- def summary(self):
- return MockSummary()
diff --git a/mjolnir/test/training/test_tuning.py
b/mjolnir/test/training/test_tuning.py
index e14982b..22402f1 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import
+import hyperopt
import mjolnir.training.tuning
import mjolnir.training.xgboost
from pyspark.sql import functions as F
-from pyspark.ml.linalg import Vectors
import pytest
@@ -32,27 +32,115 @@
assert len(queries_in_0.intersection(queries_in_1)) == 0
-def _make_q(query, n=4):
- "Generates single feature queries"
- return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for
f in range(n)]
+def run_model_selection(tune_stages, f=None, num_cv_jobs=1, **kwargs):
+ stats = {'called': 0}
+ initial_space = {'foo': 10, 'bar': 20, 'baz': 0}
+ folds = [[1, 2, 3], [4, 5, 6]]
+ if not f:
+ def f(fold, params, **kwargs):
+ stats['called'] += 1
+ factor = 1.0 / (6 * params['foo'])
+ return {
+ 'test': [v * factor * 0.9 for v in fold],
+ 'train': [v * factor for v in fold],
+ }
+
+ tuner = mjolnir.training.tuning.ModelSelection(initial_space, tune_stages)
+ train_func = tuner.make_cv_objective(f, folds, num_cv_jobs, **kwargs)
+ trials_pool = tuner.build_pool(folds, num_cv_jobs)
+ result = tuner(train_func, trials_pool)
+ return result, stats['called']
[email protected]
-def df_train(spark_context, hive_context):
- # TODO: Use some fixture dataset representing real-ish data? But
- # it needs to be pretty small
- return spark_context.parallelize(
- _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
- + _make_q('mno') + _make_q('pqr') + _make_q('stu')
- ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
+def test_ModelSelection():
+ num_iterations = 3
+ result, called = run_model_selection([
+ ('a', {
+ 'iterations': num_iterations,
+ 'space': {
+ 'foo': hyperopt.hp.uniform('foo', 1, 9),
+ },
+ }),
+ ('b', {
+ 'iterations': num_iterations,
+ 'space': {
+ 'bar': hyperopt.hp.uniform('bar', 1, 5),
+ },
+ })
+ ])
+ # stages * iterations * folds
+ assert called == 2 * num_iterations * 2
+ # We should still have three parameters
+ assert len(result['params']) == 3
+ # foo should have a new value between 1 and 9
+ assert 1 <= result['params']['foo'] <= 9
+ # bar should have a new value between 1 and 5
+ assert 1 <= result['params']['bar'] <= 5
+ # baz should be untouched
+ assert result['params']['baz'] == 0
-def test_cross_validate_plain_df(folds_a):
- scores = mjolnir.training.tuning.cross_validate(
- folds_a,
- mjolnir.training.xgboost.train,
- {'objective': 'rank:ndcg', 'eval_metric': 'ndcg@3', 'num_rounds': 1},
- pool=None)
- # one score for each fold
- for fold, score in zip(folds_a, scores):
- assert fold[0].keys() == score.keys()
+def test_ModelSelection_stage_condition():
+ num_iterations = 3
+ result, called = run_model_selection([
+ ('a', {
+ 'condition': lambda: False,
+ 'iterations': num_iterations,
+ 'space': {
+ 'foo': hyperopt.hp.uniform('foo', 1, 9),
+ }
+ }),
+ ('b', {
+ 'iterations': num_iterations,
+ 'space': {
+ 'bar': hyperopt.hp.uniform('bar', 1, 9),
+ }
+ }),
+ ])
+ # iterations * folds
+ assert called == num_iterations * 2
+ assert result['params']['foo'] == 10
+ assert 1 <= result['params']['bar'] <= 9
+ assert result['params']['baz'] == 0
+
+
+def test_ModelSelection_kwargs_pass_thru():
+ tuner = mjolnir.training.tuning.ModelSelection(None, None)
+ expected_kwargs = {'hi': 5, 'there': 'test'}
+
+ def f(fold, params, **kwargs):
+ assert kwargs == expected_kwargs
+ return {'test': [fold[0]], 'train': [fold[0]]}
+
+ obj = tuner.make_cv_objective(f, [[1], [2]], 1, **expected_kwargs)
+
+ res = obj(None)
+ assert res == [
+ {'test': [1], 'train': [1]},
+ {'test': [2], 'train': [2]}
+ ]
+
+
[email protected](
+ "num_folds, num_workers, num_cv_jobs, expect_pool", [
+ (1, 1, 1, False),
+ (1, 1, 2, True),
+
+ (3, 1, 1, False),
+ (3, 1, 5, False),
+ (3, 1, 6, True),
+
+ (3, 3, 1, False),
+ (3, 3, 10, False),
+ (3, 3, 17, False),
+ (3, 3, 18, True),
+
+ (5, 1, 5, False),
+ (5, 1, 9, False),
+ (5, 1, 11, True),
+ ])
+def test_ModelSelection_build_pool(num_folds, num_workers, num_cv_jobs,
expect_pool):
+ tuner = mjolnir.training.tuning.ModelSelection(None, None)
+ folds = [[1] * num_workers for i in range(num_folds)]
+ pool = tuner.build_pool(folds, num_cv_jobs)
+ assert (pool is not None) == expect_pool
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
index 9976bb3..c769a90 100644
--- a/mjolnir/training/hyperopt.py
+++ b/mjolnir/training/hyperopt.py
@@ -5,8 +5,6 @@
import hyperopt
import hyperopt.pyll.base
from hyperopt.utils import coarse_utcnow
-import math
-import mjolnir.training.tuning
import numpy as np
@@ -80,26 +78,22 @@
return state
-def minimize(folds, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
- cv_pool=None, trials_pool=None):
- """Perform cross validated hyperparameter optimization of train_func
+def maximize(f, space, max_evals=50, algo=hyperopt.tpe.suggest,
+ trials_pool=None):
+ """Maximize the loss of f over the provided space
Parameters
----------
- folds : list of dict each with two keys, train and test
- Features and Labels to optimize over
- train_func : callable
- Function to use for training individual models
+ f : callable
+ Function to maximize. Will be provided with a dict and expected
+ to return a list of dicts each containing test and train keys
space : dict
- Hyperparameter space to search over.
+ Hyperparameter space to search over, from hyperopt.hp.*.
max_evals : int
Maximum iterations of hyperparameter tuning to perform.
algo : callable
The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
details.
- cv_pool : multiprocessing.dummy.Pool or None
- Controls the number of models to run in parallel. If None models
- are trained sequentially.
trials_pool : multiprocessing.dummy.Pool or None
Controls the number of hyperopt trials run in parallel. If None trials
are run sequentially.
@@ -113,28 +107,24 @@
"""
def objective(params):
- scores = mjolnir.training.tuning.cross_validate(
- folds, train_func, params, pool=cv_pool)
+ scores = f(params)
+ if scores is None:
+ return {
+ 'status': hyperopt.STATUS_FAIL,
+ 'failure': 'Complete failure, no score returned'
+ }
# For now the only metric is NDCG, and hyperopt is a minimizer
# so return the negative NDCG. Also makes the bold assumption
# we had at least two pieces of the fold named 'test' and 'train'
loss = [-s['test'][-1] for s in scores]
true_loss = [s['train'][-1] - s['test'][-1] for s in scores]
- num_failures = sum([math.isnan(s) for s in loss])
- if num_failures > 1:
- return {
- 'status': hyperopt.STATUS_FAIL,
- 'failure': 'Too many failures: %d' % (num_failures)
- }
- else:
- loss = [s for s in loss if not math.isnan(s)]
- true_loss = [s for s in true_loss if not math.isnan(s)]
return {
'status': hyperopt.STATUS_OK,
'loss': sum(loss) / float(len(loss)),
'loss_variance': np.var(loss),
'true_loss': sum(true_loss) / float(len(true_loss)),
'true_loss_variance': np.var(true_loss),
+ 'scores': scores,
}
if trials_pool is None:
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index f106f88..7d2df68 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -1,11 +1,16 @@
"""
-Support for making test/train or k-fold splits
+Support for choosing model parameters.
+
+Includes dataset splitting, cross validation and model selection
"""
from __future__ import absolute_import
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
import mjolnir.spark
import py4j.protocol
from pyspark.sql import functions as F
+import math
+from multiprocessing.dummy import Pool
+import numpy as np
def split(df, splits, output_column='fold'):
@@ -95,9 +100,11 @@
num_folds : int
output_column : str, optional
- Yields
+ Returns
------
- dict
+ pyspark.sql.DataFrame
+ Input data frame with a 'fold' column indicating fold membership.
+ Normalized queries are equally distributed to each fold.
"""
return (
split(df, [1. / num_folds] * num_folds, output_column)
@@ -126,34 +133,76 @@
return with_retry
-def cross_validate(folds, train_func, params, pool):
- """Perform cross validation of the provided folds
+class ModelSelection(object):
+ def __init__(self, initial_space, tune_stages, transformer=None):
+ self.initial_space = initial_space
+ self.tune_stages = tune_stages
+ self.transformer = transformer
- Parameters
- ----------
- folds : list of dict containing train and test keys
- train_func : callable
- params : dict
- pool : multiprocessing.dummy.Pool or None
+ def build_pool(self, folds, num_cv_jobs):
+ num_folds = len(folds)
+ num_workers = len(folds[0])
+ trials_pool_size = int(math.floor(num_cv_jobs / (num_workers *
num_folds)))
+ if trials_pool_size > 1:
+ return Pool(trials_pool_size)
+ else:
+ return None
- Returns
- -------
- list
- """
- def job(fold):
- model = train_func(fold, params)
- # TODO: Summary is hardcodeed to train/test
+ def make_cv_objective(self, train_func, folds, num_cv_jobs, **kwargs):
+ train_func = _py4j_retry(train_func, None)
+ if num_cv_jobs > 1:
+ cv_pool = Pool(num_cv_jobs)
+ cv_mapper = cv_pool.map
+ else:
+ cv_mapper = map
+
+ def f(params):
+ def inner(fold):
+ return train_func(fold, params, **kwargs)
+
+ return cv_mapper(inner, folds)
+
+ if not self.transformer:
+ return f
+
+ def g(params):
+ return [self.transformer(scores, params) for scores in f(params)]
+
+ return g
+
+ def eval_stage(self, train_func, stage, space, pool):
+ if 'condition' in stage and not stage['condition']():
+ return space, None
+ # Override current space with new space
+ merged = dict(space, **stage['space'])
+ best, trials = mjolnir.training.hyperopt.maximize(
+ train_func, merged, max_evals=stage['iterations'],
trials_pool=pool)
+ # Override space with best parameters
+ # We don't have a guarantee that the name in tune_space and the
+ # name in best are the same. best gets named from the name
+ # parameter of the hyperopt.hp.* call. Would be nice to assert but
+ # couldn't figure out how.
+ merged.update(best)
+ return merged, trials
+
+ def __call__(self, train_func, pool):
+ space = self.initial_space
+ stages = []
+ for stage_name, stage in self.tune_stages:
+ space, trials = self.eval_stage(train_func, stage, space, pool)
+ if trials is not None:
+ stages.append((stage_name, trials))
+
+ trials_final = stages[-1][1]
+ best_trial = np.argmin(trials_final.losses())
+ loss = trials_final.losses()[best_trial]
+ true_loss = trials_final.results[best_trial].get('true_loss')
+
return {
- "train": model.summary().train(),
- "test": model.summary().test(),
+ 'trials': OrderedDict(stages),
+ 'params': space,
+ 'metrics': {
+ 'cv-test': -loss,
+ 'cv-train': -loss + true_loss
+ }
}
-
- job_w_retry = _py4j_retry(job, {
- "train": [float('nan')],
- "test": [float('nan')],
- })
-
- if pool is None:
- return map(job_w_retry, folds)
- else:
- return pool.map(job_w_retry, folds)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 984c0e6..1a23a66 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,10 +1,8 @@
from __future__ import absolute_import
-import functools
import hyperopt
-import math
import mjolnir.spark
import mjolnir.training.hyperopt
-from multiprocessing.dummy import Pool
+from mjolnir.training.tuning import ModelSelection
import numpy as np
import pyspark
import pyspark.sql
@@ -160,6 +158,7 @@
class XGBoostModel(object):
def __init__(self, j_xgb_model):
self._j_xgb_model = j_xgb_model
+ self.summary = XGBoostSummary(self._j_xgb_model.summary())
@staticmethod
def trainWithFiles(fold, train_matrix, params, num_rounds=100,
@@ -315,6 +314,13 @@
return XGBoostModel(j_xgb_model)
+def cv_transformer(model, params):
+ return {
+ "train": model.summary.train(),
+ "test": model.summary.test(),
+ }
+
+
def tune(folds, stats, train_matrix, num_cv_jobs=5, initial_num_trees=100,
final_num_trees=500):
"""Find appropriate hyperparameters for training df
@@ -356,32 +362,6 @@
performed, each containing a hyperopt.Trials object recording what
happened.
"""
- cv_pool = None
- if num_cv_jobs > 1:
- cv_pool = Pool(num_cv_jobs)
-
- # Configure the trials pool large enough to keep cv_pool full
- num_folds = len(folds)
- num_workers = len(folds[0])
- trials_pool_size = int(math.floor(num_cv_jobs / (num_folds * num_workers)))
- if trials_pool_size > 1:
- print 'Running %d cross validations in parallel' % (trials_pool_size)
- trials_pool = Pool(trials_pool_size)
- else:
- trials_pool = None
-
- train_func = functools.partial(train, train_matrix=train_matrix)
-
- def eval_space(space, max_evals):
- """Eval a space using standard hyperopt"""
- best, trials = mjolnir.training.hyperopt.minimize(
- folds, train_func, space, max_evals=max_evals,
- cv_pool=cv_pool, trials_pool=trials_pool)
- for k, v in space.items():
- if not np.isscalar(v):
- print 'best %s: %f' % (k, best[k])
- return best, trials
-
num_obs = stats['num_observations']
if num_obs > 8000000:
@@ -461,30 +441,7 @@
'colsample_bytree': 0.8,
}
- stages = []
- for name, stage_params in tune_spaces:
- if 'condition' in stage_params and not stage_params['condition']():
- continue
- tune_space = stage_params['space']
- for name, dist in tune_space.items():
- space[name] = dist
- best, trials = eval_space(space, stage_params['iterations'])
- for name in tune_space.keys():
- space[name] = best[name]
- stages.append((name, trials))
-
- trials = stages[-1][1]
- best_trial = np.argmin(trials.losses())
- loss = trials.losses()[best_trial]
- true_loss = trials.results[best_trial].get('true_loss')
-
- return {
- 'trials': {
- 'initial': trials,
- },
- 'params': space,
- 'metrics': {
- 'cv-test': -loss,
- 'cv-train': -loss + true_loss
- }
- }
+ tuner = ModelSelection(space, tune_spaces, cv_transformer)
+ train_func = tuner.make_cv_objective(train, folds, num_cv_jobs,
train_matrix=train_matrix)
+ trials_pool = tuner.build_pool(folds, num_cv_jobs)
+ return tuner(train_func, trials_pool)
diff --git a/mjolnir/utilities/training_pipeline.py
b/mjolnir/utilities/training_pipeline.py
index 99a72c8..37d4473 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -89,7 +89,7 @@
all_paths, best_params, train_matrix="all")
tune_results['metrics'] = {
- 'train': model.summary().train()
+ 'train': model.summary.train()
}
print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'][-1])
--
To view, visit https://gerrit.wikimedia.org/r/403869
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I8f2a2f3aeca85fe86cb6d466622a2e83dd249172
Gerrit-PatchSet: 4
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
Gerrit-Reviewer: EBernhardson <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits