You could do it like this:

    val transformedFileAndTime = fileAndTime.transformWith(anomaly,
      (rdd1: RDD[(String, String)], rdd2: RDD[Int]) => {
        var first = " "; var second = " "; var third = 0
        if (rdd2.first <= 3) {
          first = rdd1.map(_._1).first
          second = rdd1.map(_._2).first
          third = rdd2.first
        }
        // transformWith must return an RDD, not a bare tuple, so wrap the
        // result in a one-element RDD built from the incoming RDD's context
        rdd1.context.makeRDD(Seq((first, second, third)))
      })
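One caveat worth noting (my addition, not something raised in the thread): .first throws if an RDD is empty, which can happen in a batch that carries no data. A minimal guard using take(1), keeping the rest of the logic the same:

    // Hypothetical guard, not in the original reply: only call .first
    // when both RDDs are non-empty in this batch.
    if (rdd1.take(1).nonEmpty && rdd2.take(1).nonEmpty && rdd2.first <= 3) {
      first = rdd1.map(_._1).first
      second = rdd1.map(_._2).first
      third = rdd2.first
    }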
Thanks
Best Regards

On Sat, Mar 7, 2015 at 2:28 AM, Laeeq Ahmed <laeeqsp...@yahoo.com.invalid> wrote:

> Yes, this is the problem. I want to return an RDD, but it is abstract and
> I cannot instantiate it. So what are the other options?
> I have two streams and I want to filter one stream on the basis of the
> other, and I also want to keep the value of the other stream. I have also
> tried join, but one stream has more values than the other in each sliding
> window, and after the join I get repetitions which I don't want.
>
> Regards,
> Laeeq
>
>
> On Friday, March 6, 2015 8:11 PM, Sean Owen <so...@cloudera.com> wrote:
>
>
> What is this line supposed to mean?
>
> RDD[(first,second,third)]
>
> It's not valid as a line of code, and you don't instantiate RDDs anyway.
>
>
> On Fri, Mar 6, 2015 at 7:06 PM, Laeeq Ahmed
> <laeeqsp...@yahoo.com.invalid> wrote:
> > Hi,
> >
> > I am filtering the first DStream with the value in the second DStream.
> > I also want to keep the value of the second DStream. I have done the
> > following and am having a problem with returning a new RDD:
> >
> > val transformedFileAndTime = fileAndTime.transformWith(anomaly,
> >   (rdd1: RDD[(String, String)], rdd2: RDD[Int]) => {
> >     var first = " "; var second = " "; var third = 0
> >     if (rdd2.first <= 3) {
> >       first = rdd1.map(_._1).first
> >       second = rdd1.map(_._2).first
> >       third = rdd2.first
> >     }
> >     RDD[(first,second,third)]
> >   })
> >
> > ERROR
> > /home/hduser/Projects/scalaad/src/main/scala/eeg/anomd/StreamAnomalyDetector.scala:119:
> > error: not found: value RDD
> > [ERROR]     RDD[(first,second,third)]
> >
> > I have imported org.apache.spark.rdd.RDD
> >
> >
> > Regards,
> > Laeeq
> >
> >
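As an aside on the join repetitions mentioned in the quoted thread: join emits one output per matching pair of values, so when one side carries several values per key in a window, the result is a per-key cross product. A minimal sketch (stream names and types are hypothetical, not from the thread) that collapses the larger side to a single value per key before joining:

    // bigger, smaller: DStream[(String, Int)] -- hypothetical pair streams
    val collapsed = bigger.reduceByKey((v, _) => v)  // keep one value per key per batch
    val joined = smaller.join(collapsed)             // now at most one match per key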