Hi all,
I am trying to create RDDs from within /rdd.foreachPartition()/ so that I
can save these RDDs to Elasticsearch on the fly:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    val sc = rdd.context
    iterator.foreach { case (cid, sid, ts) =>
      [...]
      sc.makeRDD(...).saveToEs(...) // throws a NullPointerException (sc is null)
    }
  }
}
Unfortunately this doesn't work, because I can't seem to access the
SparkContext from anywhere within /foreachPartition()/. The code above
throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
ssc is the StreamingContext object created in the main function, outside of
/foreachPartition/), I get a NotSerializableException instead.
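Both exceptions seem to share one root cause: the body of foreachPartition is serialized on the driver and run on executors, while the context object only exists on the driver. The plain-Scala sketch below has no Spark dependency and only reproduces the two symptoms. FakeContext, FakeRdd and ship are hypothetical stand-ins, and it assumes (as in Spark's own RDD class) that the RDD's reference to its context is marked @transient and that task shipping behaves like Java serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}

object ClosureShippingDemo {
  // Hypothetical stand-in for SparkContext / StreamingContext:
  // a driver-only object that is deliberately NOT Serializable.
  class FakeContext {
    def makeData(xs: Seq[Int]): Seq[Int] = xs
  }

  // Hypothetical stand-in for an RDD: Serializable, but its context
  // reference is @transient, so Java serialization skips that field.
  class FakeRdd(ctx: FakeContext) extends Serializable {
    @transient val context: FakeContext = ctx
  }

  // Serialize and deserialize an object, roughly approximating
  // a task being shipped from the driver to an executor.
  def ship[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Symptom 1 (like `rdd.context` inside foreachPartition):
  // the transient field is null on the "executor" side.
  def contextSeenOnExecutor(): FakeContext =
    ship(new FakeRdd(new FakeContext)).context

  // Symptom 2 (like capturing `ssc` in the closure):
  // serializing a closure that references the context fails outright.
  def shipClosureCapturingContext(): Either[Throwable, Unit] = {
    val ctx = new FakeContext
    val task: () => Seq[Int] = () => ctx.makeData(Seq(1, 2, 3))
    try { ship(task); Right(()) }
    catch { case e: NotSerializableException => Left(e) }
  }

  def main(args: Array[String]): Unit = {
    println(s"context after shipping: ${contextSeenOnExecutor()}")
    println(s"closure capturing context: ${shipClosureCapturingContext()}")
  }
}
```

Running main should print a null context for the first case and a Left wrapping a NotSerializableException for the second, mirroring the two errors described above.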
What is the correct way to do this?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.