Ah, I was going by the UI together with the job logs, which indicated that
offsets were being "processed" even though they corresponded to 0 events.
Looks like I wasn't matching up timestamps correctly: the 0 event batches
were queued/processed while offsets were being skipped:

15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0
15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the same as ending offset skipping install-json 1
15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 6 blocks
15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1

But eventually processing of offset 831729964 would resume:

15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831729964 -> 831729976
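
The two KafkaRDD messages above can be told apart mechanically. Here is a minimal sketch in plain Scala (no Spark dependency), assuming each log entry has been unwrapped onto a single line. KafkaRddLine, Skipped, Computed and KafkaRddLog are hypothetical names made up for illustration, not Spark APIs:

```scala
// Hypothetical helper types (not part of Spark) for classifying KafkaRDD INFO lines.
sealed trait KafkaRddLine
final case class Skipped(topic: String, partition: Int, offset: Long) extends KafkaRddLine
final case class Computed(topic: String, partition: Int, from: Long, until: Long)
    extends KafkaRddLine {
  // Number of offsets covered by the range [from, until).
  def count: Long = until - from
}

object KafkaRddLog {
  // Empty range: beginning offset equals ending offset, so nothing is read.
  private val SkipRe =
    """.*Beginning offset (\d+) is the same as ending offset skipping (\S+) (\d+)\s*""".r
  // Non-empty range that is actually iterated on the worker.
  private val ComputeRe =
    """.*Computing topic (\S+), partition (\d+) offsets (\d+) -> (\d+)\s*""".r

  // Returns None for any line that is not one of the two KafkaRDD messages.
  def parse(line: String): Option[KafkaRddLine] = line match {
    case SkipRe(off, topic, part) =>
      Some(Skipped(topic, part.toInt, off.toLong))
    case ComputeRe(topic, part, from, until) =>
      Some(Computed(topic, part.toInt, from.toLong, until.toLong))
    case _ => None
  }
}

object KafkaRddLogDemo {
  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0",
      "15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the same as ending offset skipping install-json 1",
      "15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json, partition 1 offsets 831729964 -> 831729976"
    )
    // Keep only the KafkaRDD entries and print how each one was classified.
    lines.flatMap(KafkaRddLog.parse).foreach(println)
  }
}
```

With the lines from this message, the 11:26:05 entry parses as Skipped (a 0 event range) and the 11:27:18 entry as Computed, covering 12 offsets.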

Lesson learned: will be more focused on reading the job logs properly in
the future.
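
One way to confirm from the logs that nothing was dropped across a restart is to check that, per partition, every offset range starts where the previous one ended. Below is a rough sketch of that check in plain Scala. LoggedRange and OffsetContinuity are made-up names for illustration, and the sample offsets are the partition 1 values quoted in this thread:

```scala
// Hypothetical record of one offset range as read from the job logs (not a Spark class).
final case class LoggedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object OffsetContinuity {
  // Returns every (previous, next) pair of ranges on the same topic/partition
  // where next does not start exactly at previous.untilOffset.
  def gaps(ranges: Seq[LoggedRange]): Seq[(LoggedRange, LoggedRange)] =
    ranges
      .groupBy(r => (r.topic, r.partition))
      .values
      .flatMap { rs =>
        // Sort by (from, until) so an empty range sorts before a non-empty one
        // that begins at the same offset.
        val sorted = rs.sortBy(r => (r.fromOffset, r.untilOffset))
        sorted.zip(sorted.drop(1)).filter { case (prev, next) =>
          next.fromOffset != prev.untilOffset
        }
      }
      .toSeq
}

object OffsetContinuityDemo {
  def main(args: Array[String]): Unit = {
    // Partition 1 of install-json: restored range, first computed range,
    // the skipped (empty) range, and the range where processing resumed.
    val partition1 = Seq(
      LoggedRange("install-json", 1, 831654775L, 831655076L),
      LoggedRange("install-json", 1, 831655076L, 831729964L),
      LoggedRange("install-json", 1, 831729964L, 831729964L),
      LoggedRange("install-json", 1, 831729964L, 831729976L)
    )
    println(s"gaps with checkpoint: ${OffsetContinuity.gaps(partition1)}")

    // Illustration only: dropping the middle ranges mimics a job whose first
    // fromOffset after restart is larger than the last untilOffset before it.
    val withGap = Seq(partition1.head, partition1.last)
    println(s"gaps after dropping ranges: ${OffsetContinuity.gaps(withGap)}")
  }
}
```

An empty result means the ranges line up end to end. A non-empty result corresponds to the case described further down the thread, where the first part.fromOffset after restart is larger than the last part.untilOffset before it.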


Thanks for all the help on this!


On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> I'd be less concerned about what the streaming ui shows than what's
> actually going on with the job.  When you say you were losing messages, how
> were you observing that?  The UI, or actual job output?
>
> The log lines you posted indicate that the checkpoint was restored and
> those offsets were processed; what are the log lines for the following
> KafkaRDD ?
>
>
> On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang <suchenz...@gmail.com> wrote:
>
>> Compared offsets, and it continues from checkpoint loading:
>>
>> 15/08/26 11:24:54 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
>> (install-json,4,825772921,825773536),
>> (install-json,1,831654775,831655076),
>> (install-json,0,1296018451,1296018810),
>> (install-json,2,824785282,824785696), (install-json,3,
>> 811428882,811429181)]
>>
>> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 0 offsets 1296018451 -> 1296018810
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 4 offsets 825773536 -> 825907428
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 2 offsets 824785696 -> 824889957
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 3 offsets 811429181 -> 811529084
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 1 offsets 831655076 -> 831729964
>> ...
>>
>> But for some reason the streaming UI shows it as computing 0 events.
>>
>> Removing the call to checkpoint does remove the queueing of 0 event
>> batches, since offsets just skip to the latest (checked that the first
>> part.fromOffset in the restarted job is larger than the last
>> part.untilOffset before restart).
>>
>>
>>
>>
>> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> When the kafka rdd is actually being iterated on the worker, there
>>> should be an info line of the form
>>>
>>>     log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
>>>       s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> You should be able to compare that to log of offsets during checkpoint
>>> loading, to see if they line up.
>>>
>>> Just out of curiosity, does removing the call to checkpoint on the
>>> stream affect anything?
>>>
>>>
>>>
>>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang <suchenz...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the suggestions! I tried the following:
>>>>
>>>> I removed
>>>>
>>>> createOnError = true
>>>>
>>>> And reran the same process to reproduce. Double checked that checkpoint
>>>> is loading:
>>>>
>>>> 15/08/26 10:10:40 INFO
>>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>>>> (install-json,4,825400856,825401058),
>>>> (install-json,1,831453228,831453396),
>>>> (install-json,0,1295759888,1295760378),
>>>> (install-json,2,824443526,824444409), (install-json,3,
>>>> 811222580,811222874)]
>>>> 15/08/26 10:10:40 INFO
>>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>>> KafkaRDD for time 1440608830000 ms [(install-json,5,825898528,825898791),
>>>> (install-json,4,825401058,825401249),
>>>> (install-json,1,831453396,831453603),
>>>> (install-json,0,1295760378,1295760809),
>>>> (install-json,2,824444409,824445510), (install-json,3,
>>>> 811222874,811223285)]
>>>> ...
>>>>
>>>> And the same issue is appearing as before (with 0 event batches getting
>>>> queued corresponding to dropped messages). Our kafka brokers are on version
>>>> 0.8.2.0, if that makes a difference.
>>>>
>>>> Also as a sanity check, I took out the ZK updates and reran (just in
>>>> case that was somehow causing problems), and, as expected, that didn't
>>>> change anything.
>>>>
>>>> Furthermore, the 0 event batches seem to take longer to process than
>>>> batches with the regular load of events: processing time for 0 event
>>>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>>>> event batches is consistently < 1s. Why would that happen?
>>>>
>>>>
>>>> As for the checkpoint call:
>>>>
>>>> directKStream.checkpoint(checkpointDuration)
>>>>
>>>> was an attempt to set the checkpointing interval (at some multiple of
>>>> the batch interval), whereas StreamingContext.checkpoint seems like it will
>>>> only set the checkpoint directory.
>>>>
>>>>
>>>>
>>>> Thanks for all the help,
>>>>
>>>> Susan
>>>>
>>>>
>>>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> The first thing that stands out to me is
>>>>> createOnError = true
>>>>>
>>>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>>>> and starting the job anyway?  There should be info lines that look like
>>>>>
>>>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>>>>> Restoring KafkaRDD for time 1440597180000 ms [(test,1,162,220)
>>>>>
>>>>>
>>>>> You should be able to tell from those whether the offset ranges being
>>>>> loaded from the checkpoint look reasonable.
>>>>>
>>>>> Also, is there a reason you're calling
>>>>>
>>>>> directKStream.checkpoint(checkpointDuration)
>>>>>
>>>>> Just calling checkpoint on the streaming context should be sufficient
>>>>> to save the metadata.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang <suchenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sure thing!
>>>>>>
>>>>>> The main looks like:
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>>>>
>>>>>> val kafkaConf = Map(
>>>>>>       "zookeeper.connect" -> zookeeper,
>>>>>>       "group.id" -> options.group,
>>>>>>       "zookeeper.connection.timeout.ms" -> "10000",
>>>>>>       "auto.commit.interval.ms" -> "1000",
>>>>>>       "rebalance.max.retries" -> "25",
>>>>>>       "bootstrap.servers" -> kafkaBrokers
>>>>>>     )
>>>>>>
>>>>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>>>>       () => {
>>>>>>         createContext(kafkaConf, checkpointDirectory, topic, numThreads, isProd)
>>>>>>       }, createOnError = true)
>>>>>>
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> And createContext is defined as:
>>>>>>
>>>>>>
>>>>>>
>>>>>> --------------------------------------------------------------------------------------------------
>>>>>>
>>>>>>
>>>>>> val batchDuration = Seconds(5)
>>>>>> val checkpointDuration = Seconds(20)
>>>>>>
>>>>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>>>>
>>>>>> def createContext(kafkaConf: Map[String, String],
>>>>>>                     checkpointDirectory: String,
>>>>>>                     topic: String,
>>>>>>                     numThreads: Int,
>>>>>>                     isProd: Boolean)
>>>>>>   : StreamingContext = {
>>>>>>
>>>>>>     val sparkConf = new SparkConf().setAppName("***")
>>>>>>     val ssc = new StreamingContext(sparkConf, batchDuration)
>>>>>>     ssc.checkpoint(checkpointDirectory)
>>>>>>
>>>>>>     val topicSet = topic.split(",").toSet
>>>>>>     val groupId = kafkaConf.getOrElse("group.id", "")
>>>>>>
>>>>>>     val directKStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>>>>       ssc, kafkaConf, topicSet)
>>>>>>     directKStream.checkpoint(checkpointDuration)
>>>>>>
>>>>>>     val table = ***
>>>>>>
>>>>>>     directKStream.foreachRDD { rdd =>
>>>>>>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>       rdd.flatMap(rec => someFunc(rec))
>>>>>>         .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>>>>>>         .foreachPartition { partitionRec =>
>>>>>>           val dbWrite = DynamoDBWriter()
>>>>>>           partitionRec.foreach { rec =>
>>>>>>             /* Update Dynamo Here */
>>>>>>           }
>>>>>>         }
>>>>>>
>>>>>>       /** Set up ZK Connection **/
>>>>>>       val props = new Properties()
>>>>>>       kafkaConf.foreach(param => props.put(param._1, param._2))
>>>>>>
>>>>>>       props.setProperty(AUTO_OFFSET_COMMIT, "false")
>>>>>>
>>>>>>       val consumerConfig = new ConsumerConfig(props)
>>>>>>       assert(!consumerConfig.autoCommitEnable)
>>>>>>
>>>>>>       val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
>>>>>>         consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>>>>>>
>>>>>>       offsetRanges.foreach { osr =>
>>>>>>         val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
>>>>>>         val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
>>>>>>         ZkUtils.updatePersistentPath(zkClient, zkPath, osr.untilOffset.toString)
>>>>>>       }
>>>>>>       zkClient.close() // release the ZooKeeper connection opened for this batch
>>>>>>     }
>>>>>>     ssc
>>>>>>   }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Sounds like something's not set up right... can you post a minimal
>>>>>>> code example that reproduces the issue?
>>>>>>>
>>>>>>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang <suchenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yeah. All messages sent while the streaming job was down are lost.
>>>>>>>>
>>>>>>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger <
>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Are you actually losing messages then?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang <suchenz...@gmail.com
>>>>>>>>> > wrote:
>>>>>>>>>
>>>>>>>>>> No; first batch only contains messages received after the second
>>>>>>>>>> job starts (messages come in at a steady rate of about 400/second).
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger <
>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does the first batch after restart contain all the messages
>>>>>>>>>>> received while the job was down?
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang <
>>>>>>>>>>> suchenz...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm using direct spark streaming (from kafka) with
>>>>>>>>>>>> checkpointing, and
>>>>>>>>>>>> everything works well until a restart. When I shut down (^C)
>>>>>>>>>>>> the first
>>>>>>>>>>>> streaming job, wait 1 minute, then re-submit, there is somehow
>>>>>>>>>>>> a series of 0
>>>>>>>>>>>> event batches that get queued (corresponding to the 1 minute
>>>>>>>>>>>> when the job
>>>>>>>>>>>> was down). Eventually, the batches would resume processing, and
>>>>>>>>>>>> I would see
>>>>>>>>>>>> that each batch has roughly 2000 events.
>>>>>>>>>>>>
>>>>>>>>>>>> I see that at the beginning of the second launch, the
>>>>>>>>>>>> checkpoint dirs are
>>>>>>>>>>>> found and "loaded", according to console output.
>>>>>>>>>>>>
>>>>>>>>>>>> Is this expected behavior? It seems like I might've configured
>>>>>>>>>>>> something
>>>>>>>>>>>> incorrectly, since I would expect with checkpointing that the
>>>>>>>>>>>> streaming job
>>>>>>>>>>>> would resume from checkpoint and continue processing from there
>>>>>>>>>>>> (without
>>>>>>>>>>>> seeing 0 event batches corresponding to when the job was down).
>>>>>>>>>>>>
>>>>>>>>>>>> Also, if I were to wait > 10 minutes or so before re-launching,
>>>>>>>>>>>> there would
>>>>>>>>>>>> be so many 0 event batches that the job would hang. Is this
>>>>>>>>>>>> merely something
>>>>>>>>>>>> to be "waited out", or should I set up some restart
>>>>>>>>>>>> behavior/make a config
>>>>>>>>>>>> change to discard checkpointing if the elapsed time has been
>>>>>>>>>>>> too long?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>> <
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24450/Screen_Shot_2015-08-25_at_10.png
>>>>>>>>>>>> >
