Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Hi,

Anybody have any idea on the below?

-Obaid

On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
> Hi Guys,
>
> This is my first mail to the spark users mailing list.
>
> I need help with a DStream operation.
>
> I am using an MLlib random forest model to predict using Spark Streaming.
> In the end, I want to combine the feature DStream and prediction DStream
> together for further downstream processing.
>
> I am predicting using the below piece of code:
>
> predictions = texts.map(lambda x: getFeatures(x)) \
>     .map(lambda x: x.split(',')) \
>     .map(lambda parts: [float(i) for i in parts]) \
>     .transform(lambda rdd: rf_model.predict(rdd))
>
> Here texts is a DStream having a single line of text per record, and
> getFeatures generates comma-separated features extracted from each record.
>
> I want the output as the below tuple:
> ("predicted value", "original text")
>
> How can I achieve that? Or, at least, can I perform .zip like a normal
> RDD operation on two DStreams, like below:
>
> output = texts.zip(predictions)
>
> Thanks in advance.
>
> -Obaid
Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Hi Guys,

In the end, I am using the below. The trick is using a native Python map
along with Spark Streaming's transform. It may not be an elegant way, but
it works :).

def predictScore(texts, modelRF):
    predictions = texts.map(lambda txt: (txt, getFeatures(txt))) \
        .map(lambda (txt, features): (txt, features.split(','))) \
        .map(lambda (txt, features): (txt, [float(i) for i in features])) \
        .transform(lambda rdd: sc.parallelize(
            map(lambda x, y: (x, y),
                modelRF.predict(rdd.map(lambda (x, y): y)).collect(),
                rdd.map(lambda (x, y): x).collect())))
    # in the transform operation: x = text and y = features
    # the return will be tuples of (score, 'original text')
    return predictions

Hope it will help somebody who is facing the same problem.
If anybody has a better idea, please post it here.

-Obaid

On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
> DStream has a method foreachRDD, so you can walk through all RDDs inside
> a DStream as you want.
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
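A note for readers on Python 3: the tuple-unpacking lambdas above, such as
`lambda (txt, features): ...`, are Python-2-only syntax (removed by PEP
3113). The pairing that the transform performs on each batch boils down to
the plain-Python sketch below; `get_features_stub` and `predict_stub` are
hypothetical stand-ins for the thread's getFeatures() and modelRF.predict(),
not real functions from it:

```python
def get_features_stub(text):
    # hypothetical stand-in for getFeatures(): word lengths as CSV features
    return ','.join(str(len(w)) for w in text.split())

def predict_stub(vec):
    # hypothetical stand-in for modelRF.predict(): sum of the features
    return float(sum(vec))

def score_batch(lines, get_features=get_features_stub, predict=predict_stub):
    # parse each line into a float feature vector, score it, and keep the
    # (score, original text) pairing one-to-one, as predictScore does per RDD
    feats = [[float(v) for v in get_features(line).split(',')] for line in lines]
    return [(predict(f), line) for line, f in zip(lines, feats)]
```

Inside Spark Streaming, this per-batch logic is what the
transform-with-sc.parallelize call reproduces; note that collecting every
batch to the driver only scales while batches stay small.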
Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
foreachRDD does not return any value. It can be used just to send results
to another place/context, like a DB, file, etc. I could use that, but it
seems like the overhead of having another hop. I wanted to keep it simple
and light.

On Tuesday, 31 May 2016, nguyen duc tuan <newvalu...@gmail.com> wrote:
> How about using foreachRDD? I think this is much better than your trick.
Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Hi nguyen,

Thanks a lot for your time; I really appreciate the good suggestions.
Please find my concerns inline below:

def extract_feature(rf_model, x):
    text = getFeatures(x).split(',')
    fea = [float(i) for i in text]
    prediction = rf_model.predict(fea)
    return (prediction, x)  <<< this will return two separate lists as a
tuple, but I want a one-to-one mapping, (pred, text), not (predlist,
textlist)

def process_rdd(rdd):
    fea = rdd.map(lambda x: extract_feature(rf_model, x))
    # do something as you want (saving, ...)  <<< I want to avoid saving
to an external system (and definitely not in a global variable). As I
said, it could be an overhead considering streaming.

stream.foreachRDD(process_rdd)  <<< As you can see here, there is no
variable to store the output from foreachRDD. My target is to get the
(pred, text) pair and then use it.

Whatever it is, the output from "extract_feature" is not what I want.
I will be more than happy if you correct my mistakes here.

-Obaid

On Tue, May 31, 2016 at 2:04 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
> I'm not sure what you mean by saying "does not return any value". How do
> you use this method? I would use it as in the extract_feature /
> process_rdd code above, registered with stream.foreachRDD(process_rdd).
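The API distinction being debated here (transform yields a new stream,
foreachRDD yields nothing and exists only for side effects) can be modeled
with a toy class. This is an illustrative sketch, not the real
pyspark.streaming API:

```python
class MiniStream:
    """Toy stand-in for a DStream: a list of per-batch record lists."""

    def __init__(self, batches):
        self.batches = batches

    def transform(self, func):
        # like DStream.transform: builds a NEW stream from each batch
        return MiniStream([func(batch) for batch in self.batches])

    def foreachRDD(self, func):
        # like DStream.foreachRDD: runs func for its side effects only
        for batch in self.batches:
            func(batch)
        return None

stream = MiniStream([[1, 2], [3]])

# transform: the result is itself a stream and can be processed further
doubled = stream.transform(lambda batch: [x * 2 for x in batch])

# foreachRDD: returns None; results leave only through an external sink
sink = []
result = stream.foreachRDD(sink.extend)
```

This is why Obaid keeps the prediction logic inside transform: he wants a
(score, text) stream back, while foreachRDD would force the results out
through some external sink.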
Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Hi nguyen,

Thanks again. Yes, flatMap may do the trick as well. I may try it out, and
I will let you know the result when done.

On Tue, May 31, 2016 at 3:58 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
> 1. The RandomForest predict method supports both an RDD and a Vector as
> input
> (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.tree.model.RandomForestModel).
> So, in this case, the function extract_feature should return the tuple
> (prediction, rawtext). If each input text creates a list of vectors, try
> using "flatMap" instead of "map".
>
> 2, 3: From the Spark documents: "Discretized Stream or DStream is the
> basic abstraction provided by Spark Streaming. It represents a continuous
> stream of data, either the input data stream received from source, or the
> processed data stream generated by transforming the input stream.
> Internally, it is represented by a continuous sequence of RDDs, which is
> Spark's abstraction of an immutable, distributed dataset. Each RDD in a
> DStream contains data from a certain interval."
> (https://spark.apache.org/docs/0.9.1/streaming-programming-guide.html)
>
> So, in order to handle a stream, you should handle each RDD in that
> stream. This means that everything you want to do with your new data, you
> put in the 'process_rdd' function. There is nothing returned in the output
> of 'foreachRDD', of course.
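Nguyen's point 1, using flatMap when one input record yields several
feature vectors, comes down to the following difference, shown here with
plain lists; `split_candidates` is a hypothetical helper, not from the
thread:

```python
def split_candidates(line):
    # hypothetical: one raw record may contain several ';'-separated texts
    return [seg for seg in line.split(';') if seg]

records = ["spam;ham", "eggs"]

# map keeps one output element per input record (a nested list here)
mapped = [split_candidates(r) for r in records]

# flatMap flattens, so each candidate becomes its own stream record
flattened = [seg for r in records for seg in split_candidates(r)]
```

With a DStream, the analogous calls would be records.map(split_candidates)
versus records.flatMap(split_candidates); only the latter gives one
downstream record per feature vector.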
Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Hi Guys,

This is my first mail to the spark users mailing list.

I need help with a DStream operation.

I am using an MLlib random forest model to predict using Spark Streaming.
In the end, I want to combine the feature DStream and prediction DStream
together for further downstream processing.

I am predicting using the below piece of code:

predictions = texts.map(lambda x: getFeatures(x)) \
    .map(lambda x: x.split(',')) \
    .map(lambda parts: [float(i) for i in parts]) \
    .transform(lambda rdd: rf_model.predict(rdd))

Here texts is a DStream having a single line of text per record, and
getFeatures generates comma-separated features extracted from each record.

I want the output as the below tuple:
("predicted value", "original text")

How can I achieve that? Or, at least, can I perform .zip like a normal RDD
operation on two DStreams, like below:

output = texts.zip(predictions)

Thanks in advance.

-Obaid
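The texts.zip(predictions) the question asks for is positional pairing.
RDD.zip does exist, but it requires both RDDs to have the same number of
partitions and the same number of elements per partition, which is why
aligning a prediction stream with its source stream is not automatic. The
intent, sketched with plain sequences (`toy_predict` is a hypothetical
scorer, not the thread's model):

```python
def toy_predict(line):
    # hypothetical scorer: 1.0 if the line mentions "spam", else 0.0
    return 1.0 if "spam" in line else 0.0

texts = ["free spam offer", "meeting at noon"]
predictions = [toy_predict(t) for t in texts]

# positional pairing, i.e. what texts.zip(predictions) would produce,
# element i of one side matched with element i of the other
output = list(zip(predictions, texts))
```

The replies in the thread arrive at the same pairing either per record
(returning (prediction, text) from one map) or per batch (zipping inside
transform).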
Re: Spark Streaming: Combine MLlib Prediction and Features on Dstreams
Sorry for the many typos (writing from mobile).

On Tuesday, 31 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
> foreachRDD does not return any value. It can be used just to send results
> to another place/context, like a DB, file, etc. I could use that, but it
> seems like the overhead of having another hop. I wanted to keep it simple
> and light.