Github user jayluan commented on the pull request:
https://github.com/apache/spark/pull/8896#issuecomment-143379925
Jira ticket created at
[SPARK-10821](https://issues.apache.org/jira/browse/SPARK-10821)
A toy test case that recreates the problem follows. I can't seem to attach it
as a .txt file for some reason...
RandomForestSuite.scala
-----------------------------------------------------------
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.mllib.tree
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.configuration.Strategy
import org.apache.spark.mllib.tree.impl.DecisionTreeMetadata
import org.apache.spark.mllib.tree.impurity.{Gini, Variance}
import org.apache.spark.mllib.tree.model.{Node, RandomForestModel}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.util.Utils
/**
* Test suite for [[RandomForest]].
*/
class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {

  /**
   * Trains a 1-tree random forest classifier and a plain decision tree on the same
   * ordered data and asserts the two produce identical trees.
   */
  def binaryClassificationTestWithContinuousFeatures(strategy: Strategy) {
    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 50, 1000)
    val rdd = sc.parallelize(arr)
    val numTrees = 1

    val rf = RandomForest.trainClassifier(rdd, strategy, numTrees = numTrees,
      featureSubsetStrategy = "auto", seed = 123)
    assert(rf.trees.size === 1)
    val rfTree = rf.trees(0)

    val dt = DecisionTree.train(rdd, strategy)

    EnsembleTestHelper.validateClassifier(rf, arr, 0.9)
    DecisionTreeSuite.validateClassifier(dt, arr, 0.9)

    // Make sure trees are the same.
    assert(rfTree.toString == dt.toString)
  }

  test("Binary classification with continuous features:" +
    " comparing DecisionTree vs. RandomForest(numTrees = 1)") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 2,
      numClasses = 2, categoricalFeaturesInfo = categoricalFeaturesInfo)
    binaryClassificationTestWithContinuousFeatures(strategy)
  }

  test("Binary classification with continuous features and node Id cache :" +
    " comparing DecisionTree vs. RandomForest(numTrees = 1)") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 2,
      numClasses = 2, categoricalFeaturesInfo = categoricalFeaturesInfo,
      useNodeIdCache = true)
    binaryClassificationTestWithContinuousFeatures(strategy)
  }

  /**
   * Trains a 1-tree random forest regressor and a plain decision tree on the same
   * ordered data and asserts the two produce identical trees.
   */
  def regressionTestWithContinuousFeatures(strategy: Strategy) {
    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 50, 1000)
    val rdd = sc.parallelize(arr)
    val numTrees = 1

    val rf = RandomForest.trainRegressor(rdd, strategy, numTrees = numTrees,
      featureSubsetStrategy = "auto", seed = 123)
    assert(rf.trees.size === 1)
    val rfTree = rf.trees(0)

    val dt = DecisionTree.train(rdd, strategy)

    EnsembleTestHelper.validateRegressor(rf, arr, 0.01)
    DecisionTreeSuite.validateRegressor(dt, arr, 0.01)

    // Make sure trees are the same.
    assert(rfTree.toString == dt.toString)
  }

  test("Regression with continuous features:" +
    " comparing DecisionTree vs. RandomForest(numTrees = 1)") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Regression, impurity = Variance,
      maxDepth = 2, maxBins = 10, numClasses = 2,
      categoricalFeaturesInfo = categoricalFeaturesInfo)
    regressionTestWithContinuousFeatures(strategy)
  }

  test("Regression with continuous features and node Id cache :" +
    " comparing DecisionTree vs. RandomForest(numTrees = 1)") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Regression, impurity = Variance,
      maxDepth = 2, maxBins = 10, numClasses = 2,
      categoricalFeaturesInfo = categoricalFeaturesInfo, useNodeIdCache = true)
    regressionTestWithContinuousFeatures(strategy)
  }

  /**
   * Checks that [[RandomForest.selectNodesToSplit]] assigns the expected number of
   * features per node for each supported featureSubsetStrategy, across several seeds.
   */
  def binaryClassificationTestWithContinuousFeaturesAndSubsampledFeatures(
      strategy: Strategy) {
    val numFeatures = 50
    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures, 1000)
    val rdd = sc.parallelize(arr)

    // Select feature subset for top nodes. Fails the test (with a descriptive
    // message) if the subset size does not match numFeaturesPerNode.
    def checkFeatureSubsetStrategy(
        numTrees: Int,
        featureSubsetStrategy: String,
        numFeaturesPerNode: Int): Unit = {
      val seeds = Array(123, 5354, 230, 349867, 23987)
      val maxMemoryUsage: Long = 128 * 1024L * 1024L
      val metadata =
        DecisionTreeMetadata.buildMetadata(rdd, strategy, numTrees, featureSubsetStrategy)
      seeds.foreach { seed =>
        val failString = s"Failed on test with:" +
          s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
          s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
        val nodeQueue = new mutable.Queue[(Int, Node)]()
        val topNodes: Array[Node] = new Array[Node](numTrees)
        Range(0, numTrees).foreach { treeIndex =>
          topNodes(treeIndex) = Node.emptyNode(nodeIndex = 1)
          nodeQueue.enqueue((treeIndex, topNodes(treeIndex)))
        }
        val rng = new scala.util.Random(seed = seed)
        val (nodesForGroup: Map[Int, Array[Node]],
            treeToNodeToIndexInfo: Map[Int, Map[Int, RandomForest.NodeIndexInfo]]) =
          RandomForest.selectNodesToSplit(nodeQueue, maxMemoryUsage, metadata, rng)

        assert(nodesForGroup.size === numTrees, failString)
        assert(nodesForGroup.values.forall(_.size == 1), failString) // 1 node per tree

        if (numFeaturesPerNode == numFeatures) {
          // featureSubset values should all be None
          assert(treeToNodeToIndexInfo.values.forall(_.values.forall(_.featureSubset.isEmpty)),
            failString)
        } else {
          // Check number of features.
          assert(treeToNodeToIndexInfo.values.forall(_.values.forall(
            _.featureSubset.get.size === numFeaturesPerNode)), failString)
        }
      }
    }

    checkFeatureSubsetStrategy(numTrees = 1, "auto", numFeatures)
    checkFeatureSubsetStrategy(numTrees = 1, "all", numFeatures)
    checkFeatureSubsetStrategy(numTrees = 1, "sqrt", math.sqrt(numFeatures).ceil.toInt)
    checkFeatureSubsetStrategy(numTrees = 1, "log2",
      (math.log(numFeatures) / math.log(2)).ceil.toInt)
    checkFeatureSubsetStrategy(numTrees = 1, "onethird", (numFeatures / 3.0).ceil.toInt)

    checkFeatureSubsetStrategy(numTrees = 2, "all", numFeatures)
    // "auto" with more than one tree defaults to sqrt for classification.
    checkFeatureSubsetStrategy(numTrees = 2, "auto", math.sqrt(numFeatures).ceil.toInt)
    checkFeatureSubsetStrategy(numTrees = 2, "sqrt", math.sqrt(numFeatures).ceil.toInt)
    checkFeatureSubsetStrategy(numTrees = 2, "log2",
      (math.log(numFeatures) / math.log(2)).ceil.toInt)
    checkFeatureSubsetStrategy(numTrees = 2, "onethird", (numFeatures / 3.0).ceil.toInt)
  }

  test("Binary classification with continuous features: subsampling features") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 2,
      numClasses = 2, categoricalFeaturesInfo = categoricalFeaturesInfo)
    binaryClassificationTestWithContinuousFeaturesAndSubsampledFeatures(strategy)
  }

  test("Binary classification with continuous features and node Id cache: subsampling features") {
    val categoricalFeaturesInfo = Map.empty[Int, Int]
    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 2,
      numClasses = 2, categoricalFeaturesInfo = categoricalFeaturesInfo,
      useNodeIdCache = true)
    binaryClassificationTestWithContinuousFeaturesAndSubsampledFeatures(strategy)
  }

  test("alternating categorical and continuous features with multiclass labels to test indexing") {
    val arr = new Array[LabeledPoint](4)
    arr(0) = new LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0, 3.0, 1.0))
    arr(1) = new LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 1.0, 1.0, 2.0))
    arr(2) = new LabeledPoint(0.0, Vectors.dense(2.0, 0.0, 0.0, 6.0, 3.0))
    arr(3) = new LabeledPoint(2.0, Vectors.dense(0.0, 2.0, 1.0, 3.0, 2.0))
    // Categorical features at indices 0, 2 and 4; continuous at 1 and 3.
    val categoricalFeaturesInfo = Map(0 -> 3, 2 -> 2, 4 -> 4)
    val input = sc.parallelize(arr)

    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 5,
      numClasses = 3, categoricalFeaturesInfo = categoricalFeaturesInfo)
    // Only checks that training completes without an indexing error.
    RandomForest.trainClassifier(input, strategy, numTrees = 2,
      featureSubsetStrategy = "sqrt", seed = 12345)
  }

  test("subsampling rate in RandomForest") {
    val arr = EnsembleTestHelper.generateOrderedLabeledPoints(5, 20)
    val rdd = sc.parallelize(arr)
    val strategy = new Strategy(algo = Classification, impurity = Gini, maxDepth = 2,
      numClasses = 2, categoricalFeaturesInfo = Map.empty[Int, Int],
      useNodeIdCache = true)

    val rf1 = RandomForest.trainClassifier(rdd, strategy, numTrees = 3,
      featureSubsetStrategy = "auto", seed = 123)
    strategy.subsamplingRate = 0.5
    val rf2 = RandomForest.trainClassifier(rdd, strategy, numTrees = 3,
      featureSubsetStrategy = "auto", seed = 123)
    // Same seed + different subsampling rate must yield different forests.
    assert(rf1.toDebugString != rf2.toDebugString)
  }

  test("model save/load") {
    val tempDir = Utils.createTempDir()
    val path = tempDir.toURI.toString

    Array(Classification, Regression).foreach { algo =>
      val trees = Range(0, 3).map(_ => DecisionTreeSuite.createModel(algo)).toArray
      val model = new RandomForestModel(algo, trees)

      // Save model, load it back, and compare.
      try {
        model.save(sc, path)
        val sameModel = RandomForestModel.load(sc, path)
        assert(model.algo == sameModel.algo)
        model.trees.zip(sameModel.trees).foreach { case (treeA, treeB) =>
          DecisionTreeSuite.checkEqual(treeA, treeB)
        }
      } finally {
        Utils.deleteRecursively(tempDir)
      }
    }
  }

  // Reproduction for SPARK-10821: training on very wide sparse vectors runs out of memory.
  // NOTE(review): the test name says 15 million features but `size` below is 3 million —
  // confirm the intended vector width.
  test("large sparse feature vector (15 million features)") {
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Random
    import org.apache.spark.{SparkConf, SparkContext}

    // Stop the suite's default context so we can start one with larger memory limits.
    sc.stop()
    println("Original Context Stopped")
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Simple Application")
      .set("spark.driver.memory", "5g")
      .set("spark.executor.memory", "4g")
      .set("spark.executor.cores", "2")
      .set("spark.executor.instances", "4")
      .set("spark.default.parallelism", "8")
      .set("spark.driver.maxResultSize", "20000m")
    sc = new SparkContext(conf)
    println("New Context Created...")

    val r = Random
    val size = 3000000
    val count = 3000
    // `count` evenly-spaced nonzero indices across the full vector width.
    val indptr = (1 to size by size / count).toArray
    val data = Seq.fill(count)(r.nextDouble()).toArray
    val dset = ArrayBuffer[LabeledPoint]()
    for (_ <- 1 to 10) {
      dset += LabeledPoint(r.nextInt(2), Vectors.sparse(size, indptr, data))
    }
    val distData = sc.parallelize(dset)
    val splits = distData.randomSplit(Array(0.7, 0.3))
    val (trainingData, testData) = (splits(0), splits(1))

    // Train a RandomForest model.
    // Empty categoricalFeaturesInfo indicates all features are continuous.
    val numClasses = 2
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 3 // Use more in practice.
    val featureSubsetStrategy = "auto" // Let the algorithm choose.
    val impurity = "gini"
    val maxDepth = 4
    val maxBins = 32

    try {
      RandomForest.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
        numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins)
    } catch {
      // Deliberately catching OutOfMemoryError: this test exists to reproduce the OOM
      // reported in SPARK-10821. Do NOT copy this pattern into production code.
      case e: OutOfMemoryError =>
        println(e)
        println("Stopping Current Context")
    } finally {
      // Always restore a default context so subsequent tests are unaffected,
      // whether or not training blew up.
      sc.stop()
      sc = new SparkContext()
      println("Created original Context")
    }
  }
}
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]