Sure thing! The main looks like:
--------------------------------------------------------------------------------------------------
val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

val kafkaConf = Map(
  "zookeeper.connect" -> zookeeper,
  "group.id" -> options.group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.commit.interval.ms" -> "1000",
  "rebalance.max.retries" -> "25",
  "bootstrap.servers" -> kafkaBrokers
)

val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => {
  createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
}, createOnError = true)

ssc.start()
ssc.awaitTermination()
--------------------------------------------------------------------------------------------------

And createContext is defined as:

--------------------------------------------------------------------------------------------------
val batchDuration = Seconds(5)
val checkpointDuration = Seconds(20)

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

def createContext(kafkaConf: Map[String, String],
                  checkpointDirectory: String,
                  topic: String,
                  numThreads: Int,
                  isProd: Boolean): StreamingContext = {

  val sparkConf = new SparkConf().setAppName("***")

  val ssc = new StreamingContext(sparkConf, batchDuration)
  ssc.checkpoint(checkpointDirectory)

  val topicSet = topic.split(",").toSet
  val groupId = kafkaConf.getOrElse("group.id", "")

  val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaConf, topicSet)
  directKStream.checkpoint(checkpointDuration)

  val table = ***

  directKStream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    rdd.flatMap(rec => someFunc(rec))
      .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
      .foreachPartition { partitionRec =>
        val dbWrite = DynamoDBWriter()
        partitionRec.foreach {
          /* Update Dynamo Here */
        }
      }

    /** Set up ZK Connection **/
    val props = new Properties()
    kafkaConf.foreach(param => props.put(param._1, param._2))
    props.setProperty(AUTO_OFFSET_COMMIT, "false")

    val consumerConfig = new ConsumerConfig(props)
    assert(!consumerConfig.autoCommitEnable)

    val zkClient = new ZkClient(consumerConfig.zkConnect,
      consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs,
      ZKStringSerializer)

    offsetRanges.foreach { osr =>
      val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
      ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
    }
  }
  ssc
}
--------------------------------------------------------------------------------------------------

On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Sounds like something's not set up right... can you post a minimal code
> example that reproduces the issue?
>
> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>
>> Yeah. All messages are lost while the streaming job was down.
>>
>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Are you actually losing messages then?
>>>
>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenz...@gmail.com>
>>> wrote:
>>>
>>>> No; first batch only contains messages received after the second job
>>>> starts (messages come in at a steady rate of about 400/second).
>>>>
>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> Does the first batch after restart contain all the messages received
>>>>> while the job was down?
>>>>>
>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <suchenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>>>>> everything works well until a restart. When I shut down (^C) the first
>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>>>>> series of 0 event batches that get queued (corresponding to the 1
>>>>>> minute when the job was down). Eventually, the batches would resume
>>>>>> processing, and I would see that each batch has roughly 2000 events.
>>>>>>
>>>>>> I see that at the beginning of the second launch, the checkpoint dirs
>>>>>> are found and "loaded", according to console output.
>>>>>>
>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>> something incorrectly, since I would expect with checkpointing that
>>>>>> the streaming job would resume from checkpoint and continue processing
>>>>>> from there (without seeing 0 event batches corresponding to when the
>>>>>> job was down).
>>>>>>
>>>>>> Also, if I were to wait > 10 minutes or so before re-launching, there
>>>>>> would be so many 0 event batches that the job would hang. Is this
>>>>>> merely something to be "waited out", or should I set up some restart
>>>>>> behavior/make a config change to discard checkpointing if the elapsed
>>>>>> time has been too long?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpointing-Restarts-with-0-Event-Batches-tp24450.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
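
For what it's worth, since the foreachRDD above already writes each range's untilOffset to ZK, the natural counterpart on restart (when the checkpoint is deliberately discarded) is to read those offsets back and seed the stream with them, so the first batch covers exactly the downtime instead of replaying nothing. A rough sketch against the Spark 1.x spark-streaming-kafka and Kafka 0.8 APIs — streamFromZkOffsets and its wiring are hypothetical, not part of the code above:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: rebuild fromOffsets out of the nodes that
// ZkUtils.updatePersistentPath wrote in the foreachRDD above, then
// hand them to the fromOffsets overload of createDirectStream so the
// new context resumes from where the previous run committed.
def streamFromZkOffsets(ssc: StreamingContext,
                        kafkaConf: Map[String, String],
                        zkClient: ZkClient,
                        groupId: String,
                        topics: Set[String]) = {
  val fromOffsets: Map[TopicAndPartition, Long] = topics.flatMap { topic =>
    val dirs = new ZKGroupTopicDirs(groupId, topic)
    // Each child of consumerOffsetDir is a partition id whose node
    // holds the last committed untilOffset as a string.
    val partitions = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerOffsetDir)
    partitions.map { p =>
      val (offset, _) = ZkUtils.readData(zkClient, s"${dirs.consumerOffsetDir}/$p")
      TopicAndPartition(topic, p.toInt) -> offset.toLong
    }
  }.toMap

  // The fromOffsets variant requires an explicit message handler.
  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaConf, fromOffsets, messageHandler)
}
```

This sidesteps the checkpoint's replay of empty downtime batches entirely, at the cost of rebuilding the DStream graph yourself instead of relying on getOrCreate.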