Someone can correct me, but I'm pretty sure Spark DStreams (in
general, not just Kafka) have been progressing on to the next batch
after a given batch aborts for quite some time now.  Yet another
reason I put offsets in my database transactionally.  My jobs throw
exceptions if the offset in the DB isn't what I expected it to be.
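A minimal Scala sketch of that idea, assuming an in-memory stand-in for the database. `BatchRange`, `OffsetStore` and `commitBatch` are hypothetical names, not Spark or Kafka APIs. Results and offsets are written together, and a batch whose starting offset doesn't match the stored one is rejected, so a later batch can't silently commit past an aborted one.

```scala
// Hypothetical in-memory stand-in for a database that stores results and
// offsets in one transaction. Not a Spark or Kafka API.
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

final class OffsetStore {
  // (topic, partition) -> next offset expected; a missing entry means start at 0
  private var offsets = Map.empty[(String, Int), Long]
  private var results = Vector.empty[String]

  def storedOffset(topic: String, partition: Int): Long =
    synchronized(offsets.getOrElse((topic, partition), 0L))

  def savedResults: Vector[String] = synchronized(results)

  // Validates every range first, then writes results and offsets together;
  // the synchronized block plays the role of a single DB transaction.
  def commitBatch(ranges: Seq[BatchRange], batchResults: Seq[String]): Unit = synchronized {
    ranges.foreach { r =>
      val expected = offsets.getOrElse((r.topic, r.partition), 0L)
      if (r.fromOffset != expected)
        throw new IllegalStateException(
          s"${r.topic}-${r.partition}: expected offset $expected, batch starts at ${r.fromOffset}")
    }
    results = results ++ batchResults
    offsets = offsets ++ ranges.map(r => (r.topic, r.partition) -> r.untilOffset)
  }
}
```

If the batch covering [100, 200) aborts without committing, the next batch starting at 200 throws instead of committing past the gap, and nothing from it is stored.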

> I've been encountering the same kinds of timeout issues as Ivan, using the 
> "Kafka Stream" approach that he is using, except I'm storing my offsets 
> manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet 
> implemented the KafkaRDD approach, and therefore don't have the concurrency 
> issues, but a very similar use case is coming up for me soon, it's just been 
> backburnered until I can get streaming to be more reliable (I will definitely 
> ensure unique group IDs when I do). Offset commits are certainly more painful 
> in Kafka 0.10, and that doesn't have anything to do with Spark.
> While I may be able to alleviate the timeout by just increasing it, I've
> noticed something else that is more worrying: When one task fails 4 times in 
> a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts 
> the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ 
> failed 4 times". That's fine, and it's the behavior I want, but instead of 
> stopping the Application there (as previous versions of Spark did) the next 
> microbatch marches on and offsets are committed ahead of the failed 
> microbatch. Suddenly my at-least-once app becomes more 
> sometimes-at-least-once, which is no good. In order for Spark to display that
> failure, I must be propagating the errors up to Spark, but the behavior of
> marching forward with the next microbatch seems to be new, and a big
> potential source of data loss in streaming applications.
> Am I perhaps missing a setting to stop the entire streaming application once 
> spark.task.maxFailures is reached? Has anyone else seen this behavior of a 
> streaming application skipping over failed microbatches?
>> So basically what I am saying is
>> - increase
>> - use a separate group id everywhere
>> - stop committing offsets under the covers
>> That should eliminate all of those as possible causes, and then we can
>> see if there are still issues.
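A minimal Scala sketch of the last two points, assuming consumer params are built separately for each stream or RDD. `KafkaParamsSketch`, `paramsFor` and the `<base>-<topic>` naming scheme are hypothetical; the keys themselves are standard Kafka consumer configs.

```scala
// Hypothetical helper: one params map per stream/RDD.
object KafkaParamsSketch {
  def paramsFor(baseGroup: String, topic: String, bootstrap: String): Map[String, Object] = Map(
    "bootstrap.servers" -> bootstrap,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    // A distinct group id per stream/RDD, so no two consumers share a group
    "group.id" -> s"$baseGroup-$topic",
    // Stop the consumer from committing offsets under the covers
    "enable.auto.commit" -> "false",
    "auto.offset.reset" -> "earliest"
  )
}
```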
>> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> subscribe to a topic in order to update offsets, Kafka does.  If you
>> don't like the new Kafka consumer api, the existing 0.8 simple
>> consumer api should be usable with later brokers.  As long as you
>> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>> using it.
>>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a
>>> single distinct topic. For example, the group would be something like
>>> "storage-group", and the topics would be "storage-channel1", and
>>> "storage-channel2". In each thread a KafkaConsumer is started and assigned
>>> its topic's partitions, and offsets are committed after the RDD is
>>> processed. This should not interfere with the consumer group used by the
>>> executors which would be "spark-executor-storage-group".
>>> In the streaming example there is a single topic ("client-events") and group
>>> ("processing-group"). A single stream is created and offsets are manually
>>> updated from the executor after each partition is handled. This was a
>>> challenge since Spark now requires one to assign or subscribe to a topic in
>>> order to even update the offsets. In 0.8.2.x you did not have to worry about
>>> that. This approach limits your exposure to duplicate data, since idempotent
>>> records are not entirely possible in our scenario, at least not without a lot
>>> of re-running of logic to de-dup.
>>>> So just to be clear, the answers to my questions are
>>>> - you are not using different group ids, you're using the same group
>>>> id everywhere
>>>> - you are committing offsets manually
>>>> Right?
>>>> If you want to eliminate network or Kafka misbehavior as a source,
>>>> tune upwards even higher.
>>>> You must use different group ids for different rdds or streams.
>>>> Kafka consumers won't behave the way you expect if they are all in the
>>>> same group id, and the consumer cache is keyed by group id. Yes, the
>>>> executor will tack "spark-executor-" on to the beginning, but if you
>>>> give it the same base group id, it will be the same.  And the driver
>>>> will use the group id you gave it, unmodified.
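A simplified Scala model of the group id behavior described above; it is not Spark's source. `GroupIdSketch` is hypothetical, and keying the cache on group id plus topic-partition is an assumption made for illustration. It shows that two streams sharing a base group id and reading the same partition land on the same cache entry, while distinct base ids keep them apart.

```scala
// Simplified model, not Spark's actual code.
object GroupIdSketch {
  // The executor tacks "spark-executor-" onto the base group id...
  def executorGroupId(base: String): String = "spark-executor-" + base
  // ...while the driver uses the group id it was given, unmodified.
  def driverGroupId(base: String): String = base

  // Assumption: cached executor consumers are keyed by group id plus topic-partition.
  final case class CacheKey(groupId: String, topic: String, partition: Int)

  def cacheKey(base: String, topic: String, partition: Int): CacheKey =
    CacheKey(executorGroupId(base), topic, partition)
}
```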
>>>> Finally, I really can't help you if you're manually writing your own
>>>> code to commit offsets directly to Kafka.  Trying to minimize
>>>> duplicates that way doesn't really make sense; your system must be
>>>> able to handle duplicates if you're using Kafka as an offsets store,
>>>> since it can't do transactional exactly-once.
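One common way to handle duplicates is to make the output idempotent by keying each record on its Kafka coordinates (topic, partition, offset). A minimal Scala sketch, assuming the downstream store supports upsert-by-key; `Record` and `IdempotentSink` are hypothetical, and a `Map` stands in for the store.

```scala
// Hypothetical record carrying its Kafka coordinates.
final case class Record(topic: String, partition: Int, offset: Long, value: String)

// Stand-in for an upsert-capable store keyed by (topic, partition, offset).
final class IdempotentSink {
  private var rows = Map.empty[(String, Int, Long), String]

  // A replayed record overwrites its existing row instead of adding a duplicate.
  def write(batch: Seq[Record]): Unit = synchronized {
    rows = rows ++ batch.map(r => (r.topic, r.partition, r.offset) -> r.value)
  }

  def size: Int = synchronized(rows.size)
}
```

Replaying offsets 1 to 3 after offsets 0 to 2 were already written leaves four rows, not six.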
>>>>> Here are some examples and details of the scenarios. The KafkaRDD is the
>>>>> most prone to polling timeouts and concurrent modification errors.
>>>>> *Using KafkaRDD* - This takes a list of channels and processes them in
>>>>> parallel using the KafkaRDD directly. They all use the same consumer group
>>>>> ('storage-group'), but each has its own topic and each topic has 4
>>>>> partitions. We routinely get timeout errors when polling for data. This
>>>>> occurs whether we process in parallel or sequentially.
>>>>> *Spark Kafka setting:*
>>>>> *Kafka Consumer Params:*
>>>>> metric.reporters = []
>>>>> = 300000
>>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>> = 50
>>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>> max.partition.fetch.bytes = 1048576
>>>>> bootstrap.servers = [somemachine:31000]
>>>>> ssl.keystore.type = JKS
>>>>> = false
>>>>> sasl.mechanism = GSSAPI
>>>>> interceptor.classes = null
>>>>> exclude.internal.topics = true
>>>>> ssl.truststore.password = null
>>>>> =
>>>>> ssl.endpoint.identification.algorithm = null
>>>>> max.poll.records = 1000
>>>>> check.crcs = true
>>>>> = 40000
>>>>> = 3000
>>>>> = 5000
>>>>> receive.buffer.bytes = 65536
>>>>> ssl.truststore.type = JKS
>>>>> ssl.truststore.location = null
>>>>> ssl.keystore.password = null
>>>>> fetch.min.bytes = 1
>>>>> send.buffer.bytes = 131072
>>>>> value.deserializer = class
>>>>> = storage-group
>>>>> = 100
>>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>> = null
>>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>> ssl.trustmanager.algorithm = PKIX
>>>>> ssl.key.password = null
>>>>> = 500
>>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>> = 540000
>>>>> = 30000
>>>>> metrics.num.samples = 2
>>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>>>> ssl.protocol = TLS
>>>>> ssl.provider = null
>>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>> ssl.keystore.location = null
>>>>> ssl.cipher.suites = null
>>>>> security.protocol = PLAINTEXT
>>>>> ssl.keymanager.algorithm = SunX509
>>>>> = 30000
>>>>> auto.offset.reset = earliest
>>>>> *Example usage with KafkaRDD:*
>>>>> val channels = Seq("channel1", "channel2")
>>>>> channels.toParArray.foreach { channel =>
>>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>>>>        kafkaParams.asJava,
>>>>>        offsetRanges,
>>>>>        PreferConsistent).map(_.value).toDS()
>>>>>  // Do some aggregations
>>>>>  ds.agg(...)
>>>>>  // Save the data
>>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>  // Commit the end of each range (the next offset to read) using a KafkaConsumer
>>>>>  val newOffsets = offsetRanges.map { r =>
>>>>>    new TopicPartition(r.topic, r.partition) -> new OffsetAndMetadata(r.untilOffset)
>>>>>  }.toMap
>>>>>  consumer.commitSync(newOffsets.asJava)
>>>>>  consumer.close()
>>>>> }
>>>>> *Example usage with Kafka Stream:*
>>>>> This creates a stream and processes events in each partition. At the end
>>>>> of processing for each partition, we update the offsets for that
>>>>> partition. This is challenging to do, but is better than calling
>>>>> commitAsync on the stream, because that occurs after the /entire/ RDD has
>>>>> been processed. This method minimizes duplicates in an exactly once
>>>>> environment. Since the executors use their own custom group
>>>>> "spark-executor-processor-group" and the commit is buried in private
>>>>> functions, we are unable to use the executors' cached consumer to update
>>>>> the offsets. This requires us to go through multiple steps to update the
>>>>> Kafka offsets accordingly.
>>>>> val fromOffsets = getOffsets("processor-group", "my-topic")
>>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>>>      PreferConsistent,
>>>>>      Subscribe[K, V](Seq("my-topic"),
>>>>>        kafkaParams,
>>>>>        fromOffsets))
>>>>> stream.foreachRDD { rdd =>
>>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>    // Transform our data
>>>>>    rdd.foreachPartition { events =>
>>>>>       // Each Kafka RDD partition maps 1:1 to an offset range
>>>>>       val osr = offsetRanges(TaskContext.get.partitionId)
>>>>>       // Establish a consumer in the executor so we can update offsets after each partition.
>>>>>       // This class is homegrown and uses the KafkaConsumer to help get/set offsets
>>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>>>>       // do something with our data
>>>>>       // Write the offsets that were updated in this partition
>>>>>       consumer.setConsumerOffsets("processor-group",
>>>>>          Map(TopicAndPartition(osr.topic, osr.partition) -> osr.untilOffset))
>>>>>    }
>>>>> }
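A back-of-envelope Scala model of the duplicate-minimizing claim for per-partition commits, under simplifying assumptions: partitions are written in index order, one crash happens while partition `failedAt` is being written, and a restart resumes from the last committed offsets (real Spark tasks run concurrently and retry individually). `ReplaySketch` and its functions are hypothetical. Even with per-partition commits, output written after the last commit is still replayed, so the sink must still tolerate duplicates.

```scala
// Hypothetical model: counts records written before a crash that are
// written again after restarting from the last committed offsets.
object ReplaySketch {
  // Offsets committed once, after the whole RDD: every partition finished
  // before the crash, plus the partial output of the failed one, is replayed.
  def duplicatesWithBatchCommit(partitionSizes: Seq[Long], failedAt: Int, writtenInFailed: Long): Long =
    partitionSizes.take(failedAt).sum + writtenInFailed

  // Offsets committed as each partition finishes: finished partitions are
  // already committed, so only the partial output of the failed one is replayed.
  def duplicatesWithPartitionCommit(writtenInFailed: Long): Long =
    writtenInFailed
}
```

With four partitions of 100 records and a crash after 40 records of partition 2, the model gives 240 duplicates for a per-batch commit versus 40 for per-partition commits.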
