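Hi guys,

In the end, I am using the code below. The trick is to combine Python's built-in map with Spark Streaming's transform: transform exposes each micro-batch as a normal RDD, so the model can predict on the whole RDD at once. It may not be an elegant way, but it works :)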
def predictScore(texts, modelRF):
    # (text, features) pairs: getFeatures returns a comma-separated string
    features = texts.map(lambda txt: (txt, getFeatures(txt))) \
                    .map(lambda p: (p[0], p[1].split(','))) \
                    .map(lambda p: (p[0], [float(i) for i in p[1]]))

    def score(rdd):
        # Runs on the driver once per micro-batch: predict on the whole RDD,
        # collect scores and texts (in the same order), then pair them with
        # Python's built-in map. Note that collect() pulls the batch to the driver.
        scores = modelRF.predict(rdd.map(lambda p: p[1])).collect()
        originals = rdd.map(lambda p: p[0]).collect()
        pairs = list(map(lambda s, txt: (s, txt), scores, originals))
        return rdd.context.parallelize(pairs)

    # Returns a DStream of (score, 'original text') tuples
    return features.transform(score)

Hope it helps anyone facing the same problem. If anybody has a better idea, please post it here.

-Obaid

On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:
> DStream has a method foreachRDD, so you can walk through all RDDs inside
> the DStream as you want.
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>
> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>
>> Hi nguyen,
>>
>> If I am not mistaken, we cannot call "predict" on a DStream as you
>> suggested. We have to use "transform" to perform normal RDD operations
>> on DStreams, and this is where I am stuck.
>>
>> -Obaid
>>
>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com>
>> wrote:
>>
>>> How about this?
>>>
>>> def extract_feature(rf_model, x):
>>>     text = getFeatures(x).split(',')
>>>     fea = [float(i) for i in text]
>>>     prediction = rf_model.predict(fea)
>>>     return (prediction, x)
>>>
>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>
>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> Does anybody have any idea about the question below?
>>>>
>>>> -Obaid
>>>>
>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> This is my first mail to the Spark users mailing list.
>>>>>
>>>>> I need help with a DStream operation.
>>>>>
>>>>> I am using an MLlib RandomForest model to predict with Spark
>>>>> Streaming. In the end, I want to combine the feature DStream and the
>>>>> prediction DStream for further downstream processing.
>>>>>
>>>>> I am predicting with the piece of code below:
>>>>>
>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>                    .map(lambda x: x.split(',')) \
>>>>>                    .map(lambda parts: [float(i) for i in parts]) \
>>>>>                    .transform(lambda rdd: rf_model.predict(rdd))
>>>>>
>>>>> Here texts is a DStream whose records are single lines of text, and
>>>>> getFeatures generates a comma-separated string of features extracted
>>>>> from each record.
>>>>>
>>>>> I want the output as the tuple below:
>>>>> ("predicted value", "original text")
>>>>>
>>>>> How can I achieve that? Or, at least, can I perform .zip on two
>>>>> DStreams like a normal RDD operation, as below:
>>>>> output = texts.zip(predictions)
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> -Obaid
>>>>>
>>>>
>>>
>>
>
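The original question asks whether two DStreams can be zipped. DStream itself has no zip. However, inside transform each micro-batch is a plain RDD, and RDD.zip is available there. The sketch below is one possible alternative to collecting to the driver. It assumes an MLlib tree model whose predict() accepts an RDD of feature vectors. PySpark's tree-model docs note that predict cannot be called inside an RDD transformation such as map, which is why it is applied to the whole RDD. predict_with_text and score_batch are hypothetical names, and get_features stands in for the getFeatures function from the thread. The code only calls transform, map, zip and predict, so it takes a real DStream as-is.

```python
def predict_with_text(texts, model, get_features):
    """Return a DStream of (score, text) tuples (hypothetical helper).

    texts: DStream whose records are single lines of text
    model: MLlib model whose predict() accepts an RDD of feature vectors
    get_features: stand-in for getFeatures, returns a comma-separated string
    """
    def score_batch(rdd):
        # Keep each text and its parsed feature vector together in one RDD.
        pairs = rdd.map(
            lambda txt: (txt, [float(v) for v in get_features(txt).split(',')]))
        # predict() is called on the whole RDD from the driver, not inside map().
        scores = model.predict(pairs.map(lambda p: p[1]))
        # Both sides are derived from pairs by maps, so their partitions line up
        # and zip() pairs each score with its own text.
        return scores.zip(pairs.map(lambda p: p[0]))

    # transform() exposes each micro-batch RDD, so RDD-only methods like zip() work.
    return texts.transform(score_batch)
```

Usage would look like `output = predict_with_text(texts, rf_model, getFeatures)`. RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition. Here that holds because both sides come from maps over the same pairs RDD. The MLlib guide uses the same pattern to zip labels with predictions. pairs is evaluated twice per batch, so calling pairs.cache() may be worthwhile if getFeatures is expensive.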