EBernhardson has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/403333 )
Change subject: Simplify hyperparameter tuning
......................................................................
Simplify hyperparameter tuning
I tested letting all the tuning happen at once instead of
the iterative approach we were using. It went quicker and
gave comparable results. This will also make it easier to
add in lightgbm as an alternate training algo.
Also removed the use_external_memory parameter from xgboost.
This option is specialized and won't be necessary anymore after
an upcoming refactor for file-based training.
Change-Id: I8cc4ee504d0e49bc61ffc5d2781e131fabe4372c
---
M example_train.yaml
A mjolnir/pruning.py
A mjolnir/scan_es.py
M mjolnir/test/fixtures/load_config/example_train.expect
M mjolnir/test/training/test_hyperopt.py
M mjolnir/test/training/test_tuning.py
M mjolnir/training/hyperopt.py
M mjolnir/training/tuning.py
M mjolnir/training/xgboost.py
M mjolnir/utilities/training_pipeline.py
10 files changed, 306 insertions(+), 330 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR
refs/changes/33/403333/1
diff --git a/example_train.yaml b/example_train.yaml
index 183ea6e..31ce45c 100644
--- a/example_train.yaml
+++ b/example_train.yaml
@@ -138,7 +138,6 @@
cv-jobs: 22
folds: 3
final-trees: 100
- use-external-memory: yes
medium:
# 4M to 12M observations per executor.
diff --git a/mjolnir/pruning.py b/mjolnir/pruning.py
new file mode 100644
index 0000000..c2e78f9
--- /dev/null
+++ b/mjolnir/pruning.py
@@ -0,0 +1,134 @@
+from __future__ import absolute_import
+import json
+import math
+from pyspark.sql import functions as F
+from pyspark.sql.types import FloatType, StructField, StructType
+
+
+class Split(object):
+ def __init__(self, left, right, feature, threshold):
+ self.left = left
+ self.right = right
+ self.feature = feature
+ self.threshold = threshold
+
+ def isLeaf(self):
+ return False
+
+ def eval(self, features):
+ n = self
+ while not n.isLeaf():
+ if n.threshold > features[n.feature]:
+ n = n.left
+ else:
+ n = n.right
+ return n.output
+
+
+class Leaf(object):
+ def __init__(self, output):
+ self.output = output
+
+ def isLeaf(self):
+ return True
+
+
+def _parse_node(json_node):
+ if 'leaf' in json_node:
+ return Leaf(json_node['leaf'])
+ else:
+ left = _parse_node(json_node['children'][0])
+ right = _parse_node(json_node['children'][1])
+ return Split(left, right, json_node['split'],
json_node['split_condition'])
+
+
+def parse_xgboost(json_tree):
+ return [_parse_node(tree) for tree in json.loads(json_tree)]
+
+
+def ndcg_at_k(k, predicted, actual):
+ idcg = sum([((1 << label) - 1) / math.log(i + 2.0, 2) for i, label in
enumerate(actual[:k])])
+ if idcg == 0:
+ return 0.
+ else:
+ dcg = sum([((1 << label) - 1) / math.log(i + 2.0, 2) for i, label in
enumerate(predicted[:k])])
+ return dcg / idcg
+
+
+# Horrible name ... it returns the ndcg for each removed tree
+def gen_per_tree_ndcg(tree_cols, removed_trees, label_col, k=10):
+ def f(rows):
+ # Remove trees from the sum
+ cur_sum = [reduce(lambda acc, tree: acc - row[tree], removed_trees,
row.sum) for row in rows]
+ data = zip(rows, cur_sum)
+
+ # TODO: actual could be pre-calculated? Actually full idcg could be
pre-calculated
+ actual = [x[0][label_col] for x in sorted(data, key=lambda x:
x[0][label_col], reverse=True)]
+ # baseline ndcg
+ predicted = [x[0][label_col] for x in sorted(data, key=lambda x: x[1],
reverse=True)]
+ res = [ndcg_at_k(k, predicted, actual)]
+ # Per-tree ndcgs
+ for tree_pred in tree_cols:
+ predicted = [x[0][label_col] for x in sorted(data, key=lambda x:
x[1] - x[0][tree_pred], reverse=True)]
+ res.append(ndcg_at_k(k, predicted, actual))
+ return res
+ fields = [StructField(name, FloatType()) for name in ['orig'] + tree_cols]
+ return F.udf(f, StructType(fields))
+
+
+def gen_eval_tree_udf(bc_trees):
+ def f(tree_id, features):
+ return bc_trees.value[tree_id].eval(features)
+ return F.udf(f, FloatType())
+
+
+def prune(df, trees, feature_col='features', label_col='label',
group_cols=['wikiid', 'query']):
+ # Calculate per-tree scores
+ eval_tree_udf = gen_eval_tree_udf(df._sc.broadcast(trees))
+ cols = [eval_tree_udf(F.lit(i), feature_col).alias('tree_%d' % (i)) for i
in range(len(trees))]
+ tree_cols = ['tree_%d' % (i) for i in range(len(trees))]
+
+ # We should iterate until it gets worse or we hit some desired # of trees
+ df_w_scores = (
+ df
+ .select(feature_col, label_col,
F.concat(*group_cols).alias('group_id'))
+ # Does the above select shrink the data size for this calculation? or
would
+ # spark manage that anyways?
+ .repartition(200)
+ .select('group_id', label_col, *cols)
+ .withColumn('sum', reduce(lambda x, y: x + F.col(y), tree_cols,
F.lit(0)))
+ .groupBy('group_id')
+ # This grouping makes it impossible to drop fields as we go ...
+ .agg(F.collect_list(F.struct('sum', 'label',
*tree_cols)).alias('pages'))
+ .drop('group_id')
+ .cache())
+
+ try:
+ removed_trees = set()
+ results = []
+ while len(removed_trees) < len(trees):
+ print 'Remaining steps: %d' % (len(trees) - len(removed_trees))
+ if removed_trees:
+ remaining_tree_cols = [x for x in tree_cols if x not in
removed_trees]
+ else:
+ remaining_tree_cols = tree_cols
+
+ per_tree_ndcg_udf = gen_per_tree_ndcg(remaining_tree_cols,
removed_trees, label_col)
+
+ scores = (
+ df_w_scores
+ .select(per_tree_ndcg_udf('pages').alias('per_tree_scores'))
+ # Average together each query group
+ .agg(*[F.avg('per_tree_scores.%s' %
(tree_col)).alias('%s_ndcg' % (tree_col))
+ for tree_col in ['orig'] + remaining_tree_cols])
+ .collect())[0]
+
+ # Record score before any mucking around
+ if len(results) == 0:
+ results.append(('original', scores.orig_ndcg))
+ worst_tree, best_score = sorted(zip(remaining_tree_cols,
scores[1:]), key=lambda x: x[1], reverse=True)[0]
+ removed_trees.add(worst_tree)
+ results.append((worst_tree, best_score))
+ return results
+ finally:
+ df_w_scores.unpersist()
diff --git a/mjolnir/scan_es.py b/mjolnir/scan_es.py
new file mode 100644
index 0000000..67f22cc
--- /dev/null
+++ b/mjolnir/scan_es.py
@@ -0,0 +1,81 @@
+from __future__ import absolute_import
+import json
+import mjolnir.spark
+import numpy as np
+from pyspark.ml.linalg import Vectors, VectorUDT
+import pyspark.sql.types
+from pyspark.sql import functions as F
+import random
+import requests
+
+
+def read_fields(sc, n_slices, fields, size=500):
+ def read_slice(id):
+ hosts = ['elastic%d.codfw.wmnet' % i for i in range(2001, 2037)]
+ host = random.choice(hosts)
+ url = 'http://%s:9200/enwiki_content/page/_search?scroll=1m' % (host)
+ scroll_url = 'http://%s:9200/_search/scroll' % (host)
+ query = {
+ "query": {
+ "match_all": {},
+ },
+ "sort": ["_doc"],
+ "size": size,
+ "_source": fields,
+ }
+ if n_slices > 1:
+ query['slice'] = {
+ "id": id,
+ "max": n_slices
+ }
+ session = requests.Session()
+ res = session.get(url, data=json.dumps(query)).json()
+ while len(res['hits']['hits']) > 0:
+ for hit in res['hits']['hits']:
+ yield [hit['_id']] + [hit['_source'][field] if field in
hit['_source'] else [] for field in fields]
+ res = session.get(scroll_url, data=json.dumps({
+ "scroll": "1m",
+ "scroll_id": res['_scroll_id'],
+ })).json()
+
+ fields_schema = [
+ pyspark.sql.types.StructField("page_id",
pyspark.sql.types.StringType())
+ ]
+ for field in fields:
+ fields_schema.append(pyspark.sql.types.StructField(
+ field,
pyspark.sql.types.ArrayType(pyspark.sql.types.StringType())))
+
+ return sc.parallelize(range(n_slices), n_slices).flatMap(read_slice).toDF(
+ pyspark.sql.types.StructType(fields_schema))
+
+
+def take_imporant(df_feat, df_es, field, min_count=1000):
+ rows = (
+ df_es
+ .select(F.explode(field).alias(field))
+ .groupBy(field)
+ .agg(F.count(F.lit(1)).alias('count'))
+ .where(F.col('count') > min_count)
+ .collect())
+ row_map = {row[field]: i for i, row in enumerate(rows)}
+
+ df_es_feat = (
+ df_es
+ .rdd.map(lambda row: (
+ row.page_id,
+ Vectors.sparse(
+ len(row_map),
+ [[row_map[item], True] for item in row[field] if item in
row_map])
+ ))
+ .toDF(['hit_page_id', 'es_features']))
+
+ merge_feats = F.udf(lambda a, b: Vectors.dense(np.append(a.toArray(),
b.toArray())), VectorUDT())
+
+ return (
+ df_feat
+ .join(df_es_feat, how='inner', on=['hit_page_id'])
+ .withColumn('features', merge_feats('features', 'es_features'))
+ .drop('es_features')
+ .withColumn('features', mjolnir.spark.add_meta(df_feat._sc,
F.col('features'), {
+ 'features': df_feat.schema['features'].metadata['features'] +
[row[field] for row in rows],
+ })))
diff --git a/mjolnir/test/fixtures/load_config/example_train.expect
b/mjolnir/test/fixtures/load_config/example_train.expect
index 217b6cc..c56d132 100644
--- a/mjolnir/test/fixtures/load_config/example_train.expect
+++ b/mjolnir/test/fixtures/load_config/example_train.expect
@@ -243,7 +243,6 @@
folds: '3'
input: hdfs://analytics-hadoop/user/pytest/mjolnir/marker
output: /home/pytest/training_size/marker_large
- use-external-memory: 'True'
workers: '3'
environment:
HOME: /home/pytest
diff --git a/mjolnir/test/training/test_hyperopt.py
b/mjolnir/test/training/test_hyperopt.py
index cd5d712..6ef6f8a 100644
--- a/mjolnir/test/training/test_hyperopt.py
+++ b/mjolnir/test/training/test_hyperopt.py
@@ -41,38 +41,6 @@
assert len(trails.trials) == 5
-def test_gridsearch(df_train):
- space = {
- 'num_rounds': 50,
- 'max_depth': hyperopt.hp.choice('max_depth', [10, 20, 30]),
- }
-
- gen = MockModelGen()
- best_params, trials = mjolnir.training.hyperopt.grid_search(
- df_train, gen, space, num_folds=2,
- num_workers=1)
- assert isinstance(best_params, dict)
- assert 'num_rounds' in best_params
- # num rounds should be unchanged
- assert best_params['num_rounds'] == 50
- # should have 3 iterations for the 3 max depth's
- assert len(trials.trials) == 3
- param_depths = sorted([param['max_depth'] for param in gen.params])
- # TODO: Why is this called 2x as many times as expected?
- # For some reason the correct number of trials is still
- # returned though.
- assert [10, 10, 20, 20, 30, 30] == param_depths
-
-
-class MockModelGen(object):
- def __init__(self):
- self.params = []
-
- def __call__(self, df, params, num_workers):
- self.params.append(params)
- return MockModel(df, params, num_workers)
-
-
class MockModel(object):
def __init__(self, df, params, num_workers):
# Params that were passed to hyperopt
diff --git a/mjolnir/test/training/test_tuning.py
b/mjolnir/test/training/test_tuning.py
index 13d8549..eaa35b6 100644
--- a/mjolnir/test/training/test_tuning.py
+++ b/mjolnir/test/training/test_tuning.py
@@ -16,7 +16,7 @@
.select(F.lit('foowiki').alias('wikiid'),
(F.col('id')/100).cast('int').alias('norm_query_id')))
- with_folds = mjolnir.training.tuning.split(df, (0.8, 0.2),
num_partitions=4).collect()
+ with_folds = mjolnir.training.tuning.split(df, (0.8, 0.2)).collect()
fold_0 = [row for row in with_folds if row.fold == 0]
fold_1 = [row for row in with_folds if row.fold == 1]
diff --git a/mjolnir/training/hyperopt.py b/mjolnir/training/hyperopt.py
index bf2d752..b4001eb 100644
--- a/mjolnir/training/hyperopt.py
+++ b/mjolnir/training/hyperopt.py
@@ -5,7 +5,6 @@
import hyperopt
import hyperopt.pyll.base
from hyperopt.utils import coarse_utcnow
-import itertools
import math
import mjolnir.training.tuning
import numpy as np
@@ -79,43 +78,6 @@
except KeyError:
pass
return state
-
-
-class _GridSearchAlgo(object):
- def __init__(self, space):
- foo = {}
- for k, v in space.items():
- if not isinstance(v, hyperopt.pyll.base.Apply):
- continue
- literals = v.pos_args[1:]
- if not all([isinstance(l, hyperopt.pyll.base.Literal) for l in
literals]):
- raise ValueError('GridSearch only works with hp.choice')
- foo[k] = range(len(literals))
- self.grid_keys = foo.keys()
- self.grids = list(itertools.product(*foo.values()))
- self.max_evals = len(self.grids)
-
- def __call__(self, new_ids, domain, trials, seed):
- rval = []
- for ii, new_id in enumerate(new_ids):
- vals = dict(zip(self.grid_keys, [[v] for v in self.grids.pop()]))
- new_result = domain.new_result()
- new_misc = dict(tid=new_id, cmd=domain.cmd, workdir=domain.workdir,
- idxs=dict(zip(self.grid_keys, [[new_id]] *
len(vals))),
- vals=vals)
- rval.extend(trials.new_trial_docs([new_id],
- [None], [new_result], [new_misc]))
- return rval
-
-
-def grid_search(df, train_func, space, num_folds=5, num_workers=5,
- cv_pool=None, trials_pool=None):
- # TODO: While this tried to look simple, hyperopt is a bit odd to integrate
- # with this directly. Perhaps implement naive gridsearch directly instead
- # of through hyperopt.
- algo = _GridSearchAlgo(space)
- return minimize(df, train_func, space, algo.max_evals, algo, num_folds,
- num_workers, cv_pool, trials_pool)
def minimize(df, train_func, space, max_evals=50, algo=hyperopt.tpe.suggest,
diff --git a/mjolnir/training/tuning.py b/mjolnir/training/tuning.py
index f956bae..d76fd1b 100644
--- a/mjolnir/training/tuning.py
+++ b/mjolnir/training/tuning.py
@@ -8,7 +8,7 @@
from pyspark.sql import functions as F
-def split(df, splits, output_column='fold', num_partitions=100):
+def split(df, splits, output_column='fold'):
"""Assign splits to a dataframe of search results
Individual hits from the same normalized query are not independent,
@@ -31,10 +31,6 @@
into.
output_column : str, optional
Name of the new column indicating the split
- num_partitions : int, optional
- Sets the number of partitions to split with. Each partition needs
- to be some minimum size for averages to work out to an evenly split
- final set. (Default: 100)
Returns
-------
@@ -88,7 +84,7 @@
return df.join(df_splits, how='inner', on=['wikiid', 'norm_query_id'])
-def group_k_fold(df, num_folds, num_partitions=100, output_column='fold'):
+def group_k_fold(df, num_folds, output_column='fold'):
"""
Generates group k-fold splits. The fold a row belongs to is
assigned to the column identified by the output_column parameter.
@@ -97,16 +93,13 @@
----------
df : pyspark.sql.DataFrame
num_folds : int
- test_folds : int, optional
- vali_folds : int, optional
- num_partitions : int, optional
Yields
------
dict
"""
return (
- split(df, [1. / num_folds] * num_folds, output_column, num_partitions)
+ split(df, [1. / num_folds] * num_folds, output_column)
.withColumn(output_column, mjolnir.spark.add_meta(df._sc,
F.col(output_column), {
'num_folds': num_folds,
})))
diff --git a/mjolnir/training/xgboost.py b/mjolnir/training/xgboost.py
index 03e8599..2e30a36 100644
--- a/mjolnir/training/xgboost.py
+++ b/mjolnir/training/xgboost.py
@@ -5,11 +5,9 @@
import mjolnir.training.hyperopt
from multiprocessing.dummy import Pool
import numpy as np
-import pprint
import pyspark.sql
from pyspark.sql import functions as F
import tempfile
-import scipy.sparse
# Example Command line:
# PYSPARK_PYTHON=venv/bin/python SPARK_CONF_DIR=/etc/spark/conf
~/spark-2.1.0-bin-hadoop2.6/bin/pyspark \
@@ -108,7 +106,7 @@
return retval
-def train(df, params, num_workers=None, use_external_memory=False):
+def train(df, params, num_workers=None):
"""Train a single xgboost ranking model.
df : pyspark.sql.DataFrame
@@ -168,7 +166,6 @@
try:
return XGBoostModel.trainWithDataFrame(df_grouped, params, num_rounds,
num_workers,
feature_col='features',
-
use_external_memory=use_external_memory,
label_col='label')
finally:
if unpersist:
@@ -181,7 +178,7 @@
@staticmethod
def trainWithDataFrame(trainingData, params, num_rounds, num_workers,
objective=None,
- eval_metric=None, use_external_memory=False,
missing=float('nan'),
+ eval_metric=None, missing=float('nan'),
feature_col='features', label_col='label'):
"""Wrapper around scala XGBoostModel.trainWithRDD
@@ -200,11 +197,6 @@
eval_metric : py4j.java_gateway.JavaObject, optional
Allows providing a custom evaluation metric implementation.
(Default: None)
- use_external_memory : bool, optional
- indicate whether to use external memory cache, by setting this flag
- as true, the user may save the RAM cost for running XGBoost within
- spark. Essentially this puts the data on local disk, and takes
- advantage of the kernel disk cache(maybe?). (Default: False)
missing : float, optional
The value representing the missing value in the dataset. features
with
this value will be removed and the vectors treated as sparse.
(Default: nan)
@@ -227,8 +219,8 @@
trainingData._jdf, feature_col, label_col)
j_xgb_model =
sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoost.trainWithRDD(
- j_rdd, j_params, num_rounds, num_workers, objective, eval_metric,
- use_external_memory, missing)
+ j_rdd, j_params, num_rounds, num_workers,
+ objective, eval_metric, False, missing)
return XGBoostModel(j_xgb_model)
def transform(self, df_test):
@@ -249,13 +241,13 @@
j_df = self._j_xgb_model.transform(df_test._jdf)
return pyspark.sql.DataFrame(j_df, df_test.sql_ctx)
- def dump(self, feature_map=None, with_stats=False, format="json"):
+ def dump(self, features=None, with_stats=False, format="json"):
"""Dumps the xgboost model
Parameters
----------
- featureMap : str or None, optional
- Formatted as per xgboost documentation for featmap.txt.
+ features : list of str or None, optional
+ list of features names, or None for no feature names in dump.
(Default: None)
withStats : bool, optional
Should various additional statistics be included? These are not
@@ -270,13 +262,14 @@
"""
# Annoyingly the xgboost api doesn't take the feature map as a string,
but
# instead as a filename. Write the feature map out to a file if
necessary.
- if feature_map is None:
- fmap_path = None
- else:
+ if features:
+ feat_map = "\n".join(["%d %s q" % (i, fname) for i, fname in
enumerate(features)])
fmap_f = tempfile.NamedTemporaryFile()
- fmap_f.write(feature_map)
+ fmap_f.write(feat_map)
fmap_f.flush()
fmap_path = fmap_f.name
+ else:
+ fmap_path = None
# returns an Array[String] from scala, where each element of the array
# is a json string representing a single tree.
j_dump = self._j_xgb_model.booster().getModelDump(fmap_path,
with_stats, format)
@@ -338,70 +331,6 @@
j_xgb_booster = sc._jvm.ml.dmlc.xgboost4j.scala.XGBoost.loadModel(path)
j_xgb_model =
sc._jvm.ml.dmlc.xgboost4j.scala.spark.XGBoostRegressionModel(j_xgb_booster)
return XGBoostModel(j_xgb_model)
-
-
-# from https://gist.github.com/hernamesbarbara/7238736
-def _loess_predict(X, y_tr, X_pred, bandwidth):
- X_tr = np.column_stack((np.ones_like(X), X))
- X_te = np.column_stack((np.ones_like(X_pred), X_pred))
- y_te = []
- for x in X_te:
- ws = np.exp(-np.sum((X_tr - x)**2, axis=1) / (2 * bandwidth**2))
- W = scipy.sparse.dia_matrix((ws, 0), shape=(X_tr.shape[0],) * 2)
- theta =
np.linalg.pinv(X_tr.T.dot(W.dot(X_tr))).dot(X_tr.T.dot(W.dot(y_tr)))
- y_te.append(np.dot(x, theta))
- return np.array(y_te)
-
-
-def _estimate_best_eta(trials, source_etas, length=1e4):
- """Estimate the best eta from a small sample of trials
-
- The final stage of training can take quite some time, at 10 to 15 minutes
- or more per model evaluated. The relationship between eta and ndcg@10 along
- with eta and true loss is fairly stable, so instead of searching the whole
space
- evaluate a few evenly spaced points and then try to fit a line to determine
- the best eta.
-
- Best eta is chosen by finding where the derivative of ndcg@10 vs true loss
first
- transitions from >1 to <=1
-
- Parameters
- ----------
- trials : hyperopt.Trials
- Trials object that was used to tune only eta
- source_etas : list of float
- For some reason hyperopt.hp.choice doesn't include the actual value in
- the trials object, only the index. The results as indexed into this
- list to get the actual eta tested.
- length : int
- Number of eta points to estimate
-
- Returns
- -------
- float
- Estimated best ETA
- """
-
- ndcg10 = np.asarray([-l for l in trials.losses()])
- true_loss = np.asarray([r.get('true_loss') for r in trials.results])
- eta = np.asarray([source_etas[v] for v in trials.vals['eta']])
-
- # Range of predictions we want to make
- eta_pred = np.arange(np.min(eta), np.max(eta), (np.max(eta) - np.min(eta))
/ length)
- # Predicted ndcg@10 values for eta_pred
- # TODO: Can 0.02 not be magic? Was chosen by hand on one sample
- ndcg10_pred = _loess_predict(eta, ndcg10, eta_pred, 0.02)
- # Predicted true loss for eta_pred
- # TODO: Can 0.03 not be magic? Was chosen by hand on one sample
- true_loss_pred = _loess_predict(eta, true_loss, eta_pred, 0.03)
-
- # Find the first point where derivative transitions from > 1 to <= 1.
- # TODO: What if the sample is from too narrow a range, and doesn't capture
this?
- derivative = np.diff(ndcg10_pred) / np.diff(true_loss_pred)
- idx = (np.abs(derivative-1)).argmin()
-
- # eta for point closest to transition of derivative from >1 to <=1>
- return eta_pred[idx]
def tune(df, num_folds=5, num_cv_jobs=5, num_workers=5, initial_num_trees=100,
final_num_trees=500):
@@ -469,16 +398,6 @@
print 'best %s: %f' % (k, best[k])
return best, trials
- def eval_space_grid(space):
- """Eval all points in the space via a grid search"""
- best, trials = mjolnir.training.hyperopt.grid_search(
- df, train, space, num_folds=num_folds,
- num_workers=num_workers, cv_pool=cv_pool, trials_pool=trials_pool)
- for k, v in space.items():
- if not np.isscalar(v):
- print 'best %s: %f' % (k, best[k])
- return best, trials
-
num_obs = df.count()
if num_obs > 8000000:
@@ -491,36 +410,52 @@
dataset_size = 'small'
# Setup different tuning profiles for different sizes of datasets.
- tune_space = {
- 'xlarge': {
- # This is intentionally a numpy space for grid search, as xlarge
- # datasets get slightly different handling here.
- 'eta': np.linspace(0.3, 0.8, 30),
- # Have seen values of 7 and 10 as best on roughly same size
- # datasets from different wikis. It really just depends.
- 'max_depth': hyperopt.hp.quniform('max_depth', 6, 11, 1),
- 'min_child_weight': hyperopt.hp.qloguniform(
- 'min_child_weight', np.log(10), np.log(500), 10),
- },
- 'large': {
- 'eta': hyperopt.hp.uniform('eta', 0.3, 0.6),
- 'max_depth': hyperopt.hp.quniform('max_depth', 5, 9, 1),
- 'min_child_weight': hyperopt.hp.qloguniform(
- 'min_child_weight', np.log(10), np.log(300), 10),
- },
- 'med': {
- 'eta': hyperopt.hp.uniform('eta', 0.1, 0.6),
- 'max_depth': hyperopt.hp.quniform('max_depth', 4, 7, 1),
- 'min_child_weight': hyperopt.hp.qloguniform(
- 'min_child_weight', np.log(10), np.log(300), 10),
- },
- 'small': {
- 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
- 'max_depth': hyperopt.hp.quniform('max_depth', 3, 6, 1),
- 'min_child_weight': hyperopt.hp.qloguniform(
- 'min_child_weight', np.log(10), np.log(100), 10),
- }
- }
+ tune_spaces = [
+ ('initial', {
+ 'iterations': 150,
+ 'space': {
+ 'xlarge': {
+ 'eta': hyperopt.hp.uniform('eta', 0.3, 0.8),
+ # Have seen values of 7 and 10 as best on roughly same size
+ # datasets from different wikis. It really just depends.
+ 'max_depth': hyperopt.hp.quniform('max_depth', 6, 11, 1),
+ 'min_child_weight': hyperopt.hp.qloguniform(
+ 'min_child_weight', np.log(10), np.log(500), 10),
+ # % of features to use for each tree. helps prevent overfit
+ 'colsample_bytree':
hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ },
+ 'large': {
+ 'eta': hyperopt.hp.uniform('eta', 0.3, 0.6),
+ 'max_depth': hyperopt.hp.quniform('max_depth', 5, 9, 1),
+ 'min_child_weight': hyperopt.hp.qloguniform(
+ 'min_child_weight', np.log(10), np.log(300), 10),
+ 'colsample_bytree':
hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ },
+ 'med': {
+ 'eta': hyperopt.hp.uniform('eta', 0.1, 0.6),
+ 'max_depth': hyperopt.hp.quniform('max_depth', 4, 7, 1),
+ 'min_child_weight': hyperopt.hp.qloguniform(
+ 'min_child_weight', np.log(10), np.log(300), 10),
+ 'colsample_bytree':
hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ },
+ 'small': {
+ 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
+ 'max_depth': hyperopt.hp.quniform('max_depth', 3, 6, 1),
+ 'min_child_weight': hyperopt.hp.qloguniform(
+ 'min_child_weight', np.log(10), np.log(100), 10),
+ 'colsample_bytree':
hyperopt.hp.quniform('colsample_bytree', 0.8, 1, .01),
+ }
+ }[dataset_size]
+ }),
+ ('trees', {
+ 'iterations': 30,
+ 'condition': lambda: final_num_trees is not None and
final_num_trees != initial_num_trees,
+ 'space': {
+ 'num_rounds': final_num_trees,
+ 'eta': hyperopt.hp.uniform('eta', 0.1, 0.4),
+ }
+ })
+ ]
# Baseline parameters to start with. Roughly tuned by what has worked in
# the past. These vary though depending on number of training samples.
These
@@ -531,117 +466,37 @@
'eval_metric': 'ndcg@10',
'num_rounds': initial_num_trees,
'min_child_weight': 200,
- 'max_depth': 4,
+ 'max_depth': {
+ 'xlarge': 7,
+ 'large': 6,
+ 'med': 5,
+ 'small': 4,
+ }[dataset_size],
'gamma': 0,
'subsample': 1.0,
'colsample_bytree': 0.8,
}
- # Overrides for the first round of training when tuning eta.
- space_overrides = {
- 'xlarge': {
- 'max_depth': 7,
- },
- 'large': {
- 'max_depth': 6,
- },
- 'med': {
- 'max_depth': 5,
- },
- 'small': {}
- }
+ stages = []
+ for name, stage_params in tune_spaces:
+ if 'condition' in stage_params and not stage_params['condition']():
+ continue
+ tune_space = stage_params['space']
+ for name, dist in tune_space.items():
+ space[name] = dist
+ best, trials = eval_space(space, stage_params['iterations'])
+ for name in tune_space.keys():
+ space[name] = best[name]
+ stages.append((name, trials))
- for k, v in space_overrides[dataset_size].items():
- space[k] = v
-
- # Find an eta that gives good results with only 100 trees. This is done
- # so most of the tuning is relatively quick. A final step will re-tune
- # eta with more trees.
- # This estimate only seems to work well for xlarge datasets. On smaller
- # datasets the shape of the graph is much less consistent and doesn't
- # match the builtin expectation of an L shaped graph.
- if dataset_size == 'xlarge':
- etas = tune_space[dataset_size]['eta']
- space['eta'] = hyperopt.hp.choice('eta', etas)
- best_eta, trials_eta = eval_space_grid(space)
- space['eta'] = _estimate_best_eta(trials_eta, etas)
- else:
- space['eta'] = tune_space[dataset_size]['eta']
- best_eta, trials_eta = eval_space(space, 50)
- space['eta'] = best_eta['eta']
- pprint.pprint(space)
-
- # Determines the size of each tree. Larger trees increase model complexity
- # and make it more likely to overfit the training data. Larger trees also
- # do a better job at capturing interactions between features. Larger
training
- # sets support deeper trees. Not all trees will be this depth,
min_child_weight
- # gamma, and regularization all push back on this.
- space['max_depth'] = tune_space[dataset_size]['max_depth']
- # The minimum number of samples that must be in each leaf node. This pushes
- # back against tree depth, preventing the tree from growing if a potential
- # split applies to too few samples. ndcg@10 on the test set increases
linearly
- # with smaller min_child_weight, but true_loss also increases.
- space['min_child_weight'] = tune_space[dataset_size]['min_child_weight']
-
- # TODO: Somewhat similar to eta, as min_child_weight decreases the
- # true_loss increases. Need to figure out how to choose the max_depth that
- # provides best ndcg@10 without losing generalizability.
- best_complexity, trials_complexity = eval_space(space, 50)
- space['max_depth'] = int(best_complexity['max_depth'])
- space['min_child_weight'] = int(best_complexity['min_child_weight'])
- pprint.pprint(space)
-
- # subsample helps make the model more robust to noisy data. For each
update to
- # a tree only this % of samples are considered.
- space['subsample'] = hyperopt.hp.quniform('subsample', 0.8, 1, .01)
- # colsample also helps make the model more robust to noise. For each update
- # to a tree only this % of features are considered.
- space['colsample_bytree'] = hyperopt.hp.quniform('colsample_bytree', 0.8,
1, .01)
-
- # With a high min_child_weight subsampling of any kind gives a linear
decrease
- # in quality. But with a relatively low min_child_weight it can give some
benefits,
- # pushing back against over fitting due to small amounts of data per leaf.
- # colsample is less clear, with 0.8 and 1.0 having similar results.
- best_noise, trials_noise = eval_space(space, 50)
- space['subsample'] = best_noise['subsample']
- space['colsample_bytree'] = best_noise['colsample_bytree']
- pprint.pprint(space)
-
- # Finally increase the number of trees to our target, if it was requested.
- if final_num_trees is None or final_num_trees == initial_num_trees:
- trials_trees = None
- trials_final = trials_noise
- else:
- space['num_rounds'] = final_num_trees
- # TODO: Is 30 steps right amount? too many? too few? This generally
- # uses a large number of trees which takes 10 to 20 minutes per
evaluation
- # on large training sets.That means evaluating 15 points is 2.5 to 5
hours.
- # TODO: The appropriate space here really depends on the amount of
data and
- # the number of trees. A small wiki with 300k observations and 500
trees needs
- # to search a very different space than a large wiki with 30M
observations
- # and the same 500 trees.
- if dataset_size == 'xlarge':
- etas = np.linspace(0.2, 0.7, 30)
- space['eta'] = hyperopt.hp.choice('eta', etas)
- best_trees, trials_trees = eval_space_grid(space)
- space['eta'] = _estimate_best_eta(trials_trees, etas)
- else:
- space['eta'] = hyperopt.hp.uniform('eta', 0.01, 0.5)
- best_trees, trials_trees = eval_space(space, 30)
- space['eta'] = best_trees['eta']
- trials_final = trials_trees
- pprint.pprint(space)
-
- best_trial = np.argmin(trials_final.losses())
- loss = trials_final.losses()[best_trial]
- true_loss = trials_final.results[best_trial].get('true_loss')
+ trials = stages[-1][1]
+ best_trial = np.argmin(trials.losses())
+ loss = trials.losses()[best_trial]
+ true_loss = trials.results[best_trial].get('true_loss')
return {
'trials': {
- 'initial': trials_eta,
- 'complexity': trials_complexity,
- 'noise': trials_noise,
- 'trees': trials_trees,
+ 'initial': trials,
},
'params': space,
'metrics': {
diff --git a/mjolnir/utilities/training_pipeline.py
b/mjolnir/utilities/training_pipeline.py
index da7d641..6918768 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -4,7 +4,7 @@
To run:
PYSPARK_PYTHON=venv/bin/python spark-submit \
- --jars /path/to/mjolnir-with-dependencies.jar
+ --jars /path/to/mjolnir-with-dependencies.jar \
--artifacts 'mjolnir_venv.zip#venv' \
path/to/training_pipeline.py
"""
@@ -51,7 +51,7 @@
def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis,
initial_num_trees, final_num_trees,
- num_workers, num_cv_jobs, num_folds, test_dir, zero_features,
use_external_memory):
+ num_workers, num_cv_jobs, num_folds, zero_features):
for wiki in wikis:
print 'Training wiki: %s' % (wiki)
df_hits_with_features = (
@@ -99,21 +99,11 @@
df_hits_with_features, num_workers)
best_params['groupData'] = j_groups
model = mjolnir.training.xgboost.train(
- df_grouped, best_params,
use_external_memory=use_external_memory)
+ df_grouped, best_params)
tune_results['metrics']['train'] = model.eval(df_grouped, j_groups)
df_grouped.unpersist()
print 'train-ndcg@10: %.5f' % (tune_results['metrics']['train'])
-
- if test_dir is not None:
- try:
- df_test = sqlContext.read.parquet(test_dir)
- tune_results['metrics']['test'] = model.eval(df_test)
- print 'test-ndcg@10: %.5f' % (tune_results['metrics']['test'])
- except: # noqa: E722
- # It has probably taken some time to get this far. Don't bail
- # because the user input an invalid test dir.
- logging.exception('Could not evaluate test_dir: %s' %
(test_dir))
# Save the tune results somewhere for later analysis. Use pickle
# to maintain the hyperopt.Trials objects as is. It might be nice
@@ -131,10 +121,9 @@
# Generate a feature map so xgboost can include feature names in the
dump.
# The final `q` indicates all features are quantitative values
(floats).
features =
df_hits_with_features.schema['features'].metadata['features']
- feat_map = ["%d %s q" % (i, fname) for i, fname in enumerate(features)]
json_model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
with open(json_model_output, 'wb') as f:
- f.write(model.dump("\n".join(feat_map)))
+ f.write(model.dump(features))
print 'Wrote xgboost json model to %s' % (json_model_output)
# Write out the xgboost binary format as well, so it can be re-loaded
# and evaluated
@@ -178,15 +167,9 @@
'--initial-trees', dest='initial_num_trees', default=100, type=int,
help='Number of trees to perform hyperparamter tuning with. (Default:
100)')
parser.add_argument(
- '-e', '--use-external-memory', dest='use_external_memory',
default=False,
- type=str_to_bool, help='Use external memory for feature matrix')
- parser.add_argument(
'--final-trees', dest='final_num_trees', default=None, type=int,
help='Number of trees in the final ensemble. If not provided the value
from '
+ '--initial-trees will be used. (Default: None)')
- parser.add_argument(
- '-t', '--test-path', dest='test_dir', type=str, required=False,
default=None,
- help='A holdout test set to evaluate the final model against')
parser.add_argument(
'-z', '--zero-feature', dest='zero_features', type=str, nargs='+',
help='Zero out feature in input')
@@ -218,7 +201,9 @@
del args['very_verbose']
# TODO: Set spark configuration? Some can't actually be set here though,
so best might be to set all of it
# on the command line for consistency.
- sc = SparkContext(appName="MLR: training pipeline")
+ app_name = "MLR: training pipeline xgboost"
+ app_name += ': ' + ', '.join(args['wikis'])
+ sc = SparkContext(appName=app_name)
sc.setLogLevel('WARN')
sqlContext = HiveContext(sc)
--
To view, visit https://gerrit.wikimedia.org/r/403333
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I8cc4ee504d0e49bc61ffc5d2781e131fabe4372c
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits