Thank you Shixiong, that is what I was missing.
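
For anyone finding this thread in the archive, here is a minimal sketch of the pattern Shixiong describes, adapted from the application quoted below. The names createContext and checkpointDir are my own illustrative choices, and I am assuming Spark 1.5.x. The idea is that all stream setup lives in a factory function, and StreamingContext.getOrCreate calls it only when no usable checkpoint exists. Otherwise the context, including unprocessed data in the write ahead log, is rebuilt from the checkpoint directory.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Same test receiver as in the original mail, trimmed slightly.
class InMemoryStringReceiver
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  val batchID = System.currentTimeMillis()

  def onStart(): Unit = {
    new Thread("InMemoryStringReceiver") {
      override def run(): Unit = {
        var i = 0
        while (!isStopped()) {
          // store(multiple-records) is the call the custom receiver
          // guide points to for reliable receivers.
          store(ArrayBuffer(s"$batchID-$i"))
          Thread.sleep(1000L)
          i += 1
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}

object DStreamResilienceTest {

  // Hypothetical name; the path is the one from the original mail.
  val checkpointDir =
    "hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest"

  // Factory function (name is illustrative). It runs only when
  // getOrCreate finds no checkpoint data in checkpointDir.
  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DStreamResilienceTest")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // The whole DStream graph must be defined inside the factory.
    ssc.receiverStream(new InMemoryStringReceiver()).foreachRDD { rdd =>
      println(s"processed => [${rdd.collect().toList}]")
    }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if present, else build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

In my original version I always called new StreamingContext(...) directly, so every restart began with a fresh context and the recovered log files were never replayed.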

On 26 January 2016 at 00:27, Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> You need to define a create function and use StreamingContext.getOrCreate.
> See the example here:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing
>
> On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <
> mcgloin.patr...@gmail.com> wrote:
>
>> Hi all,
>>
>> To have a simple way of testing the Spark Streaming Write Ahead Log I
>> created a very simple Custom Input Receiver, which will generate strings
>> and store those:
>>
>> class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>>
>>   val batchID = System.currentTimeMillis()
>>
>>   def onStart() {
>>     new Thread("InMemoryStringReceiver") {
>>       override def run(): Unit = {
>>         var i = 0
>>         while(true) {
>>           // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>>           // To implement a reliable receiver, you have to use
>>           // store(multiple-records) to store data.
>>           store(ArrayBuffer(s"$batchID-$i"))
>>           println(s"Stored => [$batchID-$i)]")
>>           Thread.sleep(1000L)
>>           i = i + 1
>>         }
>>       }
>>     }.start()
>>   }
>>
>>   def onStop() {}
>> }
>>
>> I then created a simple Application which will use the Custom Receiver to
>> stream the data and process it:
>>
>> object DStreamResilienceTest extends App {
>>
>>   val conf = new SparkConf()
>>     .setMaster("local[*]")
>>     .setAppName("DStreamResilienceTest")
>>     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>>   val ssc = new StreamingContext(conf, Seconds(1))
>>   ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>>   val customReceiverStream: ReceiverInputDStream[String] =
>>     ssc.receiverStream(new InMemoryStringReceiver())
>>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>>     println(s"processed => [${rdd.collect().toList}]")
>>     Thread.sleep(2000L)
>>   }
>>   ssc.start()
>>   ssc.awaitTermination()
>>
>> }
>>
>> As you can see, the processing of each received RDD has a sleep of 2 seconds
>> while the Strings are stored every second. This creates a backlog and the
>> new strings pile up, and should be stored in the WAL. Indeed, I can see the
>> files in the checkpoint dirs getting updated. Running the app I get output
>> like this:
>>
>> [info] Stored => [1453374654941-0)]
>> [info] processed => [List(1453374654941-0)]
>> [info] Stored => [1453374654941-1)]
>> [info] Stored => [1453374654941-2)]
>> [info] processed => [List(1453374654941-1)]
>> [info] Stored => [1453374654941-3)]
>> [info] Stored => [1453374654941-4)]
>> [info] processed => [List(1453374654941-2)]
>> [info] Stored => [1453374654941-5)]
>> [info] Stored => [1453374654941-6)]
>> [info] processed => [List(1453374654941-3)]
>> [info] Stored => [1453374654941-7)]
>> [info] Stored => [1453374654941-8)]
>> [info] processed => [List(1453374654941-4)]
>> [info] Stored => [1453374654941-9)]
>> [info] Stored => [1453374654941-10)]
>>
>> As you would expect, the storing is outpacing the processing. So I kill
>> the application and restart it. This time I commented out the sleep in the
>> foreachRDD so that the processing can clear any backlog:
>>
>> [info] Stored => [1453374753946-0)]
>> [info] processed => [List(1453374753946-0)]
>> [info] Stored => [1453374753946-1)]
>> [info] processed => [List(1453374753946-1)]
>> [info] Stored => [1453374753946-2)]
>> [info] processed => [List(1453374753946-2)]
>> [info] Stored => [1453374753946-3)]
>> [info] processed => [List(1453374753946-3)]
>> [info] Stored => [1453374753946-4)]
>> [info] processed => [List(1453374753946-4)]
>>
>> As you can see, the new events are processed but none from the previous
>> batch. The old WAL files are cleared and I see log messages like this, but
>> the old data does not get processed.
>>
>> INFO WriteAheadLogManager : Recovered 1 write ahead log files from hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>>
>> What am I doing wrong? I am using Spark 1.5.2.
>>
>> Best regards,
>>
>> Patrick
>>
>
>
