You need to define a create function that does all the setup (checkpointing, receiver stream, foreachRDD) and obtain the context via StreamingContext.getOrCreate, so that on restart the context, and with it the blocks recovered from the WAL, is rebuilt from the checkpoint rather than created from scratch. See the example here: http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
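Something like this, a sketch based on your code, with all the DStream setup moved inside the create function:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir =
    "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // All DStream setup must happen in here: on a restart this function
    // is NOT called, and the streams are reconstructed from the checkpoint.
    val stream = ssc.receiverStream(new InMemoryStringReceiver())
    stream.foreachRDD { rdd =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  // Recovers the context from the checkpoint if one exists,
  // otherwise creates a fresh one via createContext().
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()
}

On the first run the checkpoint directory is empty, so createContext() is called; after you kill and restart the app, getOrCreate finds the checkpoint and rebuilds the context from it, and the unprocessed blocks recovered from the WAL should then be replayed.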
On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <mcgloin.patr...@gmail.com> wrote:
> Hi all,
>
> To have a simple way of testing the Spark Streaming Write Ahead Log I
> created a very simple custom input receiver, which generates strings
> and stores them:
>
> import scala.collection.mutable.ArrayBuffer
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.receiver.Receiver
>
> class InMemoryStringReceiver
>     extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>
>   val batchID = System.currentTimeMillis()
>
>   def onStart() {
>     new Thread("InMemoryStringReceiver") {
>       override def run(): Unit = {
>         var i = 0
>         while (true) {
>           // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>           // To implement a reliable receiver, you have to use
>           // store(multiple-records) to store data.
>           store(ArrayBuffer(s"$batchID-$i"))
>           println(s"Stored => [$batchID-$i)]")
>           Thread.sleep(1000L)
>           i = i + 1
>         }
>       }
>     }.start()
>   }
>
>   def onStop() {}
> }
>
> I then created a simple application which uses the custom receiver to
> stream the data and process it:
>
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.dstream.ReceiverInputDStream
>
> object DStreamResilienceTest extends App {
>
>   val conf = new SparkConf()
>     .setMaster("local[*]")
>     .setAppName("DStreamResilienceTest")
>     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>
>   val customReceiverStream: ReceiverInputDStream[String] =
>     ssc.receiverStream(new InMemoryStringReceiver())
>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>     println(s"processed => [${rdd.collect().toList}]")
>     Thread.sleep(2000L)
>   }
>
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> As you can see, the processing of each received RDD has a sleep of 2 seconds
> while the strings are stored every second. This creates a backlog and the
> new strings pile up, and should be stored in the WAL. Indeed, I can see the
> files in the checkpoint dirs getting updated. Running the app I get output
> like this:
>
> [info] Stored => [1453374654941-0)]
> [info] processed => [List(1453374654941-0)]
> [info] Stored => [1453374654941-1)]
> [info] Stored => [1453374654941-2)]
> [info] processed => [List(1453374654941-1)]
> [info] Stored => [1453374654941-3)]
> [info] Stored => [1453374654941-4)]
> [info] processed => [List(1453374654941-2)]
> [info] Stored => [1453374654941-5)]
> [info] Stored => [1453374654941-6)]
> [info] processed => [List(1453374654941-3)]
> [info] Stored => [1453374654941-7)]
> [info] Stored => [1453374654941-8)]
> [info] processed => [List(1453374654941-4)]
> [info] Stored => [1453374654941-9)]
> [info] Stored => [1453374654941-10)]
>
> As you would expect, the storing is outpacing the processing. So I kill
> the application and restart it. This time I commented out the sleep in the
> foreachRDD so that the processing can clear any backlog:
>
> [info] Stored => [1453374753946-0)]
> [info] processed => [List(1453374753946-0)]
> [info] Stored => [1453374753946-1)]
> [info] processed => [List(1453374753946-1)]
> [info] Stored => [1453374753946-2)]
> [info] processed => [List(1453374753946-2)]
> [info] Stored => [1453374753946-3)]
> [info] processed => [List(1453374753946-3)]
> [info] Stored => [1453374753946-4)]
> [info] processed => [List(1453374753946-4)]
>
> As you can see, the new events are processed but none from the previous
> batch. The old WAL logs are cleared, and I see log messages like this, but
> the old data does not get processed:
>
> INFO WriteAheadLogManager : Recovered 1 write ahead log files from
> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>
> What am I doing wrong? I am using Spark 1.5.2.
>
> Best regards,
>
> Patrick