Hi Guys,

In the end, I am using the code below.
The trick is to use the native Python map together with the Spark Streaming
transform operation.
It may not be an elegant way, but it works :).

def predictScore(texts, modelRF):
    # getFeatures and sc (the SparkContext) are defined elsewhere.
    # Pair each text with its parsed features: (text, [float, ...]).
    pairs = texts.map(lambda txt: (txt, getFeatures(txt).split(','))) \
                 .map(lambda p: (p[0], [float(i) for i in p[1]]))

    def score(rdd):
        # Within each batch: p[0] = text and p[1] = features.
        # Scores and texts are collected to the driver and paired there.
        scores = modelRF.predict(rdd.map(lambda p: p[1])).collect()
        originals = rdd.map(lambda p: p[0]).collect()
        return sc.parallelize(list(zip(scores, originals)))

    # Return value is a DStream of (score, 'original text') tuples.
    return pairs.transform(score)
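
A possible variation that avoids collecting every batch to the driver is to
zip the two RDDs inside transform, the same way the MLlib documentation pairs
labels with predictions. This is only a sketch: predict_score, score_batch and
the get_features parameter are names made up for illustration, and I have not
run it against a live stream.

```python
def score_batch(rdd, model):
    """Turn an RDD of (text, features) into an RDD of (score, text)."""
    # predict is called on the whole RDD, not inside a map.
    scores = model.predict(rdd.map(lambda p: p[1]))
    originals = rdd.map(lambda p: p[0])
    # RDD.zip needs the same partitioning and element counts on both sides;
    # both sides here are derived from the same rdd by element-wise maps.
    return scores.zip(originals)


def predict_score(texts, model, get_features):
    """texts: DStream of strings; get_features: text -> 'f1,f2,...' (assumed)."""
    pairs = texts.map(
        lambda txt: (txt, [float(v) for v in get_features(txt).split(',')])
    )
    # transform applies score_batch to the RDD of each batch.
    return pairs.transform(lambda rdd: score_batch(rdd, model))
```

It would be called as predict_score(texts, modelRF, getFeatures). Each batch
RDD is traversed twice, so caching it inside score_batch may be worth trying.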


I hope it helps somebody who is facing the same problem.
If anybody has a better idea, please post it here.

-Obaid

On Mon, May 30, 2016 at 8:43 PM, nguyen duc tuan <newvalu...@gmail.com>
wrote:

> DStream has a method foreachRDD, so you can walk through all the RDDs inside
> the DStream as you want.
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/streaming/dstream/DStream.html
>
> 2016-05-30 19:30 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>
>> Hi nguyen,
>>
>> If I am not mistaken, we cannot call "predict" on a DStream as you have
>> suggested.
>> We have to use "transform" to be able to perform normal RDD operations on
>> DStreams, and this is where I am stuck.
>>
>> -Obaid
>>
>>
>>
>> On Mon, May 30, 2016 at 7:58 PM, nguyen duc tuan <newvalu...@gmail.com>
>> wrote:
>>
>>> How about this?
>>>
>>> def extract_feature(rf_model, x):
>>>     text = getFeatures(x).split(',')
>>>     fea = [float(i) for i in text]
>>>     prediction = rf_model.predict(fea)
>>>     return (prediction, x)
>>>
>>> output = texts.map(lambda x: extract_feature(rf_model, x))
>>>
>>> 2016-05-30 14:17 GMT+07:00 obaidul karim <obaidc...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> Does anybody have any idea about the question below?
>>>>
>>>> -Obaid
>>>>
>>>>
>>>> On Friday, 27 May 2016, obaidul karim <obaidc...@gmail.com> wrote:
>>>>
>>>>> Hi Guys,
>>>>>
>>>>> This is my first mail to the Spark users mailing list.
>>>>>
>>>>> I need help with a DStream operation.
>>>>>
>>>>> I am using an MLlib random forest model to predict with Spark
>>>>> Streaming. In the end, I want to combine the feature DStream and the
>>>>> prediction DStream for further downstream processing.
>>>>>
>>>>> I am predicting using the piece of code below:
>>>>>
>>>>> predictions = texts.map(lambda x: getFeatures(x)) \
>>>>>     .map(lambda x: x.split(',')) \
>>>>>     .map(lambda parts: [float(i) for i in parts]) \
>>>>>     .transform(lambda rdd: rf_model.predict(rdd))
>>>>>
>>>>> Here texts is a DStream whose records are single lines of text, and
>>>>> getFeatures generates a comma-separated string of features extracted
>>>>> from each record.
>>>>>
>>>>>
>>>>> I want the output as the tuple below:
>>>>> ("predicted value", "original text")
>>>>>
>>>>> How can I achieve that? Or, at the very least, can I perform a .zip
>>>>> on two DStreams as with normal RDDs, like below:
>>>>> output = texts.zip(predictions)
>>>>>
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>> -Obaid
>>>>>
>>>>
>>>
>>
>
