Hi Guys,

In the end, I am using the code below.
The trick is using the "native Python map" along with Spark Streaming.
It may not be an elegant way, but it works :).
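Before the actual code, a quick note on the trick: Python's built-in map accepts several iterables and calls the function with one element from each, in lockstep. So it pairs two lists by position, the same way zip does. A small pure-Python illustration, with made-up scores and texts standing in for the two collect() results of one batch:

```python
# Two parallel lists, e.g. what the two collect() calls return for one batch.
# The values are made up for illustration.
scores = [1.0, 0.0, 1.0]
texts = ["good product", "bad service", "fast delivery"]

# The built-in map calls the lambda with one element from each list in turn.
# In Python 3 map returns a lazy iterator, hence the list() around it.
pairs = list(map(lambda score, text: (score, text), scores, texts))

print(pairs)
```

In Python 3 this is equivalent to list(zip(scores, texts)), which some may find easier to read.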

def predictScore(texts, modelRF):
    # Build (text, features) pairs; features is a list of floats.
    pairs = texts.map(lambda txt: (txt, getFeatures(txt))) \
                 .map(lambda p: (p[0], [float(i) for i in p[1].split(',')]))

    def score(rdd):
        # Inside each batch, p[0] is the original text and p[1] its features.
        scores = modelRF.predict(rdd.map(lambda p: p[1])).collect()
        originals = rdd.map(lambda p: p[0]).collect()
        # Pair each score with its text on the driver using the native map,
        # then turn the pairs back into an RDD.
        return rdd.context.parallelize(
            list(map(lambda s, t: (s, t), scores, originals)))

    # The return value is a DStream of (score, 'original text') tuples.
    return pairs.transform(score)
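To check the pairing logic outside Spark, here is a plain-Python sketch of what the transform step does for a single micro-batch. get_features, StubModel and score_batch are hypothetical stand-ins, not Spark or MLlib APIs. The point is that the predictions come back in the same order as their inputs, so pairing by position restores (score, text). In the Spark version this assumes both collect() calls return elements in the same order, which should hold since both are computed from the same batch RDD.

```python
from typing import List, Sequence, Tuple


def get_features(txt: str) -> str:
    # Hypothetical stand-in for getFeatures: returns comma-separated numbers
    # (here simply the word count and the character count).
    return "{},{}".format(len(txt.split()), len(txt))


class StubModel:
    # Hypothetical stand-in for the RandomForest model: like
    # modelRF.predict(rdd), it scores a whole batch of feature vectors at once.
    def predict(self, batch: Sequence[List[float]]) -> List[float]:
        return [1.0 if features[0] > 2 else 0.0 for features in batch]


def score_batch(batch: Sequence[str], model: StubModel) -> List[Tuple[float, str]]:
    # Step 1: (text, features) pairs, mirroring the two map() calls.
    pairs = [(txt, [float(i) for i in get_features(txt).split(",")]) for txt in batch]
    # Step 2: predict on the feature column only, as the transform does.
    scores = model.predict([features for _, features in pairs])
    originals = [txt for txt, _ in pairs]
    # Step 3: re-attach each score to its text by position.
    return list(map(lambda s, t: (s, t), scores, originals))


print(score_batch(["good product", "arrived late and broken"], StubModel()))
```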

Hope it helps somebody who is facing the same problem.
If anybody has a better idea, please post it here.


On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com> wrote:

> DStream has a method foreachRDD, so you can walk through all the RDDs inside
> a DStream as you want.
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>> Hi nguyen,
>> If I am not mistaken, we cannot call "predict" on a DStream as you
>> suggested.
>> We have to use "transform" to perform normal RDD operations on
>> DStreams, and this is where I am stuck.
>> -Obaid
>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com>
>> wrote:
>>> How about this ?
>>> def extract_feature(rf_model, x):
>>>     text = getFeatures(x).split(',')
>>>     fea = [float(i) for i in text]
>>>     prediction = rf_model.predict(fea)
>>>     return (prediction, x)
>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>> Hi,
>>>> Does anybody have any idea about the question below?
>>>> -Obaid
>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>> Hi Guys,
>>>>> This is my first mail to spark users mailing list.
>>>>> I need help with a DStream operation.
>>>>> I am using an MLlib RandomForest model to make predictions in Spark
>>>>> Streaming. In the end, I want to combine the feature DStream and the
>>>>> prediction DStream for further downstream processing.
>>>>> I am predicting using the code below:
>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>     .map(lambda x: x.split(',')) \
>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>> Here, texts is a DStream whose records are single lines of text, and
>>>>> getFeatures generates a comma-separated string of features extracted
>>>>> from each record.
>>>>> I want the output as a tuple like this:
>>>>> ("predicted value", "original text")
>>>>> How can I achieve that?
>>>>> Or, at least, can I perform a .zip on two DStreams like a normal RDD
>>>>> operation, as below:
>>>>> output = texts.zip(predictions)
>>>>> Thanks in advance.
>>>>> -Obaid
