Problem solved by creating only one RDD.

> On Jun 1, 2016, at 14:05, Cyril Scetbon <cyril.scet...@free.fr> wrote:
>
> It seems that to join a DStream with an RDD I can use:
>
> mgs.transform(rdd => rdd.join(rdd1))
>
> or
>
> mgs.foreachRDD(rdd => rdd.join(rdd1))
>
> But I can't see why rdd1.toDF("id", "aid") really causes SPARK-5063.
>
>> On Jun 1, 2016, at 12:00, Cyril Scetbon <cyril.scet...@free.fr> wrote:
>>
>> Hi guys,
>>
>> I have 2 input data streams that I want to join using DataFrames, and
>> unfortunately I get the message produced by
>> https://issues.apache.org/jira/browse/SPARK-5063 because I can't reference
>> rdd1 in (2):
>>
>> (1)
>> val rdd1 = sc.esRDD(es_resource.toLowerCase, query)
>>   .map(r => (r._1, r._2))
>>
>> (2)
>> mgs.map(x => x._1)
>>   .foreachRDD { rdd =>
>>     val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
>>     import sqlContext.implicits._
>>
>>     val df_aids = rdd.toDF("id")
>>
>>     val df = rdd1.toDF("id", "aid")
>>
>>     df.select(explode(df("aid")).as("aid"), df("id"))
>>       .join(df_aids, $"aid" === df_aids("id"))
>>       .select(df("id"), df_aids("id"))
>>     .....
>>   }
>>
>> Is there a way to still use DataFrames for this, or do I need to do
>> everything with RDD joins only?
>> And if I need to use only RDD joins, how do I do it, given that I have an
>> RDD (rdd1) and a DStream (mgs)?
>>
>> Thanks
>> --
>> Cyril SCETBON
>>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
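A minimal sketch of the `transform` approach mentioned in the thread, joining a DStream with a static pair RDD. It assumes Spark (spark-core and spark-streaming) is on the classpath and runs with a local master. The object `StreamStaticJoin`, the method `runJoin`, and the sample data are hypothetical: a `parallelize`d pair RDD stands in for the Elasticsearch `esRDD` (`rdd1`), and a `queueStream` stands in for `mgs`. Checkpointing is left disabled, since `queueStream` does not support it. This does not reproduce the poster's final fix ("creating only one RDD"), whose details were not posted.

```scala
import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamStaticJoin {
  // Runs a local streaming job and returns the joined pairs collected on the driver.
  def runJoin(): Seq[(String, (Int, String))] = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("stream-static-join")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    try {
      // Hypothetical static pair RDD standing in for rdd1 (the esRDD in the thread).
      val rdd1: RDD[(String, String)] = sc.parallelize(Seq("a" -> "A", "b" -> "B"))

      // Hypothetical queue-backed DStream standing in for mgs; one RDD = one batch.
      val queue = mutable.Queue[RDD[(String, Int)]](sc.parallelize(Seq("a" -> 1, "c" -> 3)))
      val mgs = ssc.queueStream(queue)

      // The transform function is invoked on the driver once per batch,
      // so it may reference rdd1; the join itself runs on the executors.
      val joined = mgs.transform(rdd => rdd.join(rdd1))

      // join is lazy: collect() is the action that materializes each batch.
      val results = mutable.ArrayBuffer.empty[(String, (Int, String))]
      joined.foreachRDD { rdd =>
        val batch = rdd.collect()
        results.synchronized { results ++= batch }
      }

      ssc.start()
      // Poll until the only non-empty batch has been collected, or 10 s pass.
      val deadline = System.currentTimeMillis() + 10000
      while (results.synchronized(results.isEmpty) && System.currentTimeMillis() < deadline) {
        Thread.sleep(100)
      }
      results.synchronized(results.toList)
    } finally {
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  }

  def main(args: Array[String]): Unit =
    println(runJoin())
}
```

Because `transform` returns a new DStream, the joined pairs can be processed further batch by batch. `foreachRDD` returns `Unit`, so a join written inside it is only computed if an action is called on its result, as the `collect()` above does.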