EBernhardson has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/396098 )
Change subject: Specialize single-node training ...................................................................... Specialize single-node training XGBoost has a faster training method, tree_method -> hist, which is currently not implemented for distributed training. We actually train quite a few models on a single node (but with many models being trained in parallel) so it would be nice to be able to utilize this where possible. This is perhaps not implemented in the most efficient way if we were going to upstream the patch, but upstreaming is unlikely as upstream does not support training multiple models in parallel (we do through a custom hack). Rather than refactoring existing code this mostly adds new functions for specialized single-node training so that pulling in upstream changes will be as pain-free as possible. Change-Id: I2760127edd2c3c4ad26abd23e621059ac9609950 --- M jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala M jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala 2 files changed, 87 insertions(+), 4 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/search/xgboost refs/changes/98/396098/1 diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala index ea18ff2..bc052e2 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala @@ -105,6 +105,57 @@ } } + private[spark] def buildLocalBoosters( + data: RDD[XGBLabeledPoint], + params: Map[String, Any], + round: Int, + obj: ObjectiveTrait, + eval: EvalTrait, + useExternalMemory: Boolean, + missing: Float): RDD[Array[Byte]] = { + val partitionedData = if (data.getNumPartitions != 1) { + logger.info(s"repartitioning training set to 1 partitions") + 
data.coalesce(1) + } else { + data + } + val partitionedBaseMargin = partitionedData.map(_.baseMargin) + val appName = partitionedData.context.appName + partitionedData.zipPartitions(partitionedBaseMargin) { (labeledPoints, baseMargins) => + if (labeledPoints.isEmpty) { + throw new XGBoostError( + s"detected an empty partition in the training data, partition ID:" + + s" ${TaskContext.getPartitionId()}") + } + val cacheFileName = if (useExternalMemory) { + s"$appName-${TaskContext.get().stageId()}-" + + s"dtrain_cache-${TaskContext.getPartitionId()}" + } else { + null + } + + // Yes it's odd to access this but not do anything. We are ensuring the lazily + // initialized resource monitor is setup before we enter training. + monitor + + val watches = Watches(params, + fromDenseToSparseLabeledPoints(labeledPoints, missing), + fromBaseMarginsToArray(baseMargins), cacheFileName) + try { + val numEarlyStoppingRounds = params.get("numEarlyStoppingRounds") + .map(_.toString.toInt).getOrElse(0) + val booster = SXGBoost.train(watches.train, params, round, + watches = watches.toMap, obj = obj, eval = eval, + earlyStoppingRound = numEarlyStoppingRounds) + val bytes = booster.toByteArray + booster.dispose + Iterator(bytes) + } finally { + watches.delete() + } + } + } + private[spark] def buildDistributedBoosters( data: RDD[XGBLabeledPoint], params: Map[String, Any], @@ -302,8 +353,40 @@ val xgbTrainingData = trainingData.map { case MLLabeledPoint(label, features) => features.asXGB.copy(label = label.toFloat) } - trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval, - useExternalMemory, missing) + if (nWorkers == 1) { + trainLocal(xgbTrainingData, params, round, obj, eval, useExternalMemory, missing) + } else { + trainDistributed(xgbTrainingData, params, round, nWorkers, obj, eval, + useExternalMemory, missing) + } + } + + @throws(classOf[XGBoostError]) + private[spark] def trainLocal( + trainingData: RDD[XGBLabeledPoint], + params: Map[String, Any], + round: Int, 
+ obj: ObjectiveTrait = null, + eval: EvalTrait = null, + useExternalMemory: Boolean = false, + missing: Float = Float.NaN): XGBoostModel = { + if (obj != null) { + require(params.get("obj_type").isDefined, "parameter \"obj_type\" is not defined," + + " you have to specify the objective type as classification or regression with a" + + " customized objective function") + } + val overriddenParams = overrideParamsAccordingToTaskCPUs(params, trainingData.sparkContext) + val boosterBytes = buildLocalBoosters(trainingData, overriddenParams, + round, obj, eval, useExternalMemory, missing).collect()(0) + val isClsTask = isClassificationTask(params) + val bais = new ByteArrayInputStream(boosterBytes) + val booster = SXGBoost.loadModel(bais) + val model = XGBoostModel(booster, isClsTask) + if (isClsTask) { + model.asInstanceOf[XGBoostClassificationModel].numOfClasses = + params.getOrElse("num_class", "2").toString.toInt + } + model } @throws(classOf[XGBoostError]) @@ -317,7 +400,7 @@ useExternalMemory: Boolean = false, missing: Float = Float.NaN): XGBoostModel = { if (params.contains("tree_method")) { - require(params("tree_method") != "hist", "xgboost4j-spark does not support fast histogram" + + require(params("tree_method") != "hist", "xgboost4j-spark distributed does not support fast histogram" + " for now") } require(nWorkers > 0, "you must specify more than 0 workers") diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala index b8b0c1b..04be44f 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala @@ -346,7 +346,7 @@ val trainGroupData: Seq[Seq[Int]] = Seq(Ranking.trainGroup0) val testRDD = sc.parallelize(Ranking.test, numSlices = 1).map(_.features) 
- val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1", + val paramMap = Map("eta" -> "1", "max_depth" -> "3", "silent" -> "1", "objective" -> "rank:pairwise", "eval_metric" -> "ndcg", "groupData" -> trainGroupData) val xgBoostModel = XGBoost.trainWithRDD(trainingRDD, paramMap, 2, nWorkers = 1) -- To view, visit https://gerrit.wikimedia.org/r/396098 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2760127edd2c3c4ad26abd23e621059ac9609950 Gerrit-PatchSet: 1 Gerrit-Project: search/xgboost Gerrit-Branch: master Gerrit-Owner: EBernhardson <ebernhard...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits