DCausse has submitted this change and it was merged. (
https://gerrit.wikimedia.org/r/361394 )
Change subject: Split hyperopt integration into its own file
......................................................................
Split hyperopt integration into its own file
The hyperopt integration is getting more and more complex, and an
upcoming patch to run multiple cross-validations in parallel will make
it even more complex. Make the code a bit clearer by splitting the
hyperopt-specific code out into its own file.
Change-Id: If5400b8183e31eaf0d56fe50899d196b9f5e1314
---
A mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
A mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
5 files changed, 224 insertions(+), 199 deletions(-)
Approvals:
DCausse: Verified; Looks good to me, approved
diff --git a/mjolnir/test/training/test_hyperopt.py
b/mjolnir/test/training/test_hyperopt.py
new file mode 100644
index 0000000..f0bafad
--- /dev/null
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -0,0 +1,84 @@
+import hyperopt
+import mjolnir.training.hyperopt
+from pyspark.ml.linalg import Vectors
+import pytest
+
+
+def _make_q(query, n=4):
+ "Generates single feature queries"
+ return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for
f in range(n)]
+
+
[email protected]
+def df_train(spark_context, hive_context):
+ # TODO: Use some fixture dataset representing real-ish data? But
+ # it needs to be pretty small
+ return spark_context.parallelize(
+ _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
+ + _make_q('mno') + _make_q('pqr') + _make_q('stu')
+ ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
+
+
+def test_minimize(df_train):
+ "Not an amazing test...basically sees if the happy path doesnt blow up"
+ space = {
+ 'num_rounds': 50,
+ 'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
+ }
+
+ # mostly hyperopt just calls cross_validate, of which the integration with
+ # xgboost is separately tested. Instead of going all the way into xgboost
+ # mock it out w/MockModel.
+ best_params, trails = mjolnir.training.hyperopt.minimize(
+ df_train, MockModel, space, max_evals=5, num_folds=2,
+ num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
+ assert isinstance(best_params, dict)
+ # num_rounds should have been unchanged
+ assert 'num_rounds' in best_params
+ assert best_params['num_rounds'] == 50
+ # should have max_evals evaluations
+ assert len(trails.trials) == 5
+
+
+def test_gridsearch(df_train):
+ space = {
+ 'num_rounds': 50,
+ 'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
+ }
+
+ gen = MockModelGen()
+ best_params, trials = mjolnir.training.hyperopt.grid_search(
+ df_train, gen, space, num_folds=2,
+ num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
+ assert isinstance(best_params, dict)
+ assert 'num_rounds' in best_params
+ # num rounds should be unchanged
+ assert best_params['num_rounds'] == 50
+ # should have 3 iterations for the 3 max depth's
+ assert len(trials.trials) == 3
+ param_depths = sorted([param['max_depth'] for param in gen.params])
+ # TODO: Why is this called 2x as many times as expected?
+ # For some reason the correct number of trials is still
+ # returned though.
+ assert [10, 10, 20, 20, 30, 30] == param_depths
+
+
+class MockModelGen(object):
+ def __init__(self):
+ self.params = []
+
+ def __call__(self, df, params, num_workers):
+ self.params.append(params)
+ return MockModel(df, params, num_workers)
+
+
+class MockModel(object):
+ def __init__(self, df, params, num_workers):
+ # Params that were passed to hyperopt
+ assert isinstance(params, dict)
+ assert 'max_depth' in params
+ assert params['num_rounds'] == 50
+ assert num_workers == 1
+
+ def eval(self, df_test, j_groups=None, feature_col='features',
label_col='label'):
+ return 1.0
diff --git a/mjolnir/test/training/test_tuning.py
b/mjolnir/test/training/test_tuning.py
index b8124b4..73d9057 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -1,4 +1,3 @@
-import hyperopt
import mjolnir.training.tuning
import mjolnir.training.xgboost
from pyspark.sql import functions as F
@@ -58,68 +57,3 @@
num_folds=2, num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
# one score for each fold
assert len(scores) == 2
-
-
-def test_hyperopt(df_train):
- "Not an amazing test...basically sees if the happy path doesnt blow up"
- space = {
- 'num_rounds': 50,
- 'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
- }
-
- # mostly hyperopt just calls cross_validate, of which the integration with
- # xgboost is separately tested. Instead of going all the way into xgboost
- # mock it out w/MockModel.
- best_params, trails = mjolnir.training.tuning.hyperopt(
- df_train, MockModel, space, max_evals=5, num_folds=2,
- num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
- assert isinstance(best_params, dict)
- # num_rounds should have been unchanged
- assert 'num_rounds' in best_params
- assert best_params['num_rounds'] == 50
- # should have max_evals evaluations
- assert len(trails.trials) == 5
-
-
-def test_gridsearch(df_train):
- space = {
- 'num_rounds': 50,
- 'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
- }
-
- gen = MockModelGen()
- best_params, trials = mjolnir.training.tuning.grid_search(
- df_train, gen, space, num_folds=2,
- num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
- assert isinstance(best_params, dict)
- assert 'num_rounds' in best_params
- # num rounds should be unchanged
- assert best_params['num_rounds'] == 50
- # should have 3 iterations for the 3 max depth's
- assert len(trials.trials) == 3
- param_depths = sorted([param['max_depth'] for param in gen.params])
- # TODO: Why is this called 2x as many times as expected?
- # For some reason the correct number of trials is still
- # returned though.
- assert [10, 10, 20, 20, 30, 30] == param_depths
-
-
-class MockModelGen(object):
- def __init__(self):
- self.params = []
-
- def __call__(self, df, params, num_workers):
- self.params.append(params)
- return MockModel(df, params, num_workers)
-
-
-class MockModel(object):
- def __init__(self, df, params, num_workers):
- # Params that were passed to hyperopt
- assert isinstance(params, dict)
- assert 'max_depth' in params
- assert params['num_rounds'] == 50
- assert num_workers == 1
-
- def eval(self, df_test, j_groups=None, feature_col='features',
label_col='label'):
- return 1.0
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
new file mode 100644
index 0000000..518497b
--- /dev/null
+++ b/mjolnir/training/hyperopt.py
@@ -0,0 +1,136 @@
+"""
+Integration and additional features for the hyperopt library
+"""
+from __future__ import absolute_import
+import hyperopt
+import hyperopt.pyll.base
+import itertools
+import math
+import mjolnir.training.tuning
+import numpy as np
+
+
+class _GridSearchAlgo(object):
+ def __init__(self, space):
+ foo = {}
+ for k, v in space.items():
+ if not isinstance(v, hyperopt.pyll.base.Apply):
+ continue
+ literals = v.pos_args[1:]
+ if not all([isinstance(l, hyperopt.pyll.base.Literal) for l in
literals]):
+ raise ValueError('GridSearch only works with hp.choice')
+ foo[k] = range(len(literals))
+ self.grid_keys = foo.keys()
+ self.grids = list(itertools.product(*foo.values()))
+ self.max_evals = len(self.grids)
+
+ def __call__(self, new_ids, domain, trials, seed):
+ rval = []
+ for ii, new_id in enumerate(new_ids):
+ vals = dict(zip(self.grid_keys, [[v] for v in self.grids.pop()]))
+ new_result = domain.new_result()
+ new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir,
+ idxs=dict(zip(self.grid_keys, [[new_id]] *
len(vals))),
+ vals=vals)
+ rval.extend(trials.new_trial_docs([new_id],
+ [None], [new_result], [new_misc]))
+ return rval
+
+
+def grid_search(df, train_func, space, num_folds=5, num_fold_partitions=100,
+ num_cv_jobs=5, num_workers=5):
+ # TODO: While this tried to look simple, hyperopt is a bit odd to integrate
+ # with this directly. Perhaps implement naive gridsearch directly instead
+ # of through hyperopt.
+ algo = _GridSearchAlgo(space)
+ return minimize(df, train_func, space, algo.max_evals, algo, num_folds,
+ num_fold_partitions, num_cv_jobs, num_workers)
+
+
+def minimize(df, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
+ num_folds=5, num_fold_partitions=100, num_cv_jobs=5,
num_workers=5):
+ """Perform cross validated hyperparameter optimization of train_func
+
+ Parameters
+ ----------
+ df : pyspark.sql.DataFrame
+ Features and Labels to optimize over
+ train_func : callable
+ Function to use for training individual models
+ space : dict
+ Hyperparameter space to search over.
+ max_evals : int
+ Maximum iterations of hyperparameter tuning to perform.
+ algo : callable
+ The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
+ details.
+ num_folds : int
+ Number of folds to split df into for cross validation
+ num_fold_partitions : int
+ Sets the number of partitions to split with. Each partition needs
+ to be some minimum size for averages to work out to an evenly split
+ final set. (Default: 100)
+ num_cv_jobs : int
+ Number of cross validation folds to train in parallel
+ num_workers : int
+ Number of executors to use for each model training
+ cache : bool
+ True if the folds of df should be individually cached
+ unpersist : bool
+ True if the folds of df should be unpersisted when complete.
+
+ Returns
+ -------
+ best_params : dict
+ The best parameters found within space
+ trials : hyperopt.Trials
+ Information about every iteration of the search
+ """
+ def objective(params):
+ scores = mjolnir.training.tuning._cross_validate(
+ folds, train_func, params, num_cv_jobs=num_cv_jobs,
+ num_workers=num_workers)
+ # For now the only metric is NDCG, and hyperopt is a minimizer
+ # so return the negative NDCG
+ loss = [-s['test'] for s in scores]
+ true_loss = [s['train'] - s['test'] for s in scores]
+ num_failures = sum([math.isnan(s) for s in loss])
+ if num_failures > 1:
+ return {
+ 'status': hyperopt.STATUS_FAIL,
+ 'failure': 'Too many failures: %d' % (num_failures)
+ }
+ else:
+ loss = [s for s in loss if not math.isnan(s)]
+ true_loss = [s for s in true_loss if not math.isnan(s)]
+ return {
+ 'status': hyperopt.STATUS_OK,
+ 'loss': sum(loss) / float(len(loss)),
+ 'loss_variance': np.var(loss),
+ 'true_loss': sum(true_loss) / float(len(true_loss)),
+ 'true_loss_variance': np.var(true_loss),
+ }
+
+ folds = mjolnir.training.tuning._make_folds(
+ df, num_folds=num_folds, num_workers=num_workers,
+ num_fold_partitions=num_fold_partitions, num_cv_jobs=num_cv_jobs)
+
+ for fold in folds:
+ fold['train'].cache()
+ fold['test'].cache()
+
+ try:
+ trials = hyperopt.Trials()
+ best = hyperopt.fmin(objective, space, algo=algo,
+ max_evals=max_evals, trials=trials)
+ finally:
+ for fold in folds:
+ fold['train'].unpersist()
+ fold['test'].unpersist()
+
+ # hyperopt only returns the non-constant parameters in best. It seems
+ # more convenient to return all of them.
+ best_merged = space.copy()
+ best_merged.update(best)
+
+ return (best_merged, trials)
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index ef5b288..7814748 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -3,13 +3,8 @@
"""
from collections import defaultdict
-import hyperopt as _hyperopt
-import hyperopt.pyll.base as _hyperopt_pyll_base
-import itertools
-import math
import mjolnir.spark
from multiprocessing.dummy import Pool
-import numpy as np
import py4j.protocol
from pyspark.sql import functions as F
@@ -267,128 +262,3 @@
folds = _make_folds(df, num_folds, num_fold_partitions, num_cv_jobs,
num_workers)
return _cross_validate(folds, train_func, params, num_cv_jobs=num_cv_jobs,
num_workers=num_workers)
-
-
-class _GridSearchAlgo(object):
- def __init__(self, space):
- foo = {}
- for k, v in space.items():
- if not isinstance(v, _hyperopt_pyll_base.Apply):
- continue
- literals = v.pos_args[1:]
- if not all([isinstance(l, _hyperopt_pyll_base.Literal) for l in
literals]):
- raise ValueError('GridSearch only works with hp.choice')
- foo[k] = range(len(literals))
- self.grid_keys = foo.keys()
- self.grids = list(itertools.product(*foo.values()))
- self.max_evals = len(self.grids)
-
- def __call__(self, new_ids, domain, trials, seed):
- rval = []
- for ii, new_id in enumerate(new_ids):
- vals = dict(zip(self.grid_keys, [[v] for v in self.grids.pop()]))
- new_result = domain.new_result()
- new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir,
- idxs=dict(zip(self.grid_keys, [[new_id]] *
len(vals))),
- vals=vals)
- rval.extend(trials.new_trial_docs([new_id],
- [None], [new_result], [new_misc]))
- import pprint
- pprint.pprint(rval)
- return rval
-
-
-def grid_search(df, train_func, space, num_folds=5, num_fold_partitions=100,
- num_cv_jobs=5, num_workers=5):
- # TODO: While this tried to look simple, hyperopt is a bit odd to integrate
- # with this directly. Perhaps implement naive gridsearch directly instead
- # of through hyperopt.
- algo = _GridSearchAlgo(space)
- return hyperopt(df, train_func, space, algo.max_evals, algo, num_folds,
- num_fold_partitions, num_cv_jobs, num_workers)
-
-
-def hyperopt(df, train_func, space, max_evals=50, algo=_hyperopt.tpe.suggest,
- num_folds=5, num_fold_partitions=100, num_cv_jobs=5,
num_workers=5):
- """Perform cross validated hyperparameter optimization of train_func
-
- Parameters
- ----------
- df : pyspark.sql.DataFrame
- Features and Labels to optimize over
- train_func : callable
- Function to use for training individual models
- space : dict
- Hyperparameter space to search over.
- max_evals : int
- Maximum iterations of hyperparameter tuning to perform.
- algo : callable
- The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
- details.
- num_folds : int
- Number of folds to split df into for cross validation
- num_fold_partitions : int
- Sets the number of partitions to split with. Each partition needs
- to be some minimum size for averages to work out to an evenly split
- final set. (Default: 100)
- num_cv_jobs : int
- Number of cross validation folds to train in parallel
- num_workers : int
- Number of executors to use for each model training
- cache : bool
- True if the folds of df should be individually cached
- unpersist : bool
- True if the folds of df should be unpersisted when complete.
-
- Returns
- -------
- best_params : dict
- The best parameters found within space
- trials : hyperopt.Trials
- Information about every iteration of the search
- """
- def objective(params):
- scores = _cross_validate(folds, train_func, params,
num_cv_jobs=num_cv_jobs,
- num_workers=num_workers)
- # For now the only metric is NDCG, and hyperopt is a minimizer
- # so return the negative NDCG
- loss = [-s['test'] for s in scores]
- true_loss = [s['train'] - s['test'] for s in scores]
- num_failures = sum([math.isnan(s) for s in loss])
- if num_failures > 1:
- return {
- 'status': _hyperopt.STATUS_FAIL,
- 'failure': 'Too many failures: %d' % (num_failures)
- }
- else:
- loss = [s for s in loss if not math.isnan(s)]
- true_loss = [s for s in true_loss if not math.isnan(s)]
- return {
- 'status': _hyperopt.STATUS_OK,
- 'loss': sum(loss) / float(len(loss)),
- 'loss_variance': np.var(loss),
- 'true_loss': sum(true_loss) / float(len(true_loss)),
- 'true_loss_variance': np.var(true_loss),
- }
-
- folds = _make_folds(df, num_folds=num_folds, num_workers=num_workers,
- num_fold_partitions=num_fold_partitions,
num_cv_jobs=num_cv_jobs)
-
- for fold in folds:
- fold['train'].cache()
- fold['test'].cache()
-
- try:
- trials = _hyperopt.Trials()
- best = _hyperopt.fmin(objective, space, algo=algo,
max_evals=max_evals, trials=trials)
- finally:
- for fold in folds:
- fold['train'].unpersist()
- fold['test'].unpersist()
-
- # hyperopt only returns the non-constant parameters in best. It seems
- # more convenient to return all of them.
- best_merged = space.copy()
- best_merged.update(best)
-
- return (best_merged, trials)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index c108e11..f64b115 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,6 +1,7 @@
+from __future__ import absolute_import
import hyperopt
import mjolnir.spark
-import mjolnir.training.tuning
+import mjolnir.training.hyperopt
import numpy as np
import pprint
import pyspark.sql
@@ -442,7 +443,7 @@
"""
def eval_space(space, max_evals):
"""Eval a space using standard hyperopt"""
- best, trials = mjolnir.training.tuning.hyperopt(
+ best, trials = mjolnir.training.hyperopt.minimize(
df, train, space, max_evals=max_evals,
num_folds=num_folds, num_fold_partitions=num_fold_partitions,
num_cv_jobs=num_cv_jobs, num_workers=num_workers)
@@ -453,7 +454,7 @@
def eval_space_grid(space):
"""Eval all points in the space via a grid search"""
- best, trials = mjolnir.training.tuning.grid_search(
+ best, trials = mjolnir.training.hyperopt.grid_search(
df, train, space, num_folds=num_folds,
num_fold_partitions=num_fold_partitions,
num_cv_jobs=num_cv_jobs, num_workers=num_workers)
for k, v in space.items():
--
To view, visit https://gerrit.wikimedia.org/r/361394
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: If5400b8183e31eaf0d56fe50899d196b9f5e1314
Gerrit-PatchSet: 5
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits