I see. You might try this: create a pipeline of just your feature transformers, then call fit() on the complete dataset to get a model. Finally, make a second pipeline and add this model and the decision tree as stages.
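Roughly like this - an untested sketch that reuses the stage and DataFrame names from your code quoted below (col1_indexer ... col5_indexer, vectorAssembler, featureVectorIndexer, decisionTree, completeDataset, trainingSet, cvSet):

// Pipeline 1: feature transformers only. Fitting it on the complete dataset
// produces the intermediate indexed columns and the "feature" column along
// the way, so the StringIndexers and the VectorIndexer see every category.
val featurePipeline = new Pipeline().setStages(Array(
  col1_indexer, col2_indexer, col3_indexer, col4_indexer, col5_indexer,
  vectorAssembler, featureVectorIndexer))
val featureModel = featurePipeline.fit(completeDataset)

// Pipeline 2: the fitted feature model plus the estimator. A fitted
// PipelineModel is itself a Transformer, so it can be used as a stage;
// fitting this pipeline only trains the decision tree.
val pipeline = new Pipeline().setStages(Array(featureModel, decisionTree))
val model = pipeline.fit(trainingSet)
val output = model.transform(cvSet)

That way the indexers are fit on the full data, while the tree is still trained only on the training set.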
On Aug 30, 2016 8:19 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi Bryan,
> Thanks for the reply.
> I am indexing 5 columns, then using these indexed columns to generate the
> "feature" column through the vector assembler.
> Which essentially means that I cannot use *fit()* directly on the
> "completeDataset" dataframe, since it will have neither the "feature"
> column nor the 5 indexed columns.
> Of course there is a dirty way of doing this, but I am wondering if there
> is some optimized/intelligent approach for this.
>
> Thanks,
> Baahu
>
> On Wed, Aug 31, 2016 at 3:30 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>
>> You need to first fit just the VectorIndexer, which returns the model,
>> then add the model to the pipeline, where it will only transform.
>>
>> val featureVectorIndexer = new VectorIndexer()
>>   .setInputCol("feature")
>>   .setOutputCol("indexedfeature")
>>   .setMaxCategories(180)
>>   .fit(completeDataset)
>>
>> On Tue, Aug 30, 2016 at 9:57 AM, Bahubali Jain <bahub...@gmail.com> wrote:
>>
>>> Hi,
>>> I had run into a similar exception, "java.util.NoSuchElementException:
>>> key not found:".
>>> After further investigation I realized it is happening because the
>>> VectorIndexer is fit on the training dataset and not on the entire
>>> dataset.
>>>
>>> In the dataframe I have 5 categorical columns; each of these has to go
>>> through a StringIndexer, and then they are put through a VectorIndexer
>>> to generate the feature vector.
>>> What is the right way to do this, so that the VectorIndexer can be fit
>>> on the entire data and not just on the training data?
>>>
>>> Below is the current approach; as is evident, the VectorIndexer is
>>> being fit based on the training set.
>>>
>>> Please note: fit() on the VectorIndexer cannot be called on the
>>> entireset dataframe, since it doesn't have the required column (the
>>> *feature* column is being generated dynamically during pipeline
>>> execution).
>>> How can the VectorIndexer be *fit()* on the entireset?
>>>
>>> val col1_indexer = new StringIndexer().setInputCol("col1").setOutputCol("indexed_col1")
>>> val col2_indexer = new StringIndexer().setInputCol("col2").setOutputCol("indexed_col2")
>>> val col3_indexer = new StringIndexer().setInputCol("col3").setOutputCol("indexed_col3")
>>> val col4_indexer = new StringIndexer().setInputCol("col4").setOutputCol("indexed_col4")
>>> val col5_indexer = new StringIndexer().setInputCol("col5").setOutputCol("indexed_col5")
>>>
>>> val featureArray = Array("indexed_col1", "indexed_col2", "indexed_col3",
>>>   "indexed_col4", "indexed_col5")
>>> val vectorAssembler = new VectorAssembler().setInputCols(featureArray).setOutputCol("feature")
>>> val featureVectorIndexer = new VectorIndexer()
>>>   .setInputCol("feature")
>>>   .setOutputCol("indexedfeature")
>>>   .setMaxCategories(180)
>>>
>>> val decisionTree = new DecisionTreeClassifier().setMaxBins(300)
>>>   .setMaxDepth(1).setImpurity("entropy").setLabelCol("indexed_user_action")
>>>   .setFeaturesCol("indexedfeature").setPredictionCol("prediction")
>>>
>>> val pipeline = new Pipeline().setStages(Array(col1_indexer, col2_indexer,
>>>   col3_indexer, col4_indexer, col5_indexer, vectorAssembler,
>>>   featureVectorIndexer, decisionTree))
>>> val model = pipeline.fit(trainingSet)
>>> val output = model.transform(cvSet)
>>>
>>> Thanks,
>>> Baahu
>>>
>>> On Fri, Jul 8, 2016 at 11:24 PM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>
>>>> Hi Rich,
>>>>
>>>> I looked at the notebook and it seems like you are fitting the
>>>> StringIndexer and VectorIndexer to only the training data, when they
>>>> should be fit to the entire data set. If the training data does not
>>>> include all of the labels and an unknown label appears in the test data
>>>> during evaluation, then it will not know how to index it. So your code
>>>> should be like this, fit with 'digits' instead of 'training':
>>>>
>>>> val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(digits)
>>>> // Automatically identify categorical features, and index them.
>>>> // Set maxCategories so features with > 4 distinct values are treated as continuous.
>>>> val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(digits)
>>>>
>>>> Hope that helps!
>>>>
>>>> On Fri, Jul 1, 2016 at 9:24 AM, Rich Tarro <richta...@gmail.com> wrote:
>>>>
>>>>> Hi Bryan.
>>>>>
>>>>> Thanks for your continued help.
>>>>>
>>>>> Here is the code shown in a Jupyter notebook. I figured this was
>>>>> easier than cutting and pasting the code into an email. If you would
>>>>> like me to send you the code in a different format, let me know. The
>>>>> necessary data is all downloaded within the notebook itself.
>>>>>
>>>>> https://console.ng.bluemix.net/data/notebooks/fe7e578a-401f-4744-a318-b1b6bcf6f5f8/view?access_token=2f6df7b1dfcb3c1c2d94a794506bb282729dab8f05118fafe5f11886326e02fc
>>>>>
>>>>> A few additional pieces of information.
>>>>>
>>>>> 1. The training dataset is cached before training the model. If you do
>>>>> not cache the training dataset, the model will not train. The code
>>>>> model.transform(test) fails with a similar error. No other changes
>>>>> besides caching or not caching. Again, with the training dataset
>>>>> cached, the model can be successfully trained, as seen in the notebook.
>>>>>
>>>>> 2. I have another version of the notebook where I download the same
>>>>> data in libsvm format rather than csv. That notebook works fine. All
>>>>> the code is essentially the same, accounting for the difference in
>>>>> file formats.
>>>>>
>>>>> 3. I tested this same code on another Spark cloud platform and it
>>>>> displays the same symptoms when run there.
>>>>>
>>>>> Thanks.
>>>>> Rich
>>>>>
>>>>> On Wed, Jun 29, 2016 at 12:59 AM, Bryan Cutler <cutl...@gmail.com> wrote:
>>>>>
>>>>>> Are you fitting the VectorIndexer to the entire data set and not just
>>>>>> the training or test data? If you are able to post your code and some
>>>>>> data to reproduce, that would help in troubleshooting.
>>>>>>
>>>>>> On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the response, but in my case I reversed the meaning of
>>>>>>> "prediction" and "predictedLabel". It seemed to make more sense to me
>>>>>>> that way, but in retrospect, it probably only causes confusion to
>>>>>>> anyone else looking at this.
>>>>>>> I reran the code with all the pipeline stage inputs and outputs
>>>>>>> named exactly as in the Random Forest Classifier example, to make
>>>>>>> sure I hadn't messed anything up when I renamed things. Same error.
>>>>>>>
>>>>>>> I'm still at the point where I can train the model and make
>>>>>>> predictions, but I am not able to get the
>>>>>>> MulticlassClassificationEvaluator to work on the DataFrame of
>>>>>>> predictions.
>>>>>>>
>>>>>>> Any other suggestions? Thanks.
>>>>>>>
>>>>>>> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro <richta...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I created an ML pipeline using the Random Forest Classifier -
>>>>>>>> similar to what is described here, except in my case the source
>>>>>>>> data is in csv format rather than libsvm.
>>>>>>>>
>>>>>>>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>>>>>>>
>>>>>>>> I am able to successfully train the model and make predictions (on
>>>>>>>> test data not used to train the model) as shown here.
>>>>>>>>
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> |indexedLabel|predictedLabel|label|prediction|            features|
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>>>>>>>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>>>>>>>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>>>>>>>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>>>>>>>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>>>>>>>> +------------+--------------+-----+----------+--------------------+
>>>>>>>> only showing top 5 rows
>>>>>>>>
>>>>>>>> However, when I attempt to calculate the error between the
>>>>>>>> indexedLabel and the predictedLabel using the
>>>>>>>> MulticlassClassificationEvaluator, I get the NoSuchElementException
>>>>>>>> error attached below.
>>>>>>>>
>>>>>>>> val evaluator = new MulticlassClassificationEvaluator()
>>>>>>>>   .setLabelCol("indexedLabel").setPredictionCol("predictedLabel")
>>>>>>>>   .setMetricName("precision")
>>>>>>>> val accuracy = evaluator.evaluate(predictions)
>>>>>>>> println("Test Error = " + (1.0 - accuracy))
>>>>>>>>
>>>>>>>> What could be the issue?
>>>>>>>>
>>>>>>>> Name: org.apache.spark.SparkException
>>>>>>>> Message: Job aborted due to stage failure: Task 2 in stage 49.0
>>>>>>>> failed 10 times, most recent failure: Lost task 2.9 in stage 49.0
>>>>>>>> (TID 162, yp-spark-dal09-env5-0024):
>>>>>>>> java.util.NoSuchElementException: key not found: 132.0
>>>>>>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>>>>>> at scala.collection.AbstractMap.default(Map.scala:58)
>>>>>>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>>>>>>> at scala.collection.AbstractMap.apply(Map.scala:58)
>>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>>> at org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>>>>>>> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>>>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>>> at org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>>>>>>> at org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>>>>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>> at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>>>>>>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>>> at java.lang.Thread.run(Thread.java:785)
>>>>>>>>
>>>>>>>> Driver stacktrace:
>>>>>>>> StackTrace:
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>> scala.Option.foreach(Option.scala:236)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>> java.lang.Thread.getStackTrace(Thread.java:1117)
>>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1863)
>>>>>>>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
>>>>>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>>> org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:741)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$collectAsMap$1.apply(PairRDDFunctions.scala:740)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>>> org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>>> org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:740)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:49)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:45)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision$lzycompute(MulticlassMetrics.scala:142)
>>>>>>>> org.apache.spark.mllib.evaluation.MulticlassMetrics.precision(MulticlassMetrics.scala:142)
>>>>>>>> org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator.evaluate(MulticlassClassificationEvaluator.scala:84)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
>>>>>>>> $line110.$read$$iwC$$iwC$$iwC.<init>(<console>:76)
>>>>>>>> $line110.$read$$iwC$$iwC.<init>(<console>:78)
>>>>>>>> $line110.$read$$iwC.<init>(<console>:80)
>>>>>>>> $line110.$read.<init>(<console>:82)
>>>>>>>> $line110.$read$.<init>(<console>:86)
>>>>>>>> $line110.$read$.<clinit>(<console>)
>>>>>>>> $line110.$eval$.<init>(<console>:7)
>>>>>>>> $line110.$eval$.<clinit>(<console>)
>>>>>>>> $line110.$eval.$print(<console>)
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
>>>>>>>> java.lang.reflect.Method.invoke(Method.java:507)
>>>>>>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
>>>>>>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
>>>>>>>> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
>>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
>>>>>>>> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
>>>>>>>> com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>>> com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
>>>>>>>> com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>>> java.lang.Thread.run(Thread.java:785)
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>> --
>>> Twitter:http://twitter.com/Baahu
>>>
>>
>
> --
> Twitter:http://twitter.com/Baahu