Re: Spark ML Random Forest output.

2015-12-05 Thread Benjamin Fradet
Hi,

To get back the original labels after indexing them with StringIndexer, I
usually use IndexToString to retrieve the original labels, like so:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val labelIndexer = new StringIndexer()
  .setInputCol(myInputLabelColumnName)
  .setOutputCol(myIndexedLabelColumnName)
  .fit(myData)

val randomForest = new RandomForestClassifier()
  .setLabelCol(myIndexedLabelColumnName)
  .setFeaturesCol(myFeaturesColumnName)

val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol(myPredictionColumnWithTheOriginalLabels)
  .setLabels(labelIndexer.labels)

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, randomForest, labelConverter))
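
For reference, a minimal sketch of how the pipeline would then be used
(myTestData is just a placeholder for whatever DataFrame you want to score):

val model = pipeline.fit(myData)

// transform() adds the "prediction" column and the converted, original labels.
val predictions = model.transform(myTestData)
predictions.select("prediction", myPredictionColumnWithTheOriginalLabels).show()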

Hoping that helps,
Ben.

On Sat, Dec 5, 2015 at 12:26 PM, Eugene Morozov 
wrote:

> Figured that out.
>
> StringIndexerModel has a field/method labels(), which returns the array of
> labels.
> Currently, prediction returns indices into that array, which is subject to
> change: https://issues.apache.org/jira/browse/SPARK-7126.
>
> Having serialized my pipeline model to a file and read it back, I can do:
>
> ((StringIndexerModel)readModel.stages()[0]).labels()
>
> readModel here is a PipelineModel.
>
>
> --
> Be well!
> Jean Morozov
>
> On Sat, Dec 5, 2015 at 12:06 PM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Vishnu, thanks for the response.
>>
>> The problem is that I actually do not have the indexed labels; they are
>> hidden in the DataFrame as metadata, and anyone who wants to use them has to
>> apply an ugly hack.
>>
>> The issue is even worse in case I serialize my model into a file for
>> delayed use. When I later read it back from the file, I do not have
>> such a map at all. The only workaround is to store the map along with the
>> serialized model, which is not really great.
>>
>> --
>> Be well!
>> Jean Morozov
>>
>> On Sat, Dec 5, 2015 at 2:24 AM, Vishnu Viswanath <
>> vishnu.viswanat...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> As per my understanding, the probability array gives the probability that
>>> the particular item belongs to each class, so the one with the highest
>>> probability is your predicted class.
>>>
>>> Since you have converted your labels to indexed labels, the model's classes
>>> are 0.0 to 9.0, and I see you are getting a prediction that is one of
>>> [0.0, 1.0, ..., 9.0] - which is correct.
>>>
>>> So what you want is a reverse map that can convert your predicted class
>>> back to the original String. I don't know if StringIndexer has such an
>>> option; maybe you can create your own map and reverse map of (label to
>>> index) and (index to label) and use them to get back your original label.
>>>
>>> Maybe there is a better way to do this.
>>>
>>> Regards,
>>> Vishnu
>>>
>>> On Fri, Dec 4, 2015 at 4:56 PM, Eugene Morozov <
>>> evgeny.a.moro...@gmail.com> wrote:
>>>
 Hello,

 I've got an input dataset of handwritten digits and working Java code
 that uses the random forest classification algorithm to determine the digits.
 My test set is just some lines from the same input dataset - just to be
 sure I'm doing the right thing. My understanding is that having a correct
 classifier in this case would give me the correct prediction.
 At the moment overfitting is not an issue.

 After applying StringIndexer to my input DataFrame I've applied an ugly
 trick and got "indexedLabel" metadata:

 {"ml_attr":{"vals":["1.0","7.0","3.0","9.0","2.0","6.0","0.0","4.0","8.0","5.0"],"type":"nominal","name":"indexedLabel"}}


 So, my algorithm gives me the following result. The question is that I'm not
 sure I understand the meaning of "prediction" here in the output. It
 looks like it's just the index of the highest probability value in the
 "prob" array. Shouldn't "prediction" be the actual class, i.e. one of
 "0.0", "1.0", ..., "9.0"? If the prediction is just an ordinal number, then
 I have to manually map it back to my classes, but for that I have to
 either specify the classes manually to know their order or somehow be able to
 get them out of the classifier. Neither of these ways seems to be
 accessible.

 (4.0 -> prediction=7.0,
 prob=[0.004708283878223195,0.08478236104777455,0.03594642191080532,0.19286506771018885,0.038304389235523435,0.02841307797386,0.003334431932056404,0.5685242322346109,0.018564705500837587,0.024557028569980155]
 (9.0 -> prediction=3.0,
 prob=[0.018432404716456248,0.16837195846781422,0.05995559403934031,0.32282148259583565,0.018374168600855455,0.04792285114398864,0.018226352623526704,0.1611650363085499,0.11703073969440755,0.06769941180922535]
 (2.0 -> prediction=4.0,
 

Re: Grid search with Random Forest

2015-11-30 Thread Benjamin Fradet
Hi Ndjido,

This is because GBTClassifier doesn't yet have a rawPredictionCol like
RandomForestClassifier has.
Cf:
http://spark.apache.org/docs/latest/ml-ensembles.html#output-columns-predictions-1
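
In the meantime, one workaround is to score the grid search on the prediction
column, which GBTClassifier does produce; a minimal sketch, where gbt, the
training DataFrame, the column names, and the grid values are only
illustrative:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Evaluate on the "prediction" column instead of the missing rawPredictionCol.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")

val paramGrid = new ParamGridBuilder()
  .addGrid(gbt.maxDepth, Array(3, 5))
  .addGrid(gbt.maxIter, Array(10, 20))
  .build()

val cv = new CrossValidator()
  .setEstimator(gbt)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(training)
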
On 1 Dec 2015 3:57 a.m., "Ndjido Ardo BAR"  wrote:

> Hi Joseph,
>
> Yes, Random Forest supports grid search on Spark 1.5+, but I'm getting a
> "rawPredictionCol field does not exist" exception on Spark 1.5.2 for the
> Gradient Boosted Trees classifier.
>
>
> Ardo
> On Tue, 1 Dec 2015 at 01:34, Joseph Bradley  wrote:
>
>> It should work with 1.5+.
>>
>> On Thu, Nov 26, 2015 at 12:53 PM, Ndjido Ardo Bar 
>> wrote:
>>
>>>
>>> Hi folks,
>>>
>>> Does anyone know whether the grid search capability has been enabled since
>>> issue SPARK-9011 in version 1.4.0? I'm getting a "rawPredictionCol
>>> column doesn't exist" error when trying to perform a grid search with Spark 1.4.0.
>>>
>>> Cheers,
>>> Ardo
>>>
>>>
>>>
>>>
>>>
>>>
>>


Re: Retrieve best parameters from CrossValidator

2015-11-29 Thread Benjamin Fradet
Hi Yanbo,

Thanks for your answer, I'm looking forward to 1.6 then.
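
In the meantime, a sketch of one way to recover the best parameter map from a
fitted CrossValidatorModel on 1.5, assuming cvModel is the result of calling
fit on the CrossValidator (getEstimatorParamMaps and avgMetrics line up index
by index):

// Pair each candidate ParamMap with its average cross-validation metric
// and keep the best one (use minBy instead if lower is better for your metric).
val bestParams = cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .maxBy(_._2)
  ._1

println(bestParams)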

On Sun, Nov 29, 2015 at 3:44 PM, Yanbo Liang  wrote:

> Hi Ben,
>
> We can get the best model from CrossValidatorModel.bestModel; furthermore,
> we can use the write function of CrossValidatorModel to persist the model
> and use the best model elsewhere (after the 1.6 release). So I don't think
> exposing the best model parameters as a public API is really necessary.
>
> 2015-11-29 7:36 GMT+08:00 BenFradet :
>
>> Hi everyone,
>>
>> Is there a better way to retrieve the best model parameters obtained from
>> cross validation than inspecting the logs issued while calling the fit
>> method (with the call here:
>>
>> https://github.com/apache/spark/blob/branch-1.5/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L106
>> )?
>>
>> Wouldn't it be useful to expose this to the end user through the
>> crossValidatorModel?
>>
>> Thanks for your response.
>>
>> Best,
>> Ben.
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-best-parameters-from-CrossValidator-tp25508.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


-- 
Ben Fradet.


Drop multiple columns in the DataFrame API

2015-11-19 Thread Benjamin Fradet
Hi everyone,

I was wondering if there is a better way to drop multiple columns from a
DataFrame, or why there is no drop(cols: Column*) method in the DataFrame
API.

Indeed, I tend to write code like this:

val filteredDF = df.drop("colA")
   .drop("colB")
   .drop("colC")
//etc

which is a bit lengthy, or:

val colsToRemove = Seq("colA", "colB", "colC", etc)
val filteredDF = df.select(df.columns
  .filter(colName => !colsToRemove.contains(colName))
  .map(colName => new Column(colName)): _*)

which is, I think, a bit ugly.
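
A foldLeft over the column names at least keeps it to one expression; a small
sketch of the workaround I use in the meantime:

// Drop each unwanted column in turn, threading the DataFrame through the fold.
val filteredDF = colsToRemove.foldLeft(df)((accDF, colName) => accDF.drop(colName))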

Thanks,

-- 
Ben Fradet.


Re: Setting JVM heap start and max sizes, -Xms and -Xmx, for executors

2015-07-02 Thread Benjamin Fradet
Hi,

You can set those parameters through the

spark.executor.extraJavaOptions

Which is documented in the configuration guide:
spark.apache.org/docs/latest/configuration.html
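
For example, a minimal sketch of the corresponding configuration (note that
the executors' max heap, -Xmx, is normally derived from spark.executor.memory,
so extraJavaOptions is best reserved for flags like -Xms):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Max heap (-Xmx) for executors comes from spark.executor.memory.
  .set("spark.executor.memory", "8g")
  // Additional JVM flags, e.g. the initial heap size.
  .set("spark.executor.extraJavaOptions", "-Xms1g")
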
On 2 Jul 2015 9:06 pm, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

 Hi,

 I'm running Spark 1.4.0 and I want to specify the start and max sizes (-Xms
 and -Xmx) of the JVM heap for my executors. I tried:

 executor.cores.memory=-Xms1g -Xms8g

 but it doesn't work. How do I specify them?

 Appreciate your help.

 Thanks,




Re: Kafka Direct Stream - Custom Serialization and Deserilization

2015-06-26 Thread Benjamin Fradet
There is one for the key of your Kafka message and one for its value.
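
If the payload isn't a plain String, you can plug in your own decoders in the
same two slots; a minimal Scala sketch, where Record and its comma-separated
format are made up for illustration, and ssc, kafkaParams and topicsSet are
assumed to already exist (the Scala counterparts of jssc, kafkaParams and
topicsSet in the example quoted below):

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical record type, for illustration only.
case class Record(id: String, name: String)

// Kafka instantiates decoders reflectively, so keep the VerifiableProperties constructor.
class RecordDecoder(props: VerifiableProperties = null) extends Decoder[Record] {
  override def fromBytes(bytes: Array[Byte]): Record = {
    val fields = new String(bytes, "UTF-8").split(",")
    Record(fields(0), fields(1))
  }
}

// Keys stay plain Strings, values go through the custom decoder.
val stream = KafkaUtils.createDirectStream[String, Record, StringDecoder, RecordDecoder](
  ssc, kafkaParams, topicsSet)
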
On 26 Jun 2015 4:21 pm, Ashish Soni asoni.le...@gmail.com wrote:

 My question is: why are there two similar parameters, String.class and
 StringDecoder.class? What is the difference between them?

 Ashish

 On Fri, Jun 26, 2015 at 8:53 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 JavaPairInputDStream<String, String> messages =
     KafkaUtils.createDirectStream(
         jssc,
         String.class,
         String.class,
         StringDecoder.class,
         StringDecoder.class,
         kafkaParams,
         topicsSet
     );

 Here:

 jssc = JavaStreamingContext
 String.class = Key , Value classes
 StringDecoder = Key, Value decoder classes
 KafkaParams = Map in which you specify all the kafka details (like
 brokers, offset etc)
 topicSet = Set of topics from which you want to consume data.

 Here's a sample program
 https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
 for you to start.



 Thanks
 Best Regards

 On Fri, Jun 26, 2015 at 6:09 PM, Ashish Soni asoni.le...@gmail.com
 wrote:

 Hi ,

 If I have the data format below, how can I use a Kafka direct stream to
 de-serialize it? I am not able to understand all the parameters I need to
 pass. Can someone explain what the arguments should be, as I am not clear
 about this?

 JavaPairInputDStream<K, V> org.apache.spark.streaming.kafka.KafkaUtils
     .createDirectStream(JavaStreamingContext arg0, Class<K> arg1, Class<V> arg2,
         Class<KD> arg3, Class<VD> arg4, Map<String, String> arg5, Set<String> arg6)

 ID
 Name
 Unit
 Rate
 Duration






Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Benjamin Fradet
Are you using checkpointing?

I had a similar issue when recreating a streaming context from checkpoint
as broadcast variables are not checkpointed.
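
One common workaround is to re-create the broadcast lazily after a restart
instead of relying on a static field; a minimal Scala sketch (your code is
Java, but the same pattern applies), reusing the FieldModel type from your
snippet and a placeholder load() function for however the map is rebuilt:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily (re)instantiated broadcast holder: if the streaming context is rebuilt
// from a checkpoint, the broadcast is simply created again on first access.
object ModelBroadcast {
  @volatile private var instance: Broadcast[Map[String, FieldModel]] = _

  def getInstance(sc: SparkContext, load: () => Map[String, FieldModel]): Broadcast[Map[String, FieldModel]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(load())
        }
      }
    }
    instance
  }
}
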
On 23 Jun 2015 5:01 pm, Nipun Arora nipunarora2...@gmail.com wrote:

 Hi,

 I have a spark streaming application where I need to access a model saved
 in a HashMap.
 I have *no problems in running the same code with broadcast variables in
 the local installation.* However I get a *null pointer* *exception* when
 I deploy it on my spark test cluster.


 I have stored a model in a HashMap<String, FieldModel> which is
 serializable. I use a broadcast variable declared as a global static
 variable to broadcast this hashmap:

 public static Broadcast<HashMap<String, FieldModel>> br;

 HashMap<String, FieldModel> hm = checkerObj.getModel(esserver, type);

 br = ssc.sparkContext().broadcast(hm);


 I need to access this model in my mapper phase, and do some operation
 based on the checkup. The following is a snippet of how I access the
 broadcast variable.


 JavaDStream<Tuple3<Long, Double, String>> split =
     matched.map(new GenerateType2Scores());


 class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> {
     @Override
     public Tuple3<Long, Double, String> call(String s) throws Exception {

         Long time = Type2ViolationChecker.getMTS(s);
         HashMap<String, FieldModel> temphm = Type2ViolationChecker.br.value();

         Double score = Type2ViolationChecker.getAnomalyScore(temphm, s);
         return new Tuple3<Long, Double, String>(time, score, s);
     }
 }

 The temphm should refer to the hashmap stored in the broadcast variable.
 Can anyone help me understand the correct way to access broadcast
 variables in Java?

 Thanks
 Nipun



Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream

2015-06-22 Thread Benjamin Fradet
Where does task_batches come from?
On 22 Jun 2015 4:48 pm, Shaanan Cohney shaan...@gmail.com wrote:

 Thanks,

 I've updated my code to use updateStateByKey but am still getting these
 errors when I resume from a checkpoint.

 One thought of mine was that I used sc.parallelize to generate the RDDs
 for the queue, but perhaps on resume, it doesn't recreate the context
 needed?



 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com

 On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet 
 benjamin.fra...@gmail.com wrote:

 I would suggest you have a look at the updateStateByKey transformation in
 the Spark Streaming programming guide which should fit your needs better
 than your update_state function.
 On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote:

 Counts is a list (counts = []) in the driver, used to collect the
 results.
 It seems like it's also not the best way to be doing things, but I'm new
 to spark and editing someone else's code so still learning.
 Thanks!


 def update_state(out_files, counts, curr_rdd):
     try:
         for c in curr_rdd.collect():
             fnames, count = c
             counts.append(count)
             out_files |= fnames
     except Py4JJavaError as e:
         print("EXCEPTION: %s" % str(e))

 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com

 On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet 
 benjamin.fra...@gmail.com wrote:

 What does counts refer to?

 Could you also paste the code of your update_state function?
 On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote:

 I'm receiving the SPARK-5063 error (RDD transformations and actions
 can only be invoked by the driver, not inside of other transformations)
 whenever I try and restore from a checkpoint in spark streaming on my app.

 I'm using python3 and my RDDs are inside a queuestream DStream.

 This is the little chunk of code causing issues:

 -

 p_batches = [sc.parallelize(batch) for batch in task_batches]

 sieving_tasks = ssc.queueStream(p_batches)
 sieving_tasks.checkpoint(20)
 relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
 poly_path, fb_paths))
 relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
 ).foreachRDD(lambda s: update_state(out_files, counts, s))
 ssc.checkpoint(s3n_path)

 -

 Thanks again!



 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com






Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream

2015-06-22 Thread Benjamin Fradet
What does counts refer to?

Could you also paste the code of your update_state function?
On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote:

 I'm receiving the SPARK-5063 error (RDD transformations and actions can
 only be invoked by the driver, not inside of other transformations)
 whenever I try and restore from a checkpoint in spark streaming on my app.

 I'm using python3 and my RDDs are inside a queuestream DStream.

 This is the little chunk of code causing issues:

 -

 p_batches = [sc.parallelize(batch) for batch in task_batches]

 sieving_tasks = ssc.queueStream(p_batches)
 sieving_tasks.checkpoint(20)
 relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
 poly_path, fb_paths))
 relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
 ).foreachRDD(lambda s: update_state(out_files, counts, s))
 ssc.checkpoint(s3n_path)

 -

 Thanks again!



 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com



Re: [Spark Streaming 1.4.0] SPARK-5063, Checkpointing and queuestream

2015-06-22 Thread Benjamin Fradet
I would suggest you have a look at the updateStateByKey transformation in
the Spark Streaming programming guide which should fit your needs better
than your update_state function.
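
For reference, a minimal sketch of what that looks like (Scala here, but
PySpark has the same transformation); pairs stands in for a DStream of
(key, count) tuples, and checkpointing must be enabled, which your code
already does:

// Keep a running total per key across batches; the previous state arrives as an Option.
val runningCounts = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(newValues.sum + state.getOrElse(0))
}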
On 22 Jun 2015 1:03 pm, Shaanan Cohney shaan...@gmail.com wrote:

 Counts is a list (counts = []) in the driver, used to collect the results.
 It seems like it's also not the best way to be doing things, but I'm new
 to spark and editing someone else's code so still learning.
 Thanks!


 def update_state(out_files, counts, curr_rdd):
     try:
         for c in curr_rdd.collect():
             fnames, count = c
             counts.append(count)
             out_files |= fnames
     except Py4JJavaError as e:
         print("EXCEPTION: %s" % str(e))

 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com

 On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet 
 benjamin.fra...@gmail.com wrote:

 What does counts refer to?

 Could you also paste the code of your update_state function?
 On 22 Jun 2015 12:48 pm, Shaanan Cohney shaan...@gmail.com wrote:

 I'm receiving the SPARK-5063 error (RDD transformations and actions can
 only be invoked by the driver, not inside of other transformations)
 whenever I try and restore from a checkpoint in spark streaming on my app.

 I'm using python3 and my RDDs are inside a queuestream DStream.

 This is the little chunk of code causing issues:

 -

 p_batches = [sc.parallelize(batch) for batch in task_batches]

 sieving_tasks = ssc.queueStream(p_batches)
 sieving_tasks.checkpoint(20)
 relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
 poly_path, fb_paths))
 relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
 ).foreachRDD(lambda s: update_state(out_files, counts, s))
 ssc.checkpoint(s3n_path)

 -

 Thanks again!



 --

 Shaanan Cohney
 PhD Student
 University of Pennsylvania


 shaan...@gmail.com





Re: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Benjamin Fradet
Hi,

Are you restarting your Spark streaming context through getOrCreate?
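
For reference, a minimal sketch of that pattern; checkpointDir and sparkConf
are placeholders, and all of the DStream setup has to live inside the factory
function so it can be rebuilt from the checkpoint:

import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the DStreams and their transformations here ...
  ssc
}

// Reuses the checkpointed context if checkpoint data exists, otherwise calls createContext().
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)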
On 9 Jun 2015 09:30, Haopu Wang hw...@qilinsoft.com wrote:

 When I ran a Spark streaming application for a longer period, I noticed the
 local directory's size kept increasing.

 I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

 The Spark streaming batch duration is 10 seconds and the checkpoint duration
 is 10 minutes.

 The setting took effect, but after that the exception below happened.

 Do you have any idea about this error? Thank you!

 

 15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
     at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
     at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
     at org.apache.spark.scheduler.Task.run(Task.scala:64)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
     ... 25 more

 15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
 15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0




