Someone can correct me, but I'm pretty sure Spark DStreams (in general, not just Kafka) have been progressing to the next batch after a given batch aborts for quite some time now. Yet another reason I put offsets in my database transactionally. My jobs throw exceptions if the offset in the DB isn't what I expected it to be.
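Storing offsets in the same database transaction as the results, and refusing to commit when the stored offset is not the expected one, can be modeled roughly as follows. This is a minimal in-memory sketch under stated assumptions: `TransactionalOffsetStore`, `commitBatch`, and `UnexpectedOffsetException` are hypothetical names, `synchronized` only stands in for a real database transaction, and a partition with no stored offset is assumed to start at 0.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a database that stores results and Kafka
// offsets together. A real job would use a DB transaction; `synchronized` here
// only models the all-or-nothing behavior of that transaction.
object TransactionalOffsetStore {
  private val offsets = mutable.Map.empty[(String, Int), Long]
  private val results = mutable.ArrayBuffer.empty[String]

  final class UnexpectedOffsetException(msg: String) extends RuntimeException(msg)

  /** Writes a batch's results and advances its offset in one step, but only if
    * the stored offset equals the batch's starting offset. Assumption: a
    * partition with nothing stored yet starts at offset 0. */
  def commitBatch(topic: String, partition: Int, fromOffset: Long,
                  untilOffset: Long, batchResults: Seq[String]): Unit = synchronized {
    val key = (topic, partition)
    val stored = offsets.getOrElse(key, 0L)
    // Fail loudly instead of committing past a batch that never landed
    if (stored != fromOffset)
      throw new UnexpectedOffsetException(
        s"$topic-$partition: expected stored offset $fromOffset but found $stored")
    results ++= batchResults
    offsets(key) = untilOffset
  }

  def storedOffset(topic: String, partition: Int): Long =
    synchronized { offsets.getOrElse((topic, partition), 0L) }

  def resultCount: Int = synchronized { results.size }
}
```

If a batch aborts and the next microbatch tries to commit a range that starts after the failed one, the check throws instead of silently skipping the failed range.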
On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
> I've been encountering the same kinds of timeout issues as Ivan, using the "Kafka Stream" approach that he is using, except I'm storing my offsets manually from the driver to ZooKeeper in the Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and therefore don't have the concurrency issues, but a very similar use case is coming up for me soon; it's just been backburnered until I can get streaming to be more reliable (I will definitely ensure unique group IDs when I do). Offset commits are certainly more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
>
> While I may be able to alleviate the timeout by just increasing it, I've noticed something else that is more worrying: when one task fails 4 times in a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ failed 4 times". That's fine, and it's the behavior I want, but instead of stopping the Application there (as previous versions of Spark did), the next microbatch marches on and offsets are committed ahead of the failed microbatch. Suddenly my at-least-once app becomes more sometimes-at-least-once, which is no good. In order for Spark to display that failure, I must be propagating the errors up to Spark, but the behavior of marching forward with the next microbatch seems to be new, and a big potential for data loss in streaming applications.
>
> Am I perhaps missing a setting to stop the entire streaming application once spark.task.maxFailures is reached? Has anyone else seen this behavior of a streaming application skipping over failed microbatches?
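The behavior Sean expects can be stated as a small model: process microbatches in order, advance the committed offset only after a batch succeeds, and stop at the first failure rather than moving on. This is a plain-Scala sketch, not Spark code; `FailFastBatches`, `Batch`, and `run` are hypothetical names, and a real application would still need to act on the returned error itself (for example, by stopping its `StreamingContext`).

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model (not Spark code): commit a batch's end offset only after
// the batch succeeds, and stop at the first failure instead of moving on.
object FailFastBatches {
  final case class Batch(fromOffset: Long, untilOffset: Long)

  /** Returns the last committed offset and the error that stopped processing, if any. */
  def run(start: Long, batches: List[Batch])(process: Batch => Unit): (Long, Option[Throwable]) = {
    @annotation.tailrec
    def loop(committed: Long, rest: List[Batch]): (Long, Option[Throwable]) = rest match {
      case Nil => (committed, None)
      case b :: tail =>
        Try(process(b)) match {
          case Success(_) => loop(b.untilOffset, tail) // advance only on success
          case Failure(e) => (committed, Some(e))      // stop; later batches are not run
        }
    }
    loop(start, batches)
  }
}
```

Because the failed batch's range is never committed, a restart resumes from it, which is what keeps the application at-least-once.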
>
> Thanks,
> Sean
>
>> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> So basically what I am saying is
>>
>> - increase poll.ms
>> - use a separate group id everywhere
>> - stop committing offsets under the covers
>>
>> That should eliminate all of those as possible causes, and then we can see if there are still issues.
>>
>> As far as 0.8 vs 0.10, Spark doesn't require you to assign or subscribe to a topic in order to update offsets, Kafka does. If you don't like the new Kafka consumer API, the existing 0.8 simple consumer API should be usable with later brokers. As long as you don't need SSL or dynamic subscriptions, and it meets your needs, keep using it.
>>
>> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a single distinct topic. For example, the group would be something like "storage-group", and the topics would be "storage-channel1" and "storage-channel2". In each thread a KafkaConsumer is started and assigned the relevant partitions, and then offsets are committed after the RDD is processed. This should not interfere with the consumer group used by the executors, which would be "spark-executor-storage-group".
>>>
>>> In the streaming example there is a single topic ("client-events") and group ("processing-group"). A single stream is created and offsets are manually updated from the executor after each partition is handled. This was a challenge since Spark now requires one to assign or subscribe to a topic in order to even update the offsets. In 0.8.2.x you did not have to worry about that. This approach limits our exposure to duplicate data, since idempotent records are not entirely possible in our scenario, at least not without a lot of re-running of logic to de-dup.
>>>
>>> Thanks,
>>>
>>> Ivan
>>>
>>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> So just to be clear, the answers to my questions are
>>>>
>>>> - you are not using different group ids, you're using the same group id everywhere
>>>>
>>>> - you are committing offsets manually
>>>>
>>>> Right?
>>>>
>>>> If you want to eliminate network or Kafka misbehavior as a source, tune poll.ms upwards even higher.
>>>>
>>>> You must use different group ids for different RDDs or streams. Kafka consumers won't behave the way you expect if they are all in the same group id, and the consumer cache is keyed by group id. Yes, the executor will tack "spark-executor-" on to the beginning, but if you give it the same base group id, it will be the same. And the driver will use the group id you gave it, unmodified.
>>>>
>>>> Finally, I really can't help you if you're manually writing your own code to commit offsets directly to Kafka. Trying to minimize duplicates that way doesn't really make sense; your system must be able to handle duplicates if you're using Kafka as an offsets store, because it can't do transactional exactly-once.
>>>>
>>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:
>>>>> Here are some examples and details of the scenarios. The KafkaRDD is the most prone to polling timeouts and concurrent modification errors.
>>>>>
>>>>> *Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They all use the same consumer group ('storage-group'), but each has its own topic and each topic has 4 partitions. We routinely get timeout errors when polling for data. This occurs whether we process in parallel or sequentially.
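Cody's group-id points can be modeled in a few lines. This is a simplified illustration, not Spark's implementation: the `spark-executor-` prefix and the unmodified driver group id come from the thread, while `GroupIds`, `CacheKey`, and `paramsFor` are hypothetical names, and including topic and partition in the cache key is an assumption of the sketch. `paramsFor` shows one way to follow the advice to use a separate group id per RDD or stream, with Kafka's auto-commit left off.

```scala
// Simplified model (not Spark's code) of the group-id behavior described above.
object GroupIds {
  // The driver uses the configured group.id unmodified; executors prefix it.
  def driverGroupId(base: String): String = base
  def executorGroupId(base: String): String = "spark-executor-" + base

  // Assumption for this sketch: cached executor consumers are looked up by
  // (executor group id, topic, partition), so equal base ids mean shared entries.
  final case class CacheKey(groupId: String, topic: String, partition: Int)
  def cacheKey(base: String, topic: String, partition: Int): CacheKey =
    CacheKey(executorGroupId(base), topic, partition)

  // Hypothetical helper: one consumer group per topic, auto-commit turned off.
  def paramsFor(baseParams: Map[String, Object], baseGroup: String, topic: String): Map[String, Object] =
    baseParams ++ Map[String, Object](
      "group.id" -> s"$baseGroup-$topic",
      "enable.auto.commit" -> java.lang.Boolean.FALSE
    )
}
```

Deriving the group id from the topic name is only one illustrative convention; any scheme works as long as no two RDDs or streams share a base group id.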
>>>>>
>>>>> *Spark Kafka setting:*
>>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>>>>
>>>>> *Kafka Consumer Params:*
>>>>> metric.reporters = []
>>>>> metadata.max.age.ms = 300000
>>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>> reconnect.backoff.ms = 50
>>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>> max.partition.fetch.bytes = 1048576
>>>>> bootstrap.servers = [somemachine:31000]
>>>>> ssl.keystore.type = JKS
>>>>> enable.auto.commit = false
>>>>> sasl.mechanism = GSSAPI
>>>>> interceptor.classes = null
>>>>> exclude.internal.topics = true
>>>>> ssl.truststore.password = null
>>>>> client.id =
>>>>> ssl.endpoint.identification.algorithm = null
>>>>> max.poll.records = 1000
>>>>> check.crcs = true
>>>>> request.timeout.ms = 40000
>>>>> heartbeat.interval.ms = 3000
>>>>> auto.commit.interval.ms = 5000
>>>>> receive.buffer.bytes = 65536
>>>>> ssl.truststore.type = JKS
>>>>> ssl.truststore.location = null
>>>>> ssl.keystore.password = null
>>>>> fetch.min.bytes = 1
>>>>> send.buffer.bytes = 131072
>>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>>>> group.id = storage-group
>>>>> retry.backoff.ms = 100
>>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>> sasl.kerberos.service.name = null
>>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>> ssl.trustmanager.algorithm = PKIX
>>>>> ssl.key.password = null
>>>>> fetch.max.wait.ms = 500
>>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>> connections.max.idle.ms = 540000
>>>>> session.timeout.ms = 30000
>>>>> metrics.num.samples = 2
>>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>>>> ssl.protocol = TLS
>>>>> ssl.provider = null
>>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>> ssl.keystore.location = null
>>>>> ssl.cipher.suites = null
>>>>> security.protocol = PLAINTEXT
>>>>> ssl.keymanager.algorithm = SunX509
>>>>> metrics.sample.window.ms = 30000
>>>>> auto.offset.reset = earliest
>>>>>
>>>>> *Example usage with KafkaRDD:*
>>>>> import scala.collection.JavaConverters._
>>>>> import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
>>>>> import org.apache.kafka.common.TopicPartition
>>>>> import org.apache.spark.sql.SaveMode
>>>>> import org.apache.spark.streaming.kafka010.KafkaUtils
>>>>> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>>>>> import spark.implicits._ // assumes a SparkSession named `spark` and an Encoder[V]
>>>>>
>>>>> val channels = Seq("channel1", "channel2")
>>>>>
>>>>> channels.toParArray.foreach { channel =>
>>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>
>>>>>   // Get offsets for the given topic and the consumer group 'storage-group'
>>>>>   val offsetRanges = getOffsets("storage-group", channel)
>>>>>
>>>>>   // createRDD yields ConsumerRecords, so keep only the values before toDS
>>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>>>     kafkaParams.asJava,
>>>>>     offsetRanges,
>>>>>     PreferConsistent).map(_.value).toDS
>>>>>
>>>>>   // Do some aggregations
>>>>>   ds.agg(...)
>>>>>   // Save the data
>>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>   // Save offsets using a KafkaConsumer; untilOffset is the next offset to read
>>>>>   val newOffsets = offsetRanges.map { o =>
>>>>>     new TopicPartition(o.topic, o.partition) -> new OffsetAndMetadata(o.untilOffset)
>>>>>   }.toMap
>>>>>   consumer.commitSync(newOffsets.asJava)
>>>>>   consumer.close()
>>>>> }
>>>>>
>>>>> *Example usage with Kafka Stream:*
>>>>> This creates a stream and processes events in each partition. At the end of processing for each partition, we update the offsets for that partition. This is challenging to do, but is better than calling commitAsync on the stream, because that occurs after the /entire/ RDD has been processed. This method minimizes duplicates in an exactly-once environment. Since the executors use their own custom group "spark-executor-processor-group" and the commit is buried in private functions, we are unable to use the executors' cached consumer to update the offsets. This requires us to go through multiple steps to update the Kafka offsets accordingly.
>>>>>
>>>>> import org.apache.spark.TaskContext
>>>>> import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
>>>>> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>>>>> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>>>>> import kafka.common.TopicAndPartition
>>>>>
>>>>> // Starting offsets for 'processor-group'; getOffsets is homegrown and is
>>>>> // assumed here to return a Map[TopicPartition, Long]
>>>>> val fromOffsets = getOffsets("processor-group", "my-topic")
>>>>>
>>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>>>   PreferConsistent,
>>>>>   Subscribe[K, V](Seq("my-topic"),
>>>>>     kafkaParams,
>>>>>     fromOffsets))
>>>>>
>>>>> stream.foreachRDD { rdd =>
>>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>
>>>>>   // Transform our data
>>>>>   rdd.foreachPartition { events =>
>>>>>     // Establish a consumer in the executor so we can update offsets after each partition.
>>>>>     // This class is homegrown and uses the KafkaConsumer to help get/set offsets
>>>>>     val consumer = new ConsumerUtils(kafkaParams)
>>>>>     // do something with our data
>>>>>
>>>>>     // Write the offsets that were updated in this partition; RDD partition i
>>>>>     // corresponds to offsetRanges(i), and untilOffset is the next offset to read
>>>>>     val osr = offsetRanges(TaskContext.get.partitionId)
>>>>>     consumer.setConsumerOffsets("processor-group",
>>>>>       Map(TopicAndPartition(osr.topic, osr.partition) -> osr.untilOffset))
>>>>>   }
>>>>> }
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
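Both examples above end by committing per-partition offsets, and a plain-Scala sketch can show how those values relate to Spark's offset ranges. `SimpleOffsetRange` is a hypothetical stand-in that mirrors the `topic`, `partition`, `fromOffset`, and `untilOffset` fields of Spark's `OffsetRange`. Because `untilOffset` is exclusive and Kafka treats a committed offset as the next record to read, `untilOffset` is the value to commit. `forPartition` relies on the direct stream's one-to-one mapping between RDD partitions and offset ranges, which only holds before any shuffle or repartition.

```scala
// Hypothetical mirror of the fields of Spark's OffsetRange; not the real class.
object OffsetCommits {
  final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  /** Offsets to commit once the whole RDD is processed, keyed by (topic, partition).
    * untilOffset is exclusive, so it is already "the next record to read". */
  def toCommit(ranges: Seq[SimpleOffsetRange]): Map[(String, Int), Long] =
    ranges.map(r => (r.topic, r.partition) -> r.untilOffset).toMap

  /** Offset to commit after one RDD partition finishes, assuming ranges(i)
    * belongs to RDD partition i (true for an unshuffled direct-stream RDD). */
  def forPartition(ranges: IndexedSeq[SimpleOffsetRange], rddPartitionId: Int): ((String, Int), Long) = {
    val r = ranges(rddPartitionId)
    (r.topic, r.partition) -> r.untilOffset
  }
}
```

Committing `untilOffset` (rather than the offset of the last processed record) avoids re-reading that record on restart.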