EBernhardson has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/403335 )
Change subject: Add lightgbm support
......................................................................
Add lightgbm support
Only supports single-executor training at the moment. Distributed
training is left for another day.
Change-Id: Ia9a188ef87afc86985ac9c3e269b6665dcceca10
---
A mjolnir/training/lightgbm.py
M mjolnir/utilities/training_pipeline.py
M setup.py
3 files changed, 248 insertions(+), 6 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR
refs/changes/35/403335/1
diff --git a/mjolnir/training/lightgbm.py b/mjolnir/training/lightgbm.py
new file mode 100644
index 0000000..cbc8883
--- /dev/null
+++ b/mjolnir/training/lightgbm.py
@@ -0,0 +1,221 @@
+from __future__ import absolute_import
+import contextlib
+import functools
+import hyperopt
+import json
+import lightgbm as lgb
+import math
+import mjolnir.training.hyperopt
+from mjolnir.utils import as_local_paths
+from multiprocessing.dummy import Pool
+import numpy as np
+import pyspark
+
+
+def _overrideParamsAccordingToTaskCpus(sc, params):
+ n_cpus = int(sc.getConf().get("spark.task.cpus", "1"))
+ if 'num_threads' not in params:
+ params['num_threads'] = n_cpus
+ elif params['num_threads'] > n_cpus:
+ raise Exception(
+ "the num_threads param %d must be no larger than spark.task.cpus
(%d)" % (
+ params['num_threads'], n_cpus))
+
+
@contextlib.contextmanager
def load_datasets(fold, train_matrix=None):
    """Materialize a fold's matrices as local lightgbm Datasets.

    Copies the (possibly remote) paths of the fold to local disk, wraps
    each one in a lgb.Dataset, and frees the native handles on exit.

    Parameters
    ----------
    fold : dict
        Map from matrix name (e.g. 'train', 'test') to file path.
    train_matrix : str, optional
        Name of the training matrix. When provided the other datasets are
        created with reference= the training dataset, as lightgbm requires
        for validation sets. Defaults to None for backwards compatibility
        with callers passing only the fold.

    Yields
    ------
    dict
        Map from matrix name to lgb.Dataset.
    """
    names = list(fold.keys())
    # Original used fold.values without calling it, which raises a
    # TypeError when fold is a dict; call it.
    with as_local_paths(*fold.values()) as local_paths:
        paths = dict(zip(names, local_paths))
        datasets = {}
        # Build the training dataset first so validation sets can
        # reference its bin mappings.
        if train_matrix is not None and train_matrix in paths:
            datasets[train_matrix] = lgb.Dataset(paths[train_matrix])
        for name, path in paths.items():
            if name not in datasets:
                datasets[name] = lgb.Dataset(
                    path, reference=datasets.get(train_matrix))
        try:
            yield datasets
        finally:
            # Release native memory promptly; the local files are
            # cleaned up by as_local_paths.
            for ds in datasets.values():
                ds._free_handle()
+
+
def build_distributed_boosters(rdd, params, train_matrix):
    """Train one lightgbm booster per partition of rdd.

    Each partition must contain exactly one row: a dict mapping matrix
    names to file paths (a fold). The returned rdd is cached and holds
    (Booster, eval_results) pairs.

    Parameters
    ----------
    rdd : pyspark.RDD
        One fold dict per partition.
    params : dict
        lightgbm training parameters; 'num_rounds' is extracted and
        passed as num_boost_round.
    train_matrix : str
        Name of the matrix within each fold to train against.

    Returns
    -------
    pyspark.RDD
    """
    def build_partition(rows):
        # next() works on both py2 and py3 iterators, unlike .next()
        fold = next(rows)
        try:
            next(rows)
            raise Exception("Expected single row in partition but received more.")
        except StopIteration:
            pass

        # Copy before popping so the closure-captured params dict is
        # not mutated across partitions / retries.
        local_params = dict(params)
        num_rounds = local_params.pop('num_rounds', 100)

        # TODO: Generalize
        # Pass train_matrix through; the original call omitted it.
        with load_datasets(fold, train_matrix) as datasets:
            eval_results = {}
            gbm = lgb.train(
                local_params, datasets[train_matrix],
                num_boost_round=num_rounds,
                valid_sets=list(datasets.values()),
                valid_names=list(datasets.keys()),
                early_stopping_rounds=None, evals_result=eval_results)
            gbm.free_dataset()
            yield (gbm, eval_results)

    return rdd.mapPartitions(build_partition).cache()
+
+
+def _coerce_params(params):
+ types = {
+ 'min_data_in_leaf': int,
+ 'num_leaves': int,
+ }
+ for k, val_type in types.items():
+ if k in params:
+ params[k] = val_type(params[k])
+
+
def train(fold, paramOverrides, train_matrix=None):
    """Train a single lightgbm ranking model.

    Parameters
    ----------
    fold : sequence of dict
        Sequence of matrix-name -> path dicts. Only single-element folds
        (single-executor training) are supported currently.
    paramOverrides : dict
        Values merged over the default lightgbm parameters.
    train_matrix : str, optional
        Name of the matrix to train against; defaults to 'all' when
        present in the fold, else 'train'.

    Returns
    -------
    LightGBMModel

    Raises
    ------
    NotImplementedError
        If fold has more than one partition (distributed training).
    """
    sc = pyspark.SparkContext.getOrCreate()
    params = {
        'boosting_type': 'gbdt',
        'objective': 'lambdarank',
        'metric': 'ndcg',
        'ndcg_eval_at': '1,3,5,10',
        'is_training_metric': True,
        'num_rounds': 100,
        'max_bin': 255,
        'num_leaves': 63,
        'learning_rate': 0.1,
        'feature_fraction': 1.0,
        'bagging_fraction': 0.9,
        'bagging_freq': 1,
        'verbose': 0,
    }
    params.update(paramOverrides)
    _overrideParamsAccordingToTaskCpus(sc, params)
    _coerce_params(params)

    if len(fold) > 1:
        # Fail before doing any work; the original built a partitioned
        # rdd here that was never used because of this raise.
        raise NotImplementedError("TODO: Distributed Training")
    rdd = sc.parallelize(fold, 1)

    if train_matrix is None:
        # NOTE(review): fold is a sequence of name -> path dicts, so this
        # membership test inspects the sequence, not the dicts; confirm
        # whether fold[0] was intended here.
        train_matrix = "all" if "all" in fold else "train"

    booster, metrics = build_distributed_boosters(rdd, params, train_matrix).collect()[0]
    return LightGBMModel(booster, metrics)
+
+
class LightGBMSummary(object):
    """Read-only view over the eval metrics recorded while training."""

    def __init__(self, metrics):
        # metrics: dict of dataset name -> {metric name -> values}
        self._metrics = metrics

    def _ndcg_at_10(self, dataset):
        # Both accessors report the same metric for different datasets.
        return self._metrics[dataset]['ndcg@10']

    def train(self):
        """ndcg@10 recorded against the training matrix."""
        return self._ndcg_at_10('train')

    def test(self):
        """ndcg@10 recorded against the test matrix."""
        return self._ndcg_at_10('test')
+
+
class LightGBMModel(object):
    """Wraps a trained lightgbm Booster together with its eval metrics."""

    def __init__(self, booster, metrics):
        # booster: trained lgb.Booster
        # metrics: per-dataset eval results captured during training
        self._booster = booster
        self.metrics = metrics

    def summary(self):
        """Return the training metrics wrapped in a LightGBMSummary."""
        return LightGBMSummary(self.metrics)

    def dump(self, features=None):
        """Serialize the booster to a JSON string.

        TODO: lightgbm needs features provided when creating the dataset,
        so the features argument is currently ignored.
        """
        model = self._booster.dump_model()
        return json.dumps(model)

    def saveModelAsLocalFile(self, path):
        """Write the booster to path in lightgbm's native format."""
        self._booster.save_model(path)
+
+
def tune(folds, stats, train_matrix, num_cv_jobs=5, num_workers=5, initial_num_trees=100, final_num_trees=500):
    """Search for good lightgbm training parameters with hyperopt.

    Runs a sequence of tuning stages; each stage tunes a subset of the
    parameter space and pins its winners before the next stage runs.

    Parameters
    ----------
    folds : list of dict
        Cross-validation folds, each a matrix-name -> path dict.
    stats : dict
        Dataset statistics. NOTE(review): currently unused here.
    train_matrix : str
        Name of the matrix to train against.
    num_cv_jobs : int
        Parallelism for cross-validation runs.
    num_workers : int
        NOTE(review): immediately overwritten below with len(folds[0]);
        confirm whether this parameter is still meaningful.
    initial_num_trees : int
        Tree count used during the first tuning stage.
    final_num_trees : int or None
        Tree count for the final stage; skipped when None or equal to
        initial_num_trees.

    Returns
    -------
    dict
        'trials' per stage, the final 'params', and cv 'metrics'.
    """
    cv_pool = None
    if num_cv_jobs > 1:
        cv_pool = Pool(num_cv_jobs)

    # Configure the trials pool large enough to keep cv_pool full
    num_folds = len(folds)
    num_workers = len(folds[0])
    trials_pool_size = int(math.floor(num_cv_jobs / (num_workers * num_folds)))
    if trials_pool_size > 1:
        trials_pool = Pool(trials_pool_size)
    else:
        trials_pool = None

    train_func = functools.partial(train, train_matrix=train_matrix)

    def eval_space(space, max_evals):
        # Run hyperopt over the current space and report the winners for
        # every tunable (non-scalar) dimension.
        best, trials = mjolnir.training.hyperopt.minimize(
            folds, train_func, space, max_evals=max_evals,
            cv_pool=cv_pool, trials_pool=trials_pool)
        for k, v in space.items():
            if not np.isscalar(v):
                # Single-argument print() behaves identically on py2/py3.
                print('best %s: %f' % (k, best[k]))
        return best, trials

    space = {
        'boosting_type': 'gbdt',
        'objective': 'lambdarank',
        'metric': 'ndcg',
        'ndcg_eval_at': '1,3,10',
        'is_training_metric': True,
        'num_rounds': initial_num_trees,
        'max_bin': 255,
        'num_leaves': 63,
        'learning_rate': 0.1,
        'feature_fraction': 1.0,
        'bagging_fraction': 0.9,
        'bagging_freq': 1,
    }
    tune_spaces = [
        ('initial', {
            'iterations': 5,
            'space': {
                'learning_rate': hyperopt.hp.uniform('learning_rate', 0.1, 0.4),
                'num_leaves': hyperopt.hp.quniform('num_leaves', 60, 150, 10),
                'min_data_in_leaf': hyperopt.hp.quniform('min_data_in_leaf', 25, 200, 25),
                'min_sum_hessian_in_leaf': hyperopt.hp.uniform('min_sum_hessian_in_leaf', 1.0, 10.0),
                'feature_fraction': hyperopt.hp.uniform('feature_fraction', 0.8, 1.0),
                'bagging_fraction': hyperopt.hp.uniform('bagging_fraction', 0.8, 1.0),
            }
        }),
        ('trees', {
            'iterations': 30,
            'condition': lambda: final_num_trees is not None and final_num_trees != initial_num_trees,
            'space': {
                'num_rounds': final_num_trees,
                'learning_rate': hyperopt.hp.uniform('learning_rate', 0.01, 0.4),
            }
        }),
    ]

    try:
        stages = []
        # Use distinct loop variables: the original reused `name` for both
        # the stage and the parameters, so stages recorded a parameter
        # name instead of the stage name.
        for stage_name, stage_params in tune_spaces:
            if 'condition' in stage_params and not stage_params['condition']():
                continue
            tune_space = stage_params['space']
            for param_name, param in tune_space.items():
                space[param_name] = param
            # The original clamped max_evals to 2 here with a
            # "TODO: remove" comment; the debug clamp is gone.
            best, trials = eval_space(space, stage_params['iterations'])
            # Pin this stage's winners so later stages build on them.
            for param_name in tune_space:
                space[param_name] = best[param_name]
            stages.append((stage_name, trials))
    finally:
        # Release the worker pools even if a stage blows up.
        if trials_pool is not None:
            trials_pool.close()
        if cv_pool is not None:
            cv_pool.close()

    trials_final = stages[-1][1]
    best_trial = np.argmin(trials_final.losses())
    loss = trials_final.losses()[best_trial]
    true_loss = trials_final.results[best_trial].get('true_loss')

    return {
        'trials': dict(stages),
        'params': space,
        'metrics': {
            'cv-test': -loss,
            'cv-train': -loss + true_loss
        }
    }
diff --git a/mjolnir/utilities/training_pipeline.py
b/mjolnir/utilities/training_pipeline.py
index d8a6f23..861534a 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -15,6 +15,7 @@
import json
import logging
import mjolnir.feature_engineering
+import mjolnir.training.lightgbm
import mjolnir.training.xgboost
from mjolnir.utils import hdfs_open_read
import os
@@ -24,7 +25,20 @@
import sys
-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis,
initial_num_trees, final_num_trees, num_cv_jobs):
+ALGO_CHOICES = {
+ "xgboost": {
+ 'extension': ".xgb",
+ 'module': mjolnir.training.xgboost
+ },
+ "lightgbm": {
+ 'extension': ".lgb",
+ 'module': mjolnir.training.lightgbm
+ },
+}
+
+
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis,
initial_num_trees, final_num_trees,
+ num_cv_jobs, algo):
with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
stats = json.loads(f.read())
@@ -48,7 +62,10 @@
num_cv_jobs = num_folds
# Add extension matching training type
- extension = ".xgb"
+ algo_choice = ALGO_CHOICES[algo]
+ extension = algo_choice['extension']
+ tune_func = algo_choice['module'].tune
+ train_func = algo_choice['module'].train
# Add file extensions to all the folds
folds = wiki_stats['folds']
@@ -63,7 +80,7 @@
for name, path in partition.items():
partition[name] = path + extension
- tune_results = mjolnir.training.xgboost.tune(
+ tune_results = tune_func(
folds, wiki_stats['stats'],
num_cv_jobs=num_cv_jobs,
train_matrix="train",
@@ -85,7 +102,7 @@
print 'Best parameters:'
for param, value in best_params.items():
print '\t%20s: %s' % (param, value)
- model = mjolnir.training.xgboost.train(
+ model = train_func(
all_paths, best_params, train_matrix="all")
tune_results['metrics'] = {
@@ -109,13 +126,13 @@
# Generate a feature map so xgboost can include feature names in the
dump.
# The final `q` indicates all features are quantitative values
(floats).
features = wiki_stats['stats']['features']
- json_model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
+ json_model_output = os.path.join(output_dir, 'model_%s%s.json' %
(wiki, extension))
with open(json_model_output, 'wb') as f:
f.write(model.dump(features))
print 'Wrote xgboost json model to %s' % (json_model_output)
# Write out the xgboost binary format as well, so it can be re-loaded
# and evaluated
- model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
+ model_output = os.path.join(output_dir, 'model_%s%s' % (wiki,
extension))
model.saveModelAsLocalFile(model_output)
print 'Wrote xgboost binary model to %s' % (model_output)
print ''
@@ -124,6 +141,9 @@
def parse_arguments(argv):
parser = argparse.ArgumentParser(description='Train XGBoost ranking
models')
parser.add_argument(
+ '-a', '--algo', dest='algo', choices=ALGO_CHOICES.keys(),
default="xgboost",
+ help='Choose the training algorithm to use')
+ parser.add_argument(
'-i', '--input', dest='input_dir', type=str, required=True,
help='Input path, prefixed with hdfs://, to dataframe with labels and
features')
parser.add_argument(
diff --git a/setup.py b/setup.py
index 9bd27e4..f9d9397 100644
--- a/setup.py
+++ b/setup.py
@@ -9,6 +9,7 @@
'kafka',
'pyyaml',
'hyperopt',
+ 'lightgbm',
# python xgboost is only used for building
# binary datasets. Primary usage is from jvm.
'xgboost',
--
To view, visit https://gerrit.wikimedia.org/r/403335
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia9a188ef87afc86985ac9c3e269b6665dcceca10
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits