DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/361394 )

Change subject: Split hyperopt integration into its own file
......................................................................


Split hyperopt integration into its own file

The hyperopt integration is getting more and more complex, and an
upcoming patch to run multiple cross-validations in parallel only makes
it even more complex. Make the code a bit clearer by splitting out the
hyperopt-specific code into its own file.
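
For callers the change is only an import path: tuning.grid_search moves to
mjolnir.training.hyperopt.grid_search, and tuning.hyperopt() becomes
mjolnir.training.hyperopt.minimize(). A minimal sketch of an updated call
site, with df, train_func and space as stand-ins rather than names from
this repo:

    import mjolnir.training.hyperopt

    best_params, trials = mjolnir.training.hyperopt.minimize(
        df, train_func, space, max_evals=50, num_folds=5,
        num_fold_partitions=100, num_cv_jobs=5, num_workers=5)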

Change-Id: If5400b8183e31eaf0d56fe50899d196b9f5e1314
---
A mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
A mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
5 files changed, 225 insertions(+), 199 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved



diff --git a/mjolnir/test/training/test_hyperopt.py b/mjolnir/test/training/test_hyperopt.py
new file mode 100644
index 0000000..f0bafad
--- /dev/null
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -0,0 +1,84 @@
+import hyperopt
+import mjolnir.training.hyperopt
+from pyspark.ml.linalg import Vectors
+import pytest
+
+
+def _make_q(query, n=4):
+    "Generates single feature queries"
+    return [('foowiki', query, query, float(f), Vectors.dense([float(f)])) for 
f in range(n)]
+
+
[email protected]
+def df_train(spark_context, hive_context):
+    # TODO: Use some fixture dataset representing real-ish data? But
+    # it needs to be pretty small
+    return spark_context.parallelize(
+        _make_q('abc') + _make_q('def') + _make_q('ghi') + _make_q('jkl')
+        + _make_q('mno') + _make_q('pqr') + _make_q('stu')
+    ).toDF(['wikiid', 'norm_query_id', 'query', 'label', 'features'])
+
+
+def test_minimize(df_train):
+    "Not an amazing test...basically sees if the happy path doesnt blow up"
+    space = {
+        'num_rounds': 50,
+        'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
+    }
+
+    # hyperopt mostly just calls cross_validate, whose integration with
+    # xgboost is tested separately. Instead of going all the way into
+    # xgboost, mock it out with MockModel.
+    best_params, trials = mjolnir.training.hyperopt.minimize(
+        df_train, MockModel, space, max_evals=5, num_folds=2,
+        num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
+    assert isinstance(best_params, dict)
+    # num_rounds should have been unchanged
+    assert 'num_rounds' in best_params
+    assert best_params['num_rounds'] == 50
+    # should have max_evals evaluations
+    assert len(trials.trials) == 5
+
+
+def test_gridsearch(df_train):
+    space = {
+        'num_rounds': 50,
+        'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
+    }
+
+    gen = MockModelGen()
+    best_params, trials = mjolnir.training.hyperopt.grid_search(
+        df_train, gen, space, num_folds=2,
+        num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
+    assert isinstance(best_params, dict)
+    assert 'num_rounds' in best_params
+    # num rounds should be unchanged
+    assert best_params['num_rounds'] == 50
+    # should have 3 trials, one for each max_depth choice
+    assert len(trials.trials) == 3
+    param_depths = sorted([param['max_depth'] for param in gen.params])
+    # TODO: Why is this called 2x as many times as expected?
+    # For some reason the correct number of trials is still
+    # returned though.
+    assert [10, 10, 20, 20, 30, 30] == param_depths
+
+
+class MockModelGen(object):
+    def __init__(self):
+        self.params = []
+
+    def __call__(self, df, params, num_workers):
+        self.params.append(params)
+        return MockModel(df, params, num_workers)
+
+
+class MockModel(object):
+    def __init__(self, df, params, num_workers):
+        # Params chosen by hyperopt and passed through cross-validation
+        assert isinstance(params, dict)
+        assert 'max_depth' in params
+        assert params['num_rounds'] == 50
+        assert num_workers == 1
+
+    def eval(self, df_test, j_groups=None, feature_col='features', label_col='label'):
+        return 1.0
diff --git a/mjolnir/test/training/test_tuning.py b/mjolnir/test/training/test_tuning.py
index b8124b4..73d9057 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -1,4 +1,3 @@
-import hyperopt
 import mjolnir.training.tuning
 import mjolnir.training.xgboost
 from pyspark.sql import functions as F
@@ -58,68 +57,3 @@
         num_folds=2, num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
     # one score for each fold
     assert len(scores) == 2
-
-
-def test_hyperopt(df_train):
-    "Not an amazing test...basically sees if the happy path doesnt blow up"
-    space = {
-        'num_rounds': 50,
-        'max_depth': hyperopt.hp.quniform('max_depth', 1, 20, 1)
-    }
-
-    # mostly hyperopt just calls cross_validate, of which the integration with
-    # xgboost is separately tested. Instead of going all the way into xgboost
-    # mock it out w/MockModel.
-    best_params, trails = mjolnir.training.tuning.hyperopt(
-        df_train, MockModel, space, max_evals=5, num_folds=2,
-        num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
-    assert isinstance(best_params, dict)
-    # num_rounds should have been unchanged
-    assert 'num_rounds' in best_params
-    assert best_params['num_rounds'] == 50
-    # should have max_evals evaluations
-    assert len(trails.trials) == 5
-
-
-def test_gridsearch(df_train):
-    space = {
-        'num_rounds': 50,
-        'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
-    }
-
-    gen = MockModelGen()
-    best_params, trials = mjolnir.training.tuning.grid_search(
-        df_train, gen, space, num_folds=2,
-        num_fold_partitions=1, num_cv_jobs=1, num_workers=1)
-    assert isinstance(best_params, dict)
-    assert 'num_rounds' in best_params
-    # num rounds should be unchanged
-    assert best_params['num_rounds'] == 50
-    # should have 3 iterations for the 3 max depth's
-    assert len(trials.trials) == 3
-    param_depths = sorted([param['max_depth'] for param in gen.params])
-    # TODO: Why is this called 2x as many times as expected?
-    # For some reason the correct number of trials is still
-    # returned though.
-    assert [10, 10, 20, 20, 30, 30] == param_depths
-
-
-class MockModelGen(object):
-    def __init__(self):
-        self.params = []
-
-    def __call__(self, df, params, num_workers):
-        self.params.append(params)
-        return MockModel(df, params, num_workers)
-
-
-class MockModel(object):
-    def __init__(self, df, params, num_workers):
-        # Params that were passed to hyperopt
-        assert isinstance(params, dict)
-        assert 'max_depth' in params
-        assert params['num_rounds'] == 50
-        assert num_workers == 1
-
-    def eval(self, df_test, j_groups=None, feature_col='features', 
label_col='label'):
-        return 1.0
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
new file mode 100644
index 0000000..518497b
--- /dev/null
+++ b/mjolnir/training/hyperopt.py
@@ -0,0 +1,137 @@
+"""
+Integration and additional features for the hyperopt library
+"""
+from __future__ import absolute_import
+import hyperopt
+import hyperopt.pyll.base
+import itertools
+import math
+import mjolnir.training.tuning
+import numpy as np
+
+
+class _GridSearchAlgo(object):
+    def __init__(self, space):
+        choices = {}
+        for k, v in space.items():
+            if not isinstance(v, hyperopt.pyll.base.Apply):
+                continue
+            literals = v.pos_args[1:]
+            if not all([isinstance(lit, hyperopt.pyll.base.Literal) for lit in literals]):
+                raise ValueError('GridSearch only works with hp.choice')
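+            # hp.choice stores each option as a positional Literal, and
+            # hyperopt records a chosen option by index, so enumerate indices.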
+            choices[k] = range(len(literals))
+        self.grid_keys = choices.keys()
+        self.grids = list(itertools.product(*choices.values()))
+        self.max_evals = len(self.grids)
+
+    def __call__(self, new_ids, domain, trials, seed):
+        rval = []
+        for ii, new_id in enumerate(new_ids):
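+            # Pop the next unexplored grid point; hyperopt expects each
+            # parameter as a single-element list of values per trial.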
+            vals = dict(zip(self.grid_keys, [[v] for v in self.grids.pop()]))
+            new_result = domain.new_result()
+            new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir,
+                            idxs=dict(zip(self.grid_keys, [[new_id]] * len(vals))),
+                            vals=vals)
+            rval.extend(trials.new_trial_docs([new_id],
+                        [None], [new_result], [new_misc]))
+        return rval
+
+
+def grid_search(df, train_func, space, num_folds=5, num_fold_partitions=100,
+                num_cv_jobs=5, num_workers=5):
+    # TODO: While this looks simple, hyperopt is a bit odd to integrate
+    # with directly. Perhaps implement a naive grid search instead of
+    # going through hyperopt.
+    algo = _GridSearchAlgo(space)
+    return minimize(df, train_func, space, algo.max_evals, algo, num_folds,
+                    num_fold_partitions, num_cv_jobs, num_workers)
+
+
+def minimize(df, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
+             num_folds=5, num_fold_partitions=100, num_cv_jobs=5, num_workers=5):
+    """Perform cross validated hyperparameter optimization of train_func
+
+    Parameters
+    ----------
+    df : pyspark.sql.DataFrame
+        Features and Labels to optimize over
+    train_func : callable
+        Function to use for training individual models
+    space : dict
+        Hyperparameter space to search over.
+    max_evals : int
+        Maximum iterations of hyperparameter tuning to perform.
+    algo : callable
+        The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
+        details.
+    num_folds : int
+        Number of folds to split df into for cross validation
+    num_fold_partitions : int
+        Sets the number of partitions to split with. Each partition needs
+        to be some minimum size for averages to work out to an evenly split
+        final set. (Default: 100)
+    num_cv_jobs : int
+        Number of cross validation folds to train in parallel
+    num_workers : int
+        Number of executors to use for each model training
+
+    Returns
+    -------
+    best_params : dict
+        The best parameters found within space
+    trials : hyperopt.Trials
+        Information about every iteration of the search
+    """
+    def objective(params):
+        scores = mjolnir.training.tuning._cross_validate(
+            folds, train_func, params, num_cv_jobs=num_cv_jobs,
+            num_workers=num_workers)
+        # For now the only metric is NDCG, and hyperopt is a minimizer
+        # so return the negative NDCG
+        loss = [-s['test'] for s in scores]
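+        # train minus test NDCG, i.e. the generalization gap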
+        true_loss = [s['train'] - s['test'] for s in scores]
+        num_failures = sum([math.isnan(s) for s in loss])
+        if num_failures > 1:
+            return {
+                'status': hyperopt.STATUS_FAIL,
+                'failure': 'Too many failures: %d' % (num_failures)
+            }
+        else:
+            loss = [s for s in loss if not math.isnan(s)]
+            true_loss = [s for s in true_loss if not math.isnan(s)]
+        return {
+            'status': hyperopt.STATUS_OK,
+            'loss': sum(loss) / float(len(loss)),
+            'loss_variance': np.var(loss),
+            'true_loss': sum(true_loss) / float(len(true_loss)),
+            'true_loss_variance': np.var(true_loss),
+        }
+
+    folds = mjolnir.training.tuning._make_folds(
+        df, num_folds=num_folds, num_workers=num_workers,
+        num_fold_partitions=num_fold_partitions, num_cv_jobs=num_cv_jobs)
+
+    for fold in folds:
+        fold['train'].cache()
+        fold['test'].cache()
+
+    try:
+        trials = hyperopt.Trials()
+        best = hyperopt.fmin(objective, space, algo=algo,
+                             max_evals=max_evals, trials=trials)
+    finally:
+        for fold in folds:
+            fold['train'].unpersist()
+            fold['test'].unpersist()
+
+    # hyperopt only returns the non-constant parameters in best. It seems
+    # more convenient to return all of them.
+    best_merged = space.copy()
+    best_merged.update(best)
+
+    return (best_merged, trials)
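
A note on grid_search() above: _GridSearchAlgo accepts only hp.choice
dimensions and raises ValueError for any other hyperopt expression, while
plain constants in the space pass through untouched into best_params. A
minimal sketch of a space it accepts (parameter values are illustrative,
not taken from this change):

    import hyperopt
    import mjolnir.training.hyperopt

    space = {
        'num_rounds': 50,  # constant, returned unchanged in best_params
        'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
    }
    best_params, trials = mjolnir.training.hyperopt.grid_search(
        df, train_func, space, num_folds=5, num_fold_partitions=100,
        num_cv_jobs=5, num_workers=5)
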
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index ef5b288..7814748 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -3,13 +3,8 @@
 """
 
 from collections import defaultdict
-import hyperopt as _hyperopt
-import hyperopt.pyll.base as _hyperopt_pyll_base
-import itertools
-import math
 import mjolnir.spark
 from multiprocessing.dummy import Pool
-import numpy as np
 import py4j.protocol
 from pyspark.sql import functions as F
 
@@ -267,128 +262,3 @@
     folds = _make_folds(df, num_folds, num_fold_partitions, num_cv_jobs, num_workers)
     return _cross_validate(folds, train_func, params, num_cv_jobs=num_cv_jobs,
                            num_workers=num_workers)
-
-
-class _GridSearchAlgo(object):
-    def __init__(self, space):
-        foo = {}
-        for k, v in space.items():
-            if not isinstance(v, _hyperopt_pyll_base.Apply):
-                continue
-            literals = v.pos_args[1:]
-        if not all([isinstance(l, _hyperopt_pyll_base.Literal) for l in literals]):
-                raise ValueError('GridSearch only works with hp.choice')
-            foo[k] = range(len(literals))
-        self.grid_keys = foo.keys()
-        self.grids = list(itertools.product(*foo.values()))
-        self.max_evals = len(self.grids)
-
-    def __call__(self, new_ids, domain, trials, seed):
-        rval = []
-        for ii, new_id in enumerate(new_ids):
-            vals = dict(zip(self.grid_keys, [[v] for v in self.grids.pop()]))
-            new_result = domain.new_result()
-            new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir,
-                            idxs=dict(zip(self.grid_keys, [[new_id]] * len(vals))),
-                            vals=vals)
-            rval.extend(trials.new_trial_docs([new_id],
-                        [None], [new_result], [new_misc]))
-        import pprint
-        pprint.pprint(rval)
-        return rval
-
-
-def grid_search(df, train_func, space, num_folds=5, num_fold_partitions=100,
-                num_cv_jobs=5, num_workers=5):
-    # TODO: While this tried to look simple, hyperopt is a bit odd to integrate
-    # with this directly. Perhaps implement naive gridsearch directly instead
-    # of through hyperopt.
-    algo = _GridSearchAlgo(space)
-    return hyperopt(df, train_func, space, algo.max_evals, algo, num_folds,
-                    num_fold_partitions, num_cv_jobs, num_workers)
-
-
-def hyperopt(df, train_func, space, max_evals=50, algo=_hyperopt.tpe.suggest,
-             num_folds=5, num_fold_partitions=100, num_cv_jobs=5, num_workers=5):
-    """Perform cross validated hyperparameter optimization of train_func
-
-    Parameters
-    ----------
-    df : pyspark.sql.DataFrame
-        Features and Labels to optimize over
-    train_func : callable
-        Function to use for training individual models
-    space : dict
-        Hyperparameter space to search over.
-    max_evals : int
-        Maximum iterations of hyperparameter tuning to perform.
-    algo : callable
-        The algorithm to use with hyperopt. See docs of hyperopt.fmin for more
-        details.
-    num_folds : int
-        Number of folds to split df into for cross validation
-    num_fold_partitions : int
-        Sets the number of partitions to split with. Each partition needs
-        to be some minimum size for averages to work out to an evenly split
-        final set. (Default: 100)
-    num_cv_jobs : int
-        Number of cross validation folds to train in parallel
-    num_workers : int
-        Number of executors to use for each model training
-    cache : bool
-        True if the folds of df should be individually cached
-    unpersist : bool
-        True if the folds of df should be unpersisted when complete.
-
-    Returns
-    -------
-    best_params : dict
-        The best parameters found within space
-    trials : hyperopt.Trials
-        Information about every iteration of the search
-    """
-    def objective(params):
-        scores = _cross_validate(folds, train_func, params, num_cv_jobs=num_cv_jobs,
-                                 num_workers=num_workers)
-        # For now the only metric is NDCG, and hyperopt is a minimizer
-        # so return the negative NDCG
-        loss = [-s['test'] for s in scores]
-        true_loss = [s['train'] - s['test'] for s in scores]
-        num_failures = sum([math.isnan(s) for s in loss])
-        if num_failures > 1:
-            return {
-                'status': _hyperopt.STATUS_FAIL,
-                'failure': 'Too many failures: %d' % (num_failures)
-            }
-        else:
-            loss = [s for s in loss if not math.isnan(s)]
-            true_loss = [s for s in true_loss if not math.isnan(s)]
-        return {
-            'status': _hyperopt.STATUS_OK,
-            'loss': sum(loss) / float(len(loss)),
-            'loss_variance': np.var(loss),
-            'true_loss': sum(true_loss) / float(len(true_loss)),
-            'true_loss_variance': np.var(true_loss),
-        }
-
-    folds = _make_folds(df, num_folds=num_folds, num_workers=num_workers,
-                        num_fold_partitions=num_fold_partitions, num_cv_jobs=num_cv_jobs)
-
-    for fold in folds:
-        fold['train'].cache()
-        fold['test'].cache()
-
-    try:
-        trials = _hyperopt.Trials()
-        best = _hyperopt.fmin(objective, space, algo=algo, max_evals=max_evals, trials=trials)
-    finally:
-        for fold in folds:
-            fold['train'].unpersist()
-            fold['test'].unpersist()
-
-    # hyperopt only returns the non-constant parameters in best. It seems
-    # more convenient to return all of them.
-    best_merged = space.copy()
-    best_merged.update(best)
-
-    return (best_merged, trials)
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index c108e11..f64b115 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -1,6 +1,7 @@
+from __future__ import absolute_import
 import hyperopt
 import mjolnir.spark
-import mjolnir.training.tuning
+import mjolnir.training.hyperopt
 import numpy as np
 import pprint
 import pyspark.sql
@@ -442,7 +443,7 @@
     """
     def eval_space(space, max_evals):
         """Eval a space using standard hyperopt"""
-        best, trials = mjolnir.training.tuning.hyperopt(
+        best, trials = mjolnir.training.hyperopt.minimize(
             df, train, space, max_evals=max_evals,
             num_folds=num_folds, num_fold_partitions=num_fold_partitions,
             num_cv_jobs=num_cv_jobs, num_workers=num_workers)
@@ -453,7 +454,7 @@
 
     def eval_space_grid(space):
         """Eval all points in the space via a grid search"""
-        best, trials = mjolnir.training.tuning.grid_search(
+        best, trials = mjolnir.training.hyperopt.grid_search(
             df, train, space, num_folds=num_folds, num_fold_partitions=num_fold_partitions,
             num_cv_jobs=num_cv_jobs, num_workers=num_workers)
         for k, v in space.items():

-- 
To view, visit https://gerrit.wikimedia.org/r/361394
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: If5400b8183e31eaf0d56fe50899d196b9f5e1314
Gerrit-PatchSet: 5
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>
