You could do it like this:

val transformedFileAndTime = fileAndTime.transformWith(anomaly,
  (rdd1: RDD[(String, String)], rdd2: RDD[Int]) => {
    var first = " "; var second = " "; var third = 0
    if (rdd2.first <= 3) {
      val head = rdd1.first   // one pass instead of two map(...).first calls
      first = head._1
      second = head._2
      third = rdd2.first
    }
    // transformWith must return an RDD, not a plain tuple, so wrap it in one
    rdd1.sparkContext.parallelize(Seq((first, second, third)))
  })
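
Note that transformWith expects a function returning an RDD, which is why the
tuple is wrapped with parallelize above. Also, rdd2.first will throw if a
batch is empty. A safer variant (just a sketch; it emits an empty RDD when the
condition doesn't hold) could look like this:

val transformedFileAndTime = fileAndTime.transformWith(anomaly,
  (rdd1: RDD[(String, String)], rdd2: RDD[Int]) => {
    // take(1) returns an empty Array instead of throwing on an empty batch
    val pairs = rdd1.take(1)
    val anomalies = rdd2.take(1)
    val result =
      if (pairs.nonEmpty && anomalies.nonEmpty && anomalies(0) <= 3)
        Seq((pairs(0)._1, pairs(0)._2, anomalies(0)))
      else
        Seq.empty[(String, String, Int)]
    rdd1.sparkContext.parallelize(result)
  })

As for the join repetitions you mentioned, reducing the smaller stream to its
single value first (as above) avoids the join entirely; alternatively you
could deduplicate after the join with something like
joined.transform(_.distinct()).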

Thanks
Best Regards

On Sat, Mar 7, 2015 at 2:28 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid>
wrote:

> Yes, this is the problem. I want to return an RDD, but it is abstract and I
> cannot instantiate it. So what are the other options?
> I have two streams and I want to filter one stream on the basis of the other
> and also keep the value of the other stream. I have also tried join, but
> one stream has more values than the other in each sliding window, and after
> the join I get repetitions which I don't want.
>
> Regards,
> Laeeq
>
>
>   On Friday, March 6, 2015 8:11 PM, Sean Owen <so...@cloudera.com> wrote:
>
>
> What is this line supposed to mean?
>
> RDD[(first,second,third)]
>
> It's not valid as a line of code, and you don't instantiate RDDs anyway.
>
>
> On Fri, Mar 6, 2015 at 7:06 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.invalid> wrote:
> > Hi,
> >
> > I am filtering the first DStream with the value in the second DStream. I
> > also want to keep the value of the second DStream. I have done the
> > following and am having a problem with returning a new RDD:
> >
> > val transformedFileAndTime = fileAndTime.transformWith(anomaly, (rdd1:
> > RDD[(String,String)], rdd2 : RDD[Int]) => {
> >     var first = " "; var second = " "; var third = 0
> >     if (rdd2.first<=3)
> >     {
> >         first = rdd1.map(_._1).first
> >         second = rdd1.map(_._2).first
> >         third = rdd2.first
> >     }
> >     RDD[(first,second,third)]
> > })
> >
> > ERROR
> >
> > /home/hduser/Projects/scalaad/src/main/scala/eeg/anomd/StreamAnomalyDetector.scala:119:
> > error: not found: value RDD
> > [ERROR] RDD[(first,second,third)]
> >
> > I have already imported org.apache.spark.rdd.RDD
> >
> >
> > Regards,
> > Laeeq
> >
>
>
>
