Hi nguyen,

Thanks again. Yes, flatMap may do the trick as well. I may try it out, roughly along the lines of the sketch below, and I will let you know the result when done.
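(A minimal sketch of the flatMap idea, untested. It assumes getFeatures can return a list of comma-separated feature strings, one per candidate vector, so a single input line yields several (text, features) pairs; in the single-vector case, a plain map suffices. getFeatures and texts are the names from this thread.)

def extract_pairs(txt):
    # hypothetical: getFeatures(txt) -> list of comma-separated feature strings
    return [(txt, [float(v) for v in f.split(',')]) for f in getFeatures(txt)]

# flatMap flattens the per-record lists into one DStream of (text, features) pairs
pairs = texts.flatMap(extract_pairs)

Predictions can then be attached per batch with transform, as discussed further down in this thread.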
On Tue, May 31, 2016 at 3:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

1. The RandomForest 'predict' method supports both an RDD and a Vector as input
(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel).
So, in this case, the extract_feature function should return the tuple (prediction, rawtext). If each input text can create a list of vectors, try using flatMap instead of map.

2, 3: From the Spark documentation: "Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, it is represented by a continuous sequence of RDDs, which is Spark’s abstraction of an immutable, distributed dataset. Each RDD in a DStream contains data from a certain interval, as shown in the following figure."
(https://spark.apache.org/docs/0.9.1/streaming-programming-guide.html)

So, in order to handle a stream, you should handle each RDD in that stream. This means that everything you want to do with your new data should go inside the process_rdd function. There is nothing returned from foreachRDD, of course.

2016-05-31 14:39 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

Hi nguyen,

Thanks a lot for your time; I really appreciate the good suggestions.

Please find my concerns inline below:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)  <<< this will return two separate lists as a tuple, but I want a one-to-one mapping (pred, text), not (predlist, textlist)

def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...)  <<< I want to avoid saving to an external system (and definitely not a global variable). As I said, it could be an overhead considering streaming.

stream.foreachRDD(process_rdd)  <<< As you can see here, there is no variable to store the output from foreachRDD. My target is to get (pred, text) pairs and then use them.

Whatever it is, the output from extract_feature is not what I want. I will be more than happy if you correct my mistakes here.

-Obaid

On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

I'm not sure what you mean by saying "does not return any value". How do you use this method? I would use it as follows:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)

def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...)

stream.foreachRDD(process_rdd)
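(For reference, a fleshed-out sketch of this process_rdd route; untested, with getFeatures, rf_model, stream, and the output path as placeholders from the thread. It predicts through the RDD API, driver side, and zips the scores back, following the predict-then-zip pattern in the MLlib RandomForest examples.)

def process_rdd(time, rdd):
    # keep (text, features) together
    pairs = rdd.map(lambda x: (x, [float(i) for i in getFeatures(x).split(',')]))
    # predict on the features column; zip is safe because predict()
    # preserves the partitioning and count of its input RDD
    preds = rf_model.predict(pairs.map(lambda p: p[1]))
    scored = preds.zip(pairs.map(lambda p: p[0]))  # (prediction, text)
    scored.saveAsTextFile('/tmp/scored/%s' % time)  # hypothetical sink, one directory per batch

stream.foreachRDD(process_rdd)

Nothing needs to be returned here; the side effect (the write) is the output of each batch.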
2016-05-31 12:57 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

foreachRDD does not return any value; it can be used only to send results to another place/context, like a db, a file, etc. I could use that, but it seems like the overhead of another hop. I wanted to keep it simple and light.

On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:

How about using foreachRDD? I think this is much better than your trick.

2016-05-31 12:32 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

Hi Guys,

In the end, I am using the code below. The trick is using a native Python map together with Spark Streaming's transform. It may not be an elegant way, but it works :).

def predictScore(texts, modelRF):
    # in the transform operation: x = text and y = features
    predictions = texts.map(lambda txt: (txt, getFeatures(txt))).\
        map(lambda (txt, features): (txt, features.split(','))).\
        map(lambda (txt, features): (txt, [float(i) for i in features])).\
        transform(lambda rdd: sc.parallelize(
            map(lambda x, y: (x, y),
                modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
                rdd.map(lambda (x, y): x).collect())))
    # the return value is a DStream of (score, original text) tuples
    return predictions

Hope it will help somebody who is facing the same problem. If anybody has a better idea, please post it here.

-Obaid
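(One possible refinement of the trick above, to avoid collecting both lists to the driver and re-parallelizing them: do the zip inside transform, on the distributed RDDs, as in the MLlib RandomForest examples. An untested sketch using the same names from the thread.)

def predictScoreDistributed(texts, modelRF):
    # DStream of (text, [float feature]) pairs
    pairs = texts.map(lambda txt: (txt, [float(i) for i in getFeatures(txt).split(',')]))

    def attach_scores(rdd):
        # transform runs this on the driver for each batch, so calling
        # modelRF.predict(rdd) is legal here; predict() preserves the
        # partitioning of its input, so the zip lines up record for record
        preds = modelRF.predict(rdd.map(lambda (txt, fea): fea))
        return preds.zip(rdd.map(lambda (txt, fea): txt))  # (score, text)

    return pairs.transform(attach_scores)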
On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

DStream has a method foreachRDD, so you can walk through all the RDDs inside the DStream as you want.

https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html

2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

Hi nguyen,

If I am not mistaken, we cannot call predict on a DStream as you have suggested. We have to use transform to be able to perform normal RDD operations on DStreams, and here I am trapped.

-Obaid

On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

How about this?

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)

output = texts.map(lambda x: extract_feature(rf_model, x))

2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:

Hi,

Anybody has any idea on the below?

-Obaid

On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:

Hi Guys,

This is my first mail to the Spark users mailing list.

I need help with a DStream operation.

In fact, I am using an MLlib random forest model to predict using Spark Streaming. In the end, I want to combine the feature DStream and the prediction DStream together for further downstream processing.

I am predicting using the below piece of code:

predictions = texts.map(lambda x: getFeatures(x)).\
    map(lambda x: x.split(',')).\
    map(lambda parts: [float(i) for i in parts]).\
    transform(lambda rdd: rf_model.predict(rdd))

Here, texts is a DStream having a single line of text as each record, and getFeatures generates comma-separated features extracted from each record.

I want the output as the below tuple:
("predicted value", "original text")

How can I achieve that? Or, at least, can I perform a .zip, like a normal RDD operation, on two DStreams, like below:

output = texts.zip(predictions)

Thanks in advance.

-Obaid
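(On the .zip question at the end: DStream itself has no zip method, but PySpark's DStream does appear to expose transformWith, which applies a function to the paired per-batch RDDs of two DStreams, so an RDD-level zip can be done there. A hedged, untested sketch; it relies on both streams deriving from the same source with identical partitioning and per-partition counts, which RDD.zip requires.)

# pair each batch of predictions with the matching batch of texts
output = predictions.transformWith(
    lambda pred_rdd, text_rdd: pred_rdd.zip(text_rdd),  # (prediction, text)
    texts)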