Checkpoints in Spark Streaming

2015-07-28 Thread Guillermo Ortiz
I'm using Spark Streaming and I want to configure checkpointing to manage
fault tolerance.
I've been reading the documentation. Is it necessary to create and
configure the InputDStream inside the function passed to getOrCreate?

I checked the example at
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
and it looks like it does everything inside that function. Should I put
all of the application's logic inside it? That doesn't seem like the
right way to me...

If I only create the context inside it, I get this error:

Exception in thread "main" org.apache.spark.SparkException:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e12a5a6 has not
been initialized
at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)


I'm not very good with Scala... here is the code I wrote:
  def functionToCreateContext(): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
    val ssc = new StreamingContext(sparkConf, Seconds(5))   // new context
    ssc.checkpoint("/tmp/spark/metricsCheckpoint")   // set checkpoint directory
    ssc
  }

  val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
    functionToCreateContext _)
  val kafkaParams = Map[String, String]("metadata.broker.list" -> args(0))
  val topics = args(1).split(",")
  val directKafkaStream = KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)

  directKafkaStream.foreachRDD { rdd => ...


Re: Checkpoints in Spark Streaming

2015-07-28 Thread Cody Koeninger
Yes, you need to follow the documentation. Configure your stream,
including the transformations made to it, inside the function you pass to
getOrCreate.
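
For reference, a minimal sketch of that structure, modeled on the
RecoverableNetworkWordCount example. The broker address and topic name
here are placeholders, and the foreachRDD body stands in for the
application logic:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def functionToCreateContext(): StreamingContext = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("app")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      ssc.checkpoint("/tmp/spark/metricsCheckpoint")

      // Define the whole DStream graph here so that, on restart,
      // it can be rebuilt from the checkpoint.
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> "localhost:9092")   // placeholder broker
      val topics = Set("metrics")                     // placeholder topic
      val stream = KafkaUtils.createDirectStream[String, String,
        StringDecoder, StringDecoder](ssc, kafkaParams, topics)

      stream.foreachRDD { rdd =>
        // ... application logic goes here ...
      }

      ssc
    }

    // First run: the function is invoked and the graph is built fresh.
    // After a failure: the context, and with it the whole stream, is
    // recreated from the checkpoint directory instead.
    val ssc = StreamingContext.getOrCreate("/tmp/spark/metricsCheckpoint",
      functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()

The difference from your code is that createDirectStream and foreachRDD
run inside functionToCreateContext. When getOrCreate recovers the context
from the checkpoint, it does not call the function again, so a stream
created outside it is a fresh, uninitialized DStream attached to a
recovered context. That is what the "has not been initialized" exception
is complaining about.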
