Thank you Shixiong, that is what I was missing.
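In case it helps anyone who finds this thread later: the approach Shixiong
describes is to build the StreamingContext inside a create function and obtain
it through StreamingContext.getOrCreate, so that a restart recovers the context
(and the WAL-backed data) from the checkpoint directory instead of constructing
a fresh context that ignores it. A rough sketch of how the application from my
original mail could be restructured this way (it reuses InMemoryStringReceiver
as defined below; the createContext name is just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamResilienceTest extends App {

  val checkpointDir = "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // All DStream setup happens inside this function. getOrCreate only calls it
  // when there is no checkpoint to recover from.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    val customReceiverStream = ssc.receiverStream(new InMemoryStringReceiver())
    customReceiverStream.foreachRDD { rdd =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  // On restart this recovers the context from the checkpoint (and replays the
  // WAL-backed blocks) instead of building a new context that ignores them.
  val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
  ssc.start()
  ssc.awaitTermination()
}

The important part is that all of the DStream setup lives inside createContext;
getOrCreate only invokes it when no checkpoint exists, otherwise the previously
checkpointed context, including the pending WAL data, is restored.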
On 26 January 2016 at 00:27, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> You need to define a create function and use StreamingContext.getOrCreate.
> See the example here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
>
> On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <mcgloin.patr...@gmail.com> wrote:
>
>> Hi all,
>>
>> To have a simple way of testing the Spark Streaming Write Ahead Log, I
>> created a very simple Custom Input Receiver, which will generate strings
>> and store those:
>>
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.receiver.Receiver
>>
>> import scala.collection.mutable.ArrayBuffer
>>
>> class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>>
>>   val batchID = System.currentTimeMillis()
>>
>>   def onStart() {
>>     new Thread("InMemoryStringReceiver") {
>>       override def run(): Unit = {
>>         var i = 0
>>         while (true) {
>>           // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>           // To implement a reliable receiver, you have to use store(multiple-records) to store data.
>>           store(ArrayBuffer(s"$batchID-$i"))
>>           println(s"Stored => [$batchID-$i)]")
>>           Thread.sleep(1000L)
>>           i = i + 1
>>         }
>>       }
>>     }.start()
>>   }
>>
>>   def onStop() {}
>> }
>>
>> I then created a simple application which uses the Custom Receiver to
>> stream the data and process it:
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.rdd.RDD
>> import org.apache.spark.streaming.dstream.ReceiverInputDStream
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>
>> object DStreamResilienceTest extends App {
>>
>>   val conf = new SparkConf()
>>     .setMaster("local[*]")
>>     .setAppName("DStreamResilienceTest")
>>     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>>   val ssc = new StreamingContext(conf, Seconds(1))
>>   ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>>   val customReceiverStream: ReceiverInputDStream[String] = ssc.receiverStream(new InMemoryStringReceiver())
>>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>>     println(s"processed => [${rdd.collect().toList}]")
>>     Thread.sleep(2000L)
>>   }
>>   ssc.start()
>>   ssc.awaitTermination()
>>
>> }
>>
>> As you can see, the processing of each received RDD sleeps for 2 seconds
>> while the Strings are stored every second. This creates a backlog and the
>> new strings pile up, and should be stored in the WAL. Indeed, I can see the
>> files in the checkpoint dirs getting updated. Running the app I get output
>> like this:
>>
>> [info] Stored => [1453374654941-0)]
>> [info] processed => [List(1453374654941-0)]
>> [info] Stored => [1453374654941-1)]
>> [info] Stored => [1453374654941-2)]
>> [info] processed => [List(1453374654941-1)]
>> [info] Stored => [1453374654941-3)]
>> [info] Stored => [1453374654941-4)]
>> [info] processed => [List(1453374654941-2)]
>> [info] Stored => [1453374654941-5)]
>> [info] Stored => [1453374654941-6)]
>> [info] processed => [List(1453374654941-3)]
>> [info] Stored => [1453374654941-7)]
>> [info] Stored => [1453374654941-8)]
>> [info] processed => [List(1453374654941-4)]
>> [info] Stored => [1453374654941-9)]
>> [info] Stored => [1453374654941-10)]
>>
>> As you would expect, the storing is outpacing the processing. So I kill
>> the application and restart it.
>> This time I commented out the sleep in the foreachRDD so that the
>> processing can clear any backlog:
>>
>> [info] Stored => [1453374753946-0)]
>> [info] processed => [List(1453374753946-0)]
>> [info] Stored => [1453374753946-1)]
>> [info] processed => [List(1453374753946-1)]
>> [info] Stored => [1453374753946-2)]
>> [info] processed => [List(1453374753946-2)]
>> [info] Stored => [1453374753946-3)]
>> [info] processed => [List(1453374753946-3)]
>> [info] Stored => [1453374753946-4)]
>> [info] processed => [List(1453374753946-4)]
>>
>> As you can see, the new events are processed but none from the previous
>> batch. The old WAL logs are cleared, and I see log messages like the one
>> below, but the old data does not get processed:
>>
>> INFO WriteAheadLogManager : Recovered 1 write ahead log files from
>> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>>
>> What am I doing wrong? I am using Spark 1.5.2.
>>
>> Best regards,
>>
>> Patrick