EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/403335 )

Change subject: Add lightgbm support
......................................................................

Add lightgbm support

Only support single-executor training at the moment. Distributed
training is left for another day.

Change-Id: Ia9a188ef87afc86985ac9c3e269b6665dcceca10
---
A mjolnir/training/lightgbm.py
M mjolnir/utilities/training_pipeline.py
M setup.py
3 files changed, 248 insertions(+), 6 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/MjoLniR 
refs/changes/35/403335/1

diff --git a/mjolnir/training/lightgbm.py b/mjolnir/training/lightgbm.py
new file mode 100644
index 0000000..cbc8883
--- /dev/null
+++ b/mjolnir/training/lightgbm.py
@@ -0,0 +1,221 @@
+from __future__ import absolute_import
+import contextlib
+import functools
+import hyperopt
+import json
+import lightgbm as lgb
+import math
+import mjolnir.training.hyperopt
+from mjolnir.utils import as_local_paths
+from multiprocessing.dummy import Pool
+import numpy as np
+import pyspark
+
+
+def _overrideParamsAccordingToTaskCpus(sc, params):
+    n_cpus = int(sc.getConf().get("spark.task.cpus", "1"))
+    if 'num_threads' not in params:
+        params['num_threads'] = n_cpus
+    elif params['num_threads'] > n_cpus:
+        raise Exception(
+            "the num_threads param %d must be no larger than spark.task.cpus 
(%d)" % (
+                params['num_threads'], n_cpus))
+
+
[email protected]
+def load_datasets(fold, train_matrix):
+    with as_local_paths(*fold.values) as local_paths:
+        datasets = dict(zip(fold.keys(), local_paths))
+        try:
+            yield datasets
+        finally:
+            for ds in datasets.values():
+                ds._free_handle()
+
+
+def build_distributed_boosters(rdd, params, train_matrix):
+    def build_partition(rows):
+        fold = rows.next()
+        try:
+            rows.next()
+            raise Exception("Expected single row in partition but received 
more.")
+        except StopIteration:
+            pass
+
+        num_rounds = 100
+        if 'num_rounds' in params:
+            num_rounds = params['num_rounds']
+            del params['num_rounds']
+
+        # TODO: Generalize
+        with load_datasets(fold) as datasets:
+            eval_results = {}
+            gbm = lgb.train(
+                params, datasets[train_matrix],
+                num_boost_round=num_rounds,
+                valid_sets=datasets.values(), valid_names=datasets.keys(),
+                early_stopping_rounds=None, evals_result=eval_results)
+            gbm.free_dataset()
+            yield (gbm, eval_results)
+
+    return rdd.mapPartitions(build_partition).cache()
+
+
+def _coerce_params(params):
+    types = {
+        'min_data_in_leaf': int,
+        'num_leaves': int,
+    }
+    for k, val_type in types.items():
+        if k in params:
+            params[k] = val_type(params[k])
+
+
+def train(fold, paramOverrides, train_matrix=None):
+    sc = pyspark.SparkContext.getOrCreate()
+    params = {
+        'boosting_type': 'gbdt',
+        'objective': 'lambdarank',
+        'metric': 'ndcg',
+        'ndcg_eval_at': '1,3,5,10',
+        'is_training_metric': True,
+        'num_rounds': 100,
+        'max_bin': 255,
+        'num_leaves': 63,
+        'learning_rate': 0.1,
+        'feature_fraction': 1.0,
+        'bagging_fraction': 0.9,
+        'bagging_freq': 1,
+        'verbose': 0,
+    }
+    params.update(paramOverrides)
+    _overrideParamsAccordingToTaskCpus(sc, params)
+    _coerce_params(params)
+
+    if (len(fold) > 1):
+        rdd = sc.parallelize(list(enumerate(fold)), 1).partitionBy(len(fold), 
lambda x: x).map(lambda x: x[1])
+        raise Exception("TODO: Distributed Training")
+    else:
+        rdd = sc.parallelize(fold, 1)
+
+    if train_matrix is None:
+        train_matrix = "all" if "all" in fold else "train"
+
+    booster, metrics = build_distributed_boosters(rdd, params, 
train_matrix).collect()[0]
+    return LightGBMModel(booster, metrics)
+
+
+class LightGBMSummary(object):
+    def __init__(self, metrics):
+        self._metrics = metrics
+
+    def train(self):
+        return self._metrics['train']['ndcg@10']
+
+    def test(self):
+        return self._metrics['test']['ndcg@10']
+
+
+class LightGBMModel(object):
+    def __init__(self, booster, metrics):
+        self._booster = booster
+        self.metrics = metrics
+
+    def summary(self):
+        return LightGBMSummary(self.metrics)
+
+    def dump(self, features=None):
+        # TODO: lightgbm needs features provided when creating the dataset
+        return json.dumps(self._booster.dump_model())
+
+    def saveModelAsLocalFile(self, path):
+        self._booster.save_model(path)
+
+
+def tune(folds, stats, train_matrix, num_cv_jobs=5, num_workers=5, 
initial_num_trees=100, final_num_trees=500):
+    cv_pool = None
+    if num_cv_jobs > 1:
+        cv_pool = Pool(num_cv_jobs)
+
+    # Configure the trials pool large enough to keep cv_pool full
+    num_folds = len(folds)
+    num_workers = len(folds[0])
+    trials_pool_size = int(math.floor(num_cv_jobs / (num_workers * num_folds)))
+    if trials_pool_size > 1:
+        trials_pool = Pool(trials_pool_size)
+    else:
+        trials_pool = None
+
+    train_func = functools.partial(train, train_matrix=train_matrix)
+
+    def eval_space(space, max_evals):
+        max_evals = 2  # TODO: remove
+        best, trials = mjolnir.training.hyperopt.minimize(
+            folds, train_func, space, max_evals=max_evals,
+            cv_pool=cv_pool, trials_pool=trials_pool)
+        for k, v in space.items():
+            if not np.isscalar(v):
+                print 'best %s: %f' % (k, best[k])
+        return best, trials
+
+    space = {
+        'boosting_type': 'gbdt',
+        'objective': 'lambdarank',
+        'metric': 'ndcg',
+        'ndcg_eval_at': '1,3,10',
+        'is_training_metric': True,
+        'num_rounds': initial_num_trees,
+        'max_bin': 255,
+        'num_leaves': 63,
+        'learning_rate': 0.1,
+        'feature_fraction': 1.0,
+        'bagging_fraction': 0.9,
+        'bagging_freq': 1,
+    }
+    tune_spaces = [
+        ('initial', {
+            'iterations': 5,
+            'space': {
+                'learning_rate': hyperopt.hp.uniform('learning_rate', 0.1, 
0.4),
+                'num_leaves': hyperopt.hp.quniform('num_leaves', 60, 150, 10),
+                'min_data_in_leaf': hyperopt.hp.quniform('min_data_in_leaf', 
25, 200, 25),
+                'min_sum_hessian_in_leaf': 
hyperopt.hp.uniform('min_sum_hessian_in_leaf', 1.0, 10.0),
+                'feature_fraction': hyperopt.hp.uniform('feature_fraction', 
0.8, 1.0),
+                'bagging_fraction': hyperopt.hp.uniform('bagging_fraction', 
0.8, 1.0),
+            }
+        }),
+        ('trees', {
+            'iterations': 30,
+            'condition': lambda: final_num_trees is not None and 
final_num_trees != initial_num_trees,
+            'space': {
+                'num_rounds': final_num_trees,
+                'learning_rate': hyperopt.hp.uniform('learning_rate', 0.01, 
0.4),
+            }
+        }),
+    ]
+
+    stages = []
+    for name, stage_params in tune_spaces:
+        if 'condition' in stage_params and not stage_params['condition']():
+            continue
+        tune_space = stage_params['space']
+        for name, param in tune_space.items():
+            space[name] = param
+        best, trials = eval_space(space, stage_params['iterations'])
+        for name, param in tune_space.items():
+            space[name] = best[name]
+        stages.append((name, trials))
+
+    trials_final = stages[-1][1]
+    best_trial = np.argmin(trials_final.losses())
+    loss = trials_final.losses()[best_trial]
+    true_loss = trials_final.results[best_trial].get('true_loss')
+
+    return {
+        'trials': dict(stages),
+        'params': space,
+        'metrics': {
+            'cv-test': -loss,
+            'cv-train': -loss + true_loss
+        }
+    }
diff --git a/mjolnir/utilities/training_pipeline.py 
b/mjolnir/utilities/training_pipeline.py
index d8a6f23..861534a 100644
--- a/mjolnir/utilities/training_pipeline.py
+++ b/mjolnir/utilities/training_pipeline.py
@@ -15,6 +15,7 @@
 import json
 import logging
 import mjolnir.feature_engineering
+import mjolnir.training.lightgbm
 import mjolnir.training.xgboost
 from mjolnir.utils import hdfs_open_read
 import os
@@ -24,7 +25,20 @@
 import sys
 
 
-def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, 
initial_num_trees, final_num_trees, num_cv_jobs):
+ALGO_CHOICES = {
+    "xgboost": {
+        'extension': ".xgb",
+        'module': mjolnir.training.xgboost
+    },
+    "lightgbm": {
+        'extension': ".lgb",
+        'module': mjolnir.training.lightgbm
+    },
+}
+
+
+def run_pipeline(sc, sqlContext, input_dir, output_dir, wikis, 
initial_num_trees, final_num_trees,
+                 num_cv_jobs, algo):
     with hdfs_open_read(os.path.join(input_dir, 'stats.json')) as f:
         stats = json.loads(f.read())
 
@@ -48,7 +62,10 @@
             num_cv_jobs = num_folds
 
         # Add extension matching training type
-        extension = ".xgb"
+        algo_choice = ALGO_CHOICES[algo]
+        extension = algo_choice['extension']
+        tune_func = algo_choice['module'].tune
+        train_func = algo_choice['module'].train
 
         # Add file extensions to all the folds
         folds = wiki_stats['folds']
@@ -63,7 +80,7 @@
             for name, path in partition.items():
                 partition[name] = path + extension
 
-        tune_results = mjolnir.training.xgboost.tune(
+        tune_results = tune_func(
             folds, wiki_stats['stats'],
             num_cv_jobs=num_cv_jobs,
             train_matrix="train",
@@ -85,7 +102,7 @@
         print 'Best parameters:'
         for param, value in best_params.items():
             print '\t%20s: %s' % (param, value)
-        model = mjolnir.training.xgboost.train(
+        model = train_func(
             all_paths, best_params, train_matrix="all")
 
         tune_results['metrics'] = {
@@ -109,13 +126,13 @@
         # Generate a feature map so xgboost can include feature names in the 
dump.
         # The final `q` indicates all features are quantitative values 
(floats).
         features = wiki_stats['stats']['features']
-        json_model_output = os.path.join(output_dir, 'model_%s.json' % (wiki))
+        json_model_output = os.path.join(output_dir, 'model_%s%s.json' % 
(wiki, extension))
         with open(json_model_output, 'wb') as f:
             f.write(model.dump(features))
             print 'Wrote xgboost json model to %s' % (json_model_output)
         # Write out the xgboost binary format as well, so it can be re-loaded
         # and evaluated
-        model_output = os.path.join(output_dir, 'model_%s.xgb' % (wiki))
+        model_output = os.path.join(output_dir, 'model_%s%s' % (wiki, 
extension))
         model.saveModelAsLocalFile(model_output)
         print 'Wrote xgboost binary model to %s' % (model_output)
         print ''
@@ -124,6 +141,9 @@
 def parse_arguments(argv):
     parser = argparse.ArgumentParser(description='Train XGBoost ranking 
models')
     parser.add_argument(
+        '-a', '--algo', dest='algo', choices=ALGO_CHOICES.keys(), 
default="xgboost",
+        help='Choose the training algorithm to use')
+    parser.add_argument(
         '-i', '--input', dest='input_dir', type=str, required=True,
         help='Input path, prefixed with hdfs://, to dataframe with labels and 
features')
     parser.add_argument(
diff --git a/setup.py b/setup.py
index 9bd27e4..f9d9397 100644
--- a/setup.py
+++ b/setup.py
@@ -9,6 +9,7 @@
     'kafka',
     'pyyaml',
     'hyperopt',
+    'lightgbm',
     # python xgboost is only used for building
     # binary datasets. Primary usage is from jvm.
     'xgboost',

-- 
To view, visit https://gerrit.wikimedia.org/r/403335
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia9a188ef87afc86985ac9c3e269b6665dcceca10
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to