I believe you’ll have to create the StreamingContext differently, by passing a creating function to StreamingContext.getOrCreate:
    private def setupSparkContext(): StreamingContext = {
      val streamingSparkContext = {
        val sparkConf = new SparkConf().setAppName(config.appName).setMaster(config.master)
        new StreamingContext(sparkConf, config.batchInterval)
      }
      streamingSparkContext.checkpoint(config.checkpointDir)
      streamingSparkContext
    }

    ...

    val ssc = StreamingContext.getOrCreate(config.checkpointDir, setupSparkContext)

Javadoc for getOrCreate:

    /**
     * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
     * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
     * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
     * will be created by calling the provided `creatingFunc`.
     *
     * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
     * @param creatingFunc   Function to create a new StreamingContext
     * @param hadoopConf     Optional Hadoop configuration if necessary for reading from the
     *                       file system
     * @param createOnError  Optional, whether to create a new StreamingContext if there is an
     *                       error in reading checkpoint data. By default, an exception will be
     *                       thrown on error.
     */

Hope this helps!
SM

> On 06-Nov-2015, at 8:19 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
> Have you looked at the driver and executor logs?
>
> Without being able to see what's in the "do stuff with the DStream" section
> of code... I'd suggest starting with a simpler job, e.g. one that does nothing
> but print each message, and verify whether it checkpoints.
>
> On Fri, Nov 6, 2015 at 3:59 AM, Kathi Stutz <em...@kathistutz.de> wrote:
> Hi all,
>
> I want to load an InputDStream from a checkpoint, but it doesn't work, and
> after trying several things I have finally run out of ideas.
>
> So, here's what I do:
>
> 1. I create the streaming context - or load it from the checkpoint directory.
>
>     def main(args: Array[String]) {
>       val ssc = StreamingContext.getOrCreate("files/checkpoint",
>         createStreamingContext _)
>       ssc.start()
>       ssc.awaitTermination()
>     }
>
> 2. In the function createStreamingContext(), I first create a new Spark
> config...
>
>     def createStreamingContext(): StreamingContext = {
>       println("New Context")
>
>       val conf = new SparkConf()
>         .setMaster("local[2]")
>         .setAppName("CheckpointTest")
>         .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>
>       //...then I create the streaming context...
>       val ssc = new StreamingContext(conf, Seconds(1))
>
>       var offsetRanges = Array[OffsetRange]()
>       val kafkaParams = Map(
>         "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
>         "auto.offset.reset" -> "smallest") //Start from the beginning
>       val kafkaTopics = Set("Bla")
>
>       //...then I go and get a DStream from Kafka...
>       val directKafkaStream = KafkaUtils.createDirectStream[String,
>         Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics)
>
>       //...I do stuff with the DStream
>       ...
>
>       //...and finally I checkpoint the streaming context and return it
>       ssc.checkpoint("files/checkpoint")
>       ssc
>     }
>
> 3. When I start the application, after a while it creates in files/checkpoint/
> an empty directory with a name like 23207ed2-c021-4a1d-8af8-0620a19a8665.
> But that's all, no more files or directories or whatever appear there.
>
> 4. When I stop the application and restart it, it creates a new streaming
> context each time. (This also means it starts the Kafka streaming from the
> smallest available offset again and again.
> The main reason for using checkpoints was, for me, not having to keep track
> of the Kafka offsets myself.)
>
> So, what am I doing wrong?
>
> Thanks a lot!
>
> Kathi
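
P.S. For completeness, here is a minimal, self-contained sketch of the
getOrCreate pattern, against the Spark 1.x spark-streaming-kafka API used in
this thread. The broker address, topic ("Bla") and checkpoint path are taken
from the original post; the String value decoder and the print() output
action are placeholders I've assumed for illustration. The important detail
is that the entire DStream graph, including the ssc.checkpoint() call, is set
up inside the creating function, so it can be rebuilt from the checkpoint on
restart:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object CheckpointExample {
      val checkpointDir = "files/checkpoint"

      def createStreamingContext(): StreamingContext = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("CheckpointTest")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Set the checkpoint directory inside the creating function...
        ssc.checkpoint(checkpointDir)

        val kafkaParams = Map(
          "metadata.broker.list" -> "sandbox.hortonworks.com:6667",
          "auto.offset.reset" -> "smallest")

        // ...and define the complete DStream graph here as well, so that it
        // can be restored from the checkpoint data on restart.
        val stream = KafkaUtils.createDirectStream[String, String,
          StringDecoder, StringDecoder](ssc, kafkaParams, Set("Bla"))
        stream.map(_._2).print() // placeholder action: print message values

        ssc
      }

      def main(args: Array[String]): Unit = {
        // First run: createStreamingContext() is called and a fresh context
        // is returned. On restart, the context (including the Kafka offsets
        // stored in the checkpoint) is recreated from files/checkpoint.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

If restoring still fails with this pattern, the driver logs should show
whether checkpoint data is being written at all, as Cody suggested.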