Yeah, the KafkaRDD cannot be reused. It's better to document it.

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:

> Ok, I have split the KafkaRDD logic so that each uses its own group and bumped
> the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
> ends up with a timeout and an exception, so I am still perplexed on that one.
> The new error I am getting now is a `ConcurrentModificationException`
> when Spark is trying to remove the CachedKafkaConsumer.
>
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access
> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>
> Here is the basic logic:
>
> *Using KafkaRDD* - This takes a list of channels and processes them in
> parallel using the KafkaRDD directly. They each use a distinct consumer
> group (s"$prefix-$channel"), each has its own topic, and each topic has
> 4 partitions. We routinely get timeout errors when polling for data when
> the poll.ms is less than 2 seconds. This occurs whether we process in
> parallel or sequentially.
>
> *Example usage with KafkaRDD:*
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group s"$prefix-$channel"
>   val offsetRanges = getOffsets(s"$prefix-$channel", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>         kafkaParams.asJava,
>         offsetRanges,
>         PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
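>
> For reference, here is a minimal sketch of what our getOffsets helper does
> (the real implementation is homegrown, so treat the details as illustrative;
> K, V and kafkaParams are the same as in the snippet above). It uses a plain
> KafkaConsumer to read the committed offset for the group and the log-end
> offset for each partition of the topic:
>
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.OffsetRange
>
> def getOffsets(groupId: String, topic: String): Array[OffsetRange] = {
>   val params = kafkaParams + ("group.id" -> groupId)
>   val c = new KafkaConsumer[K, V](params.asJava)
>   try {
>     val partitions = c.partitionsFor(topic).asScala
>       .map(p => new TopicPartition(p.topic, p.partition))
>     c.assign(partitions.asJava)
>     partitions.map { tp =>
>       // start from the last committed offset for this group, or the beginning
>       val committed = Option(c.committed(tp)).map(_.offset).getOrElse(0L)
>       // read up to the current end of the log
>       c.seekToEnd(java.util.Arrays.asList(tp))
>       val latest = c.position(tp)
>       OffsetRange(tp.topic, tp.partition, committed, latest)
>     }.toArray
>   } finally c.close()
> }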
>
> I am not sure why the concurrency issue is there; I have tried to debug
> and have also looked at the KafkaConsumer code, and everything looks
> like it should not occur. The things to figure out are why this occurs when
> running in parallel and why the timeouts still occur.
>
> Thanks,
>
> Ivan
>
> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> There definitely is Kafka documentation indicating that you should use
>> a different consumer group for logically different subscribers, this
>> is really basic to Kafka:
>>
>> http://kafka.apache.org/documentation#intro_consumers
>>
>> As for your comment that "commit async after each RDD, which is not
>> really viable also", how is it not viable?  Again, committing offsets
>> to Kafka doesn't give you reliable delivery semantics unless your
>> downstream data store is idempotent.  If your downstream data store is
>> idempotent, then it shouldn't matter to you when offset commits
>> happen, as long as they happen within a reasonable time after the data
>> is written.
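>>
>> To make that concrete, here is a minimal sketch of the kind of idempotent
>> write I mean (basePath and dsForRange are placeholders, not anything from
>> your code): key the output location by the offset range, so re-processing
>> the same range overwrites the same files instead of appending duplicates,
>> and then the offset commit can safely happen any time afterwards:
>>
>> offsetRanges.foreach { range =>
>>   // deterministic output path per (topic, partition, fromOffset, untilOffset)
>>   val outPath = s"$basePath/${range.topic}/part=${range.partition}/" +
>>     s"from=${range.fromOffset}-until=${range.untilOffset}"
>>   dsForRange(range)            // the Dataset computed for just this range
>>     .write
>>     .mode(SaveMode.Overwrite)  // a replay overwrites rather than appends
>>     .parquet(outPath)
>> }
>> // only after all writes succeed, commit the offsets (your existing commitSync)
>> consumer.commitSync(newOffsets.asJava)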
>>
>> Do you want to keep arguing with me, or follow my advice and proceed
>> with debugging any remaining issues after you make the changes I
>> suggested?
>>
>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
>> > With our stream version, we update the offsets for only the partition we are
>> > operating on. We even break down the partition into smaller batches and
>> then
>> > update the offsets after each batch within the partition. With Spark
>> 1.6 and
>> > Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
>> > necessarily a Spark issue since Kafka no longer allows you to simply
>> update
>> > the offsets for a given consumer group. You have to subscribe or assign
>> > partitions to even do so.
>> >
>> > As for storing the offsets in some other place like a DB, I don't find this
>> > useful because you then can't use tools like Kafka Manager. In order to do
>> > so you would have to store them in a DB and then circle back and update Kafka
>> > afterwards. This means you have to keep two sources in sync, which is not
>> > really a good idea.
>> >
>> > It is a challenge in Spark to use the Kafka offsets since the driver stays
>> > subscribed to the topic(s) and consumer group, while the executors prepend
>> > "spark-executor-" to the consumer group. The stream (driver) does allow you
>> > to commit async after each RDD, which is not really viable either. I have
>> > thought of implementing an Akka actor system on the driver and sending it
>> > messages from the executor code to update the offsets, but that is
>> > asynchronous as well, so not really a good solution.
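>> >
>> > For reference, the commit-after-each-RDD I am referring to is roughly the
>> > following (assuming the CanCommitOffsets/commitAsync API in
>> > spark-streaming-kafka-0-10; topic and kafkaParams are placeholders):
>> >
>> > val stream = KafkaUtils.createDirectStream[K, V](context, PreferConsistent,
>> >   Subscribe[K, V](Seq(topic), kafkaParams))
>> >
>> > stream.foreachRDD { rdd =>
>> >   val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >   // ... process the whole RDD ...
>> >   // the commit is asynchronous, runs on the driver, and only covers the
>> >   // entire RDD, which is why it is too coarse for our per-partition updates
>> >   stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
>> > }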
>> >
>> > I have no idea why Kafka made this change and also why in the parallel
>> > KafkaRDD application we would be advised to use different consumer groups
>> > for each RDD. It seems strange to me that different consumer groups would
>> > be required or advised. There is no Kafka documentation that I know of that
>> > states this. The biggest issue I see with the parallel KafkaRDD is the
>> > timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
>> > Something is just not right here. As I mentioned with the
>> > streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this
>> > issue. We have been running the same basic logic for over a year now without
>> > one hitch at all.
>> >
>> > Ivan
>> >
>> >
>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >>
>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>> >> general, not just kafka) have been progressing on to the next batch
>> >> after a given batch aborts for quite some time now.  Yet another
>> >> reason I put offsets in my database transactionally.  My jobs throw
>> >> exceptions if the offset in the DB isn't what I expected it to be.
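>> >>
>> >> A minimal sketch of that pattern, with a hypothetical JDBC table
>> >> kafka_offsets(topic, kafka_partition, until_offset) - results and offsets
>> >> go in the same transaction, and the job fails fast if the stored offset
>> >> isn't the expected fromOffset:
>> >>
>> >> import java.sql.DriverManager
>> >> import org.apache.spark.TaskContext
>> >> import org.apache.spark.streaming.kafka010.HasOffsetRanges
>> >>
>> >> val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> rdd.foreachPartition { iter =>
>> >>   val range = ranges(TaskContext.get.partitionId)  // KafkaRDD partitions map 1:1
>> >>   val conn = DriverManager.getConnection(jdbcUrl)  // jdbcUrl is a placeholder
>> >>   try {
>> >>     conn.setAutoCommit(false)
>> >>     // ... insert the results derived from `iter` using `conn` ...
>> >>     val st = conn.prepareStatement(
>> >>       "UPDATE kafka_offsets SET until_offset = ? " +
>> >>       "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")
>> >>     st.setLong(1, range.untilOffset)
>> >>     st.setString(2, range.topic)
>> >>     st.setInt(3, range.partition)
>> >>     st.setLong(4, range.fromOffset)                // the offset we expected to find
>> >>     if (st.executeUpdate() != 1)
>> >>       throw new IllegalStateException(s"offset in DB for $range was not what we expected")
>> >>     conn.commit()                                  // data and offsets commit atomically
>> >>   } finally conn.close()
>> >> }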
>> >>
>> >>
>> >>
>> >>
>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com>
>> wrote:
>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>> using
>> >> > the "Kafka Stream" approach that he is using, except I'm storing my
>> offsets
>> >> > manually from the driver to Zookeeper in the Kafka 8 format. I
>> haven't yet
>> >> > implemented the KafkaRDD approach, and therefore don't have the
>> concurrency
>> >> > issues, but a very similar use case is coming up for me soon, it's
>> just been
>> >> > backburnered until I can get streaming to be more reliable (I will
>> >> > definitely ensure unique group IDs when I do). Offset commits are
>> certainly
>> >> > more painful in Kafka 0.10, and that doesn't have anything to do
>> with Spark.
>> >> >
>> >> > While I may be able to alleviate the timeout by just increasing it,
>> I've
>> >> > noticed something else that is more worrying: When one task fails 4
>> times in
>> >> > a row (i.e. "Failed to get records for _ after polling for _"),
>> Spark aborts
>> >> > the Stage and Job with "Job aborted due to stage failure: Task _ in
>> stage _
>> >> > failed 4 times". That's fine, and it's the behavior I want, but
>> instead of
>> >> > stopping the Application there (as previous versions of Spark did)
>> the next
>> >> > microbatch marches on and offsets are committed ahead of the failed
>> >> > microbatch. Suddenly my at-least-once app becomes more like
>> >> > sometimes-at-least-once, which is no good. In order for spark to
>> display that
>> >> > failure, I must be propagating the errors up to Spark, but the
>> behavior of
>> >> > marching forward with the next microbatch seems to be new, and a big
>> >> > potential for data loss in streaming applications.
>> >> >
>> >> > Am I perhaps missing a setting to stop the entire streaming
>> application
>> >> > once spark.task.maxFailures is reached? Has anyone else seen this
>> behavior
>> >> > of a streaming application skipping over failed microbatches?
>> >> >
>> >> > Thanks,
>> >> > Sean
>> >> >
>> >> >
>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >> >>
>> >> >> So basically what I am saying is
>> >> >>
>> >> >> - increase poll.ms
>> >> >> - use a separate group id everywhere
>> >> >> - stop committing offsets under the covers
>> >> >>
>> >> >> That should eliminate all of those as possible causes, and then we can
>> >> >> see if there are still issues.
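>> >> >>
>> >> >> Concretely, something along these lines (group id scheme and names like
>> >> >> brokers / channel / sparkConf are just illustrative):
>> >> >>
>> >> >> val kafkaParams = Map[String, Object](
>> >> >>   "bootstrap.servers" -> brokers,
>> >> >>   "key.deserializer" -> classOf[StringDeserializer],
>> >> >>   "value.deserializer" -> classOf[StringDeserializer],
>> >> >>   // a separate group id per logically distinct rdd / stream
>> >> >>   "group.id" -> s"storage-group-$channel",
>> >> >>   // no commits under the covers; commit explicitly when you choose to
>> >> >>   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >> >>   "auto.offset.reset" -> "earliest")
>> >> >>
>> >> >> // and give the consumer more time to poll
>> >> >> sparkConf.set("spark.streaming.kafka.consumer.poll.ms", "10000")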
>> >> >>
>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>> >> >> consumer api should be usable with later brokers.  As long as you
>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
>> keep
>> >> >> using it.
>> >> >>
>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com>
>> wrote:
>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>> RDD
>> >> >>> uses a
>> >> >>> single distinct topic. For example, the group would be something
>> like
>> >> >>> "storage-group", and the topics would be "storage-channel1", and
>> >> >>> "storage-channel2". In each thread a KafkaConsumer is started,
>> >> >>> assigned the
>> >> >>> partitions assigned, and then commit offsets are called after the
>> RDD
>> >> >>> is
>> >> >>> processed. This should not interfere with the consumer group used
>> by
>> >> >>> the
>> >> >>> executors which would be "spark-executor-storage-group".
>> >> >>>
>> >> >>> In the streaming example there is a single topic ("client-events")
>> and
>> >> >>> group
>> >> >>> ("processing-group"). A single stream is created and offsets are
>> >> >>> manually
>> >> >>> updated from the executor after each partition is handled. This
>> was a
>> >> >>> challenge since Spark now requires one to assign or subscribe to a
>> >> >>> topic in
>> >> >>> order to even update the offsets. In 0.8.2.x you did not have to
>> worry
>> >> >>> about
>> >> >>> that. This approach limits your exposure to duplicate data, since
>> >> >>> idempotent records are not entirely possible in our scenario, at
>> >> >>> least not without a lot of re-running of logic to de-dup.
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> Ivan
>> >> >>>
>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org
>> >
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> So just to be clear, the answers to my questions are
>> >> >>>>
>> >> >>>> - you are not using different group ids, you're using the same
>> group
>> >> >>>> id everywhere
>> >> >>>>
>> >> >>>> - you are committing offsets manually
>> >> >>>>
>> >> >>>> Right?
>> >> >>>>
>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>> >> >>>> tune poll.ms upwards even higher.
>> >> >>>>
>> >> >>>> You must use different group ids for different rdds or streams.
>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
>> >> >>>> the
>> >> >>>> same group id, and the consumer cache is keyed by group id. Yes,
>> the
>> >> >>>> executor will tack "spark-executor-" on to the beginning, but if
>> you
>> >> >>>> give it the same base group id, it will be the same.  And the
>> driver
>> >> >>>> will use the group id you gave it, unmodified.
>> >> >>>>
>> >> >>>> Finally, I really can't help you if you're manually writing your
>> own
>> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>> >> >>>> duplicates that way doesn't really make sense, your system must be
>> >> >>>> able to handle duplicates if you're using kafka as an offsets
>> store,
>> >> >>>> it can't do transactional exactly once.
>> >> >>>>
>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:
>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
>> >> >>>>> the most error prone to polling timeouts and concurrent
>> >> >>>>> modification errors.
>> >> >>>>>
>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
>> >> >>>>> in parallel using the KafkaRDD directly. They all use the same
>> >> >>>>> consumer group ('storage-group'), but each has its own topic and
>> >> >>>>> each topic has 4 partitions. We routinely get timeout errors when
>> >> >>>>> polling for data. This occurs whether we process in parallel or
>> >> >>>>> sequentially.
>> >> >>>>>
>> >> >>>>> *Spark Kafka setting:*
>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>> >> >>>>>
>> >> >>>>> *Kafka Consumer Params:*
>> >> >>>>> metric.reporters = []
>> >> >>>>> metadata.max.age.ms = 300000
>> >> >>>>> partition.assignment.strategy =
>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>> >> >>>>> reconnect.backoff.ms = 50
>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> >> >>>>> max.partition.fetch.bytes = 1048576
>> >> >>>>> bootstrap.servers = [somemachine:31000]
>> >> >>>>> ssl.keystore.type = JKS
>> >> >>>>> enable.auto.commit = false
>> >> >>>>> sasl.mechanism = GSSAPI
>> >> >>>>> interceptor.classes = null
>> >> >>>>> exclude.internal.topics = true
>> >> >>>>> ssl.truststore.password = null
>> >> >>>>> client.id =
>> >> >>>>> ssl.endpoint.identification.algorithm = null
>> >> >>>>> max.poll.records = 1000
>> >> >>>>> check.crcs = true
>> >> >>>>> request.timeout.ms = 40000
>> >> >>>>> heartbeat.interval.ms = 3000
>> >> >>>>> auto.commit.interval.ms = 5000
>> >> >>>>> receive.buffer.bytes = 65536
>> >> >>>>> ssl.truststore.type = JKS
>> >> >>>>> ssl.truststore.location = null
>> >> >>>>> ssl.keystore.password = null
>> >> >>>>> fetch.min.bytes = 1
>> >> >>>>> send.buffer.bytes = 131072
>> >> >>>>> value.deserializer = class
>> >> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>> >> >>>>> group.id = storage-group
>> >> >>>>> retry.backoff.ms = 100
>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >> >>>>> sasl.kerberos.service.name = null
>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>> >> >>>>> ssl.key.password = null
>> >> >>>>> fetch.max.wait.ms = 500
>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>> >> >>>>> connections.max.idle.ms = 540000
>> >> >>>>> session.timeout.ms = 30000
>> >> >>>>> metrics.num.samples = 2
>> >> >>>>> key.deserializer = class
>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>> >> >>>>> ssl.protocol = TLS
>> >> >>>>> ssl.provider = null
>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >> >>>>> ssl.keystore.location = null
>> >> >>>>> ssl.cipher.suites = null
>> >> >>>>> security.protocol = PLAINTEXT
>> >> >>>>> ssl.keymanager.algorithm = SunX509
>> >> >>>>> metrics.sample.window.ms = 30000
>> >> >>>>> auto.offset.reset = earliest
>> >> >>>>>
>> >> >>>>> *Example usage with KafkaRDD:*
>> >> >>>>> val channels = Seq("channel1", "channel2")
>> >> >>>>>
>> >> >>>>> channels.toParArray.foreach { channel =>
>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>> >> >>>>>
>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>> >> >>>>>
>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>> >> >>>>>        kafkaParams.asJava,
>> >> >>>>>        offsetRanges,
>> >> >>>>>        PreferConsistent).toDS[V]
>> >> >>>>>
>> >> >>>>>  // Do some aggregations
>> >> >>>>>  ds.agg(...)
>> >> >>>>>  // Save the data
>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>> >> >>>>>  // Save offsets using a KafkaConsumer
>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>> >> >>>>>  consumer.close()
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> *Example usage with Kafka Stream:*
>> >> >>>>> This creates a stream and processes events in each partition. At the
>> >> >>>>> end of processing for each partition, we update the offsets for that
>> >> >>>>> partition. This is challenging to do, but is better than calling
>> >> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
>> >> >>>>> has been processed. This method minimizes duplicates in an exactly-once
>> >> >>>>> environment. Since the executors use their own custom group
>> >> >>>>> "spark-executor-processor-group" and the commit is buried in private
>> >> >>>>> functions, we are unable to use the executor's cached consumer to
>> >> >>>>> update the offsets. This requires us to go through multiple steps to
>> >> >>>>> update the Kafka offsets accordingly.
>> >> >>>>>
>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>> >> >>>>>
>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>> >> >>>>>      PreferConsistent,
>> >> >>>>>      Subscribe[K, V](Seq("my-topic").asJavaCollection,
>> >> >>>>>        kafkaParams,
>> >> >>>>>        offsetRanges))
>> >> >>>>>
>> >> >>>>> stream.foreachRDD { rdd =>
>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> >> >>>>>
>> >> >>>>>    // Transform our data
>> >> >>>>>   rdd.foreachPartition { events =>
>> >> >>>>>       // Establish a consumer in the executor so we can update
>> >> >>>>>       // offsets after each partition.
>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to
>> >> >>>>>       // help get/set offsets
>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>> >> >>>>>       // do something with our data
>> >> >>>>>
>> >> >>>>>       // Write the offsets that were updated in this partition
>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>> >> >>>>>   }
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>> --
>> >> >>>>> View this message in context:
>> >> >>>>>
>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
>> >> >>>>> Nabble.com.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>
>> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >
>> >
>>
>
>
