EBernhardson has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/396098 )

Change subject: Specialize single-node training
......................................................................

Specialize single-node training

XGBoost has a faster training method, tree_method -> hist, which is
currently not implemented for distributed training. We actually train
quite a few models on a single node (but with many models being trained
in parallel) so it would be nice to be able to utilize this where
possible.

This is perhaps not implemented in the optimal way if we were going
to upstream the patch, but upstreaming is unlikely as upstream does not
support training multiple models in parallel (we do through a custom
hack). Rather than refactoring existing code this mostly adds new
functions for specialized single-node training so that pulling in
upstream changes will be as pain-free as possible.

Change-Id: I2760127edd2c3c4ad26abd23e621059ac9609950
---
M 
jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
M 
jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
2 files changed, 87 insertions(+), 4 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/search/xgboost 
refs/changes/98/396098/1

diff --git 
a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
 
b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index ea18ff2..bc052e2 100644
--- 
a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ 
b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -105,6 +105,57 @@
     }
   }
 
+  private[spark] def buildLocalBoosters(
+      data: RDD[XGBLabeledPoint],
+      params: Map[String, Any],
+      round: Int,
+      obj: ObjectiveTrait,
+      eval: EvalTrait,
+      useExternalMemory: Boolean,
+      missing: Float): RDD[Array[Byte]] = {
+    val partitionedData = if (data.getNumPartitions != 1) {
+      logger.info(s"repartitioning training set to 1 partitions")
+      data.coalesce(1)
+    } else {
+      data
+    }
+    val partitionedBaseMargin = partitionedData.map(_.baseMargin)
+    val appName = partitionedData.context.appName
+    partitionedData.zipPartitions(partitionedBaseMargin) { (labeledPoints, 
baseMargins) =>
+      if (labeledPoints.isEmpty) {
+        throw new XGBoostError(
+          s"detected an empty partition in the training data, partition ID:" +
+            s" ${TaskContext.getPartitionId()}")
+      }
+      val cacheFileName = if (useExternalMemory) {
+        s"$appName-${TaskContext.get().stageId()}-" +
+          s"dtrain_cache-${TaskContext.getPartitionId()}"
+      } else {
+        null
+      }
+
+      // Yes it's odd to access this but not do anything. We are ensuring the 
lazily
+      // initialized resource monitor is setup before we enter training.
+      monitor
+
+      val watches = Watches(params,
+        fromDenseToSparseLabeledPoints(labeledPoints, missing),
+        fromBaseMarginsToArray(baseMargins), cacheFileName)
+      try {
+        val numEarlyStoppingRounds = params.get("numEarlyStoppingRounds")
+          .map(_.toString.toInt).getOrElse(0)
+        val booster = SXGBoost.train(watches.train, params, round,
+          watches = watches.toMap, obj = obj, eval = eval,
+          earlyStoppingRound = numEarlyStoppingRounds)
+        val bytes = booster.toByteArray
+        booster.dispose
+        Iterator(bytes)
+      } finally {
+        watches.delete()
+      }
+    }
+  }
+
   private[spark] def buildDistributedBoosters(
       data: RDD[XGBLabeledPoint],
       params: Map[String, Any],
@@ -302,8 +353,40 @@
     val xgbTrainingData = trainingData.map { case MLLabeledPoint(label, 
features) =>
       features.asXGB.copy(label = label.toFloat)
     }
-    trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval,
-      useExternalMemory, missing)
+    if (nWorkers == 1) {
+      trainLocal(xgbTrainingData, params, round, obj, eval, useExternalMemory, 
missing)
+    } else {
+      trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval,
+        useExternalMemory, missing)
+    }
+  }
+
+  @throws(classOf[XGBoostError])
+  private[spark] def trainLocal(
+      trainingData: RDD[XGBLabeledPoint],
+      params: Map[String, Any],
+      round: Int,
+      obj: ObjectiveTrait = null,
+      eval: EvalTrait = null,
+      useExternalMemory: Boolean = false,
+      missing: Float = Float.NaN): XGBoostModel = {
+    if (obj != null) {
+      require(params.get("obj_type").isDefined, "parameter \"obj_type\" is not 
defined," +
+        " you have to specify the objective type as classification or 
regression with a" +
+        " customized objective function")
+    }
+    val overriddenParams = overrideParamsAccordingToTaskCPUs(params, 
trainingData.sparkContext)
+    val boosterBytes = buildLocalBoosters(trainingData, overriddenParams,
+      round, obj, eval, useExternalMemory, missing).collect()(0)
+    val isClsTask = isClassificationTask(params)
+    val bais = new ByteArrayInputStream(boosterBytes)
+    val booster = SXGBoost.loadModel(bais)
+    val model = XGBoostModel(booster, isClsTask)
+    if (isClsTask) {
+      model.asInstanceOf[XGBoostClassificationModel].numOfClasses =
+        params.getOrElse("num_class", "2").toString.toInt
+    }
+    model
   }
 
   @throws(classOf[XGBoostError])
@@ -317,7 +400,7 @@
       useExternalMemory: Boolean = false,
       missing: Float = Float.NaN): XGBoostModel = {
     if (params.contains("tree_method")) {
-      require(params("tree_method") != "hist", "xgboost4j-spark does not 
support fast histogram" +
+      require(params("tree_method") != "hist", "xgboost4j-spark distributed 
does not support fast histogram" +
           " for now")
     }
     require(nWorkers > 0, "you must specify more than 0 workers")
diff --git 
a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
 
b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index b8b0c1b..04be44f 100644
--- 
a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ 
b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -346,7 +346,7 @@
     val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0)
     val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features)
 
-    val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
+    val paramMap = Map("eta" -> "1", "max_depth" -> "3", "silent" -> "1",
       "objective" -> "rank:pairwise", "eval_metric" -> "ndcg", "groupData" -> 
trainGroupData)
 
     val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 2, nWorkers 
= 1)

-- 
To view, visit https://gerrit.wikimedia.org/r/396098
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2760127edd2c3c4ad26abd23e621059ac9609950
Gerrit-PatchSet: 1
Gerrit-Project: search/xgboost
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to