Hi all,

I am trying to create RDDs from within /rdd.foreachPartition()/ so that I can
save them to Elasticsearch on the fly:

stream.foreachRDD(rdd => {
  rdd.foreachPartition {
    iterator => {
      val sc = rdd.context
      iterator.foreach {
        case (cid, sid, ts) => {

          [...]

          // Throws a NullPointerException (sc is null)
          sc.makeRDD(...).saveToEs(...)
        }
      }
    }
  }
})

Unfortunately this doesn't work: I can't seem to access the SparkContext
from anywhere within /foreachPartition()/. The code above throws a
NullPointerException, but if I change it to ssc.sc.makeRDD (where ssc is the
StreamingContext object created in the main function, outside of
/foreachPartition/), I get a NotSerializableException instead.
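
For reference, both symptoms can be reproduced without Spark. This is only a sketch under the assumption that the closure passed to /foreachPartition/ is Java-serialized before it runs on the executors. FakeContext and FakeRdd are hypothetical stand-ins, not Spark classes: the first is deliberately not Serializable, and the second keeps its context in a @transient field, which is my guess at why rdd.context comes back null.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-side context: NOT Serializable.
class FakeContext

// Hypothetical stand-in for an RDD: Serializable, but its context is transient,
// so the reference is skipped when the object is written out.
class FakeRdd(@transient val context: FakeContext) extends Serializable

object ClosureShippingDemo {
  // Java-serialize a value and read it back, roughly what shipping a task involves.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext
    val rdd = new FakeRdd(ctx)

    // Case 1: the transient field does not survive the round trip,
    // so using it afterwards would give a NullPointerException.
    val shipped = roundTrip(rdd)
    println(s"context is null after round trip: ${shipped.context == null}")

    // Case 2: a closure that captures the context directly cannot be written at all.
    val task: () => String = () => ctx.toString
    try {
      roundTrip(task)
      println("closure serialized without error")
    } catch {
      case e: NotSerializableException =>
        println(s"NotSerializableException: ${e.getMessage}")
    }
  }
}
```

If that assumption holds, case 1 corresponds to the rdd.context variant above and case 2 to the ssc.sc variant, which would suggest the context is only usable in code that runs on the driver.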

What is the correct way to do this?


