You can't use RDDs inside RDDs. The SparkContext exists only on the driver,
and the function you pass to foreachPartition runs on the remote executors,
where no SparkContext is available. That's why sc is null there (and why
shipping the StreamingContext in the closure fails with
NotSerializableException).

You can simply write code that saves whatever you want directly to ES;
there is no need to create a new RDD inside the partition for that.
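For example, since you're already using the elasticsearch-hadoop connector
(saveToEs), you can call it on the RDD you already have, from the driver side
of foreachRDD. A minimal sketch (the index name "events/event" and the field
names are made up for illustration):

```scala
import org.elasticsearch.spark._  // elasticsearch-hadoop connector

stream.foreachRDD { rdd =>
  // saveToEs is invoked on the driver, but the actual writes happen
  // on the executors, one bulk request per partition -- no nested RDDs.
  rdd.map { case (cid, sid, ts) =>
    Map("cid" -> cid, "sid" -> sid, "ts" -> ts)
  }.saveToEs("events/event")
}
```

If you really need per-record control, open a plain ES client (e.g. an HTTP
connection) inside foreachPartition and index the records yourself -- just
don't touch the SparkContext from executor code.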

On Wed, Feb 18, 2015 at 11:36 AM, t1ny <[email protected]> wrote:
> Hi all,
>
> I am trying to create RDDs from within /rdd.foreachPartition()/ so I can
> save these RDDs to ElasticSearch on the fly:
>
> stream.foreachRDD(rdd => {
>         rdd.foreachPartition {
>           iterator => {
>             val sc = rdd.context
>             iterator.foreach {
>               case (cid, sid, ts) => {
>
>                 [...]
>
>                 sc.makeRDD(...).saveToEs(...) <----- *throws a
> NullPointerException (sc is null)*
>               }
>             }
>           }
>         }
> })
>
> Unfortunately this doesn't work as I can't seem to be able to access the
> SparkContext from anywhere within /foreachPartition()/. The code above
> throws a NullPointerException, but if I change it to ssc.sc.makeRDD (where
> ssc is the StreamingContext object created in the main function, outside of
> /foreachPartition/) then I get a NotSerializableException.
>
> What is the correct way to do this?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Creating-RDDs-from-within-foreachPartition-Spark-Streaming-tp21700.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
