You need to define a create function and use StreamingContext.getOrCreate.
See the example here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing

On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin <mcgloin.patr...@gmail.com>
wrote:

> Hi all,
>
> To have a simple way of testing the Spark Streaming Write Ahead Log I
> created a very simple Custom Input Receiver, which will generate strings
> and store those:
>
> class InMemoryStringReceiver extends 
> Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>
>   val batchID = System.currentTimeMillis()
>
>   def onStart() {
>     new Thread("InMemoryStringReceiver") {
>       override def run(): Unit = {
>         var i = 0
>         while(true) {
>           
> //http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>           //To implement a reliable receiver, you have to use 
> store(multiple-records) to store data.
>           store(ArrayBuffer(s"$batchID-$i"))
>           println(s"Stored => [$batchID-$i)]")
>           Thread.sleep(1000L)
>           i = i + 1
>         }
>       }
>     }.start()
>   }
>
>   def onStop() {}
> }
>
> I then created a simple Application which will use the Custom Receiver to
> stream the data and process it:
>
> object DStreamResilienceTest extends App {
>
>   val conf = new 
> SparkConf().setMaster("local[*]").setAppName("DStreamResilienceTest").set("spark.streaming.receiver.writeAheadLog.enable",
>  "true")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   
> ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>   val customReceiverStream: ReceiverInputDStream[String] = 
> ssc.receiverStream(new InMemoryStringReceiver())
>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>     println(s"processed => [${rdd.collect().toList}]")
>     Thread.sleep(2000L)
>   }
>   ssc.start()
>   ssc.awaitTermination()
>
> }
>
> As you can see the processing of each received RDD has sleep of 2 seconds
> while the Strings are stored every second. This creates a backlog and the
> new strings pile up, and should be stored in the WAL. Indeed, I can see the
> files in the checkpoint dirs getting updated. Running the app I get output
> like this:
>
> [info] Stored => [1453374654941-0)]
> [info] processed => [List(1453374654941-0)]
> [info] Stored => [1453374654941-1)]
> [info] Stored => [1453374654941-2)]
> [info] processed => [List(1453374654941-1)]
> [info] Stored => [1453374654941-3)]
> [info] Stored => [1453374654941-4)]
> [info] processed => [List(1453374654941-2)]
> [info] Stored => [1453374654941-5)]
> [info] Stored => [1453374654941-6)]
> [info] processed => [List(1453374654941-3)]
> [info] Stored => [1453374654941-7)]
> [info] Stored => [1453374654941-8)]
> [info] processed => [List(1453374654941-4)]
> [info] Stored => [1453374654941-9)]
> [info] Stored => [1453374654941-10)]
>
> As you would expect, the storing is out pacing the processing. So I kill
> the application and restart it. This time I commented out the sleep in the
> foreachRDD so that the processing can clear any backlog:
>
> [info] Stored => [1453374753946-0)]
> [info] processed => [List(1453374753946-0)]
> [info] Stored => [1453374753946-1)]
> [info] processed => [List(1453374753946-1)]
> [info] Stored => [1453374753946-2)]
> [info] processed => [List(1453374753946-2)]
> [info] Stored => [1453374753946-3)]
> [info] processed => [List(1453374753946-3)]
> [info] Stored => [1453374753946-4)]
> [info] processed => [List(1453374753946-4)]
>
> As you can see the new events are processed but none from the previous
> batch. The old WAL logs are cleared and I see log messages like this but
> the old data does not get processed.
>
> INFO WriteAheadLogManager : Recovered 1 write ahead log files from 
> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>
> What am I doing wrong? I am using Spark 1.5.2.
>
> Best regards,
>
> Patrick
>

Reply via email to