It seems I found the issue. The actual problem is related to back pressure.
It appears when I add either of these configs,
*spark.streaming.kafka.maxRatePerPartition* or
*spark.streaming.backpressure.initialRate* (the value of each is 100).
After that, the job starts consuming one message per partition per batch.
Not sure why it's happening.
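
For scale, here is a rough back-of-the-envelope sketch in Python of how far the observed rate is from what a limit of 100 should allow. It assumes that maxRatePerPartition is a cap in records per second per partition, and it takes the 50 partitions, the 5 second batch duration and the roughly 1M records from the original message below. The function names are made up for illustration. It ignores newly arriving records, and back pressure may legitimately pick a rate below the cap, so treat the numbers as bounds rather than predictions.

```python
# Figures from the thread: 50 partitions, 5 s batches, about 1M records.
PARTITIONS = 50
BATCH_SECONDS = 5
TOTAL_RECORDS = 1_000_000
MAX_RATE_PER_PARTITION = 100  # records per second per partition (the configured value)


def cap_per_batch(rate_per_partition, batch_seconds, partitions):
    """Upper bound on records per batch implied by a per-partition rate cap."""
    return rate_per_partition * batch_seconds * partitions


def hours_to_drain(total_records, records_per_batch, batch_seconds):
    """Hours needed to read a fixed backlog at a constant per-batch rate."""
    batches = -(-total_records // records_per_batch)  # ceiling division
    return batches * batch_seconds / 3600


# What the cap would allow versus what the driver log shows.
expected_cap = cap_per_batch(MAX_RATE_PER_PARTITION, BATCH_SECONDS, PARTITIONS)
observed = 1 * PARTITIONS  # one message per partition per batch

print("cap per batch:     ", expected_cap)
print("observed per batch:", observed)
print("hours at cap:      ", round(hours_to_drain(TOTAL_RECORDS, expected_cap, BATCH_SECONDS), 2))
print("hours as observed: ", round(hours_to_drain(TOTAL_RECORDS, observed, BATCH_SECONDS), 2))
```

So the configured limit alone would still allow up to 25,000 records per batch, while at 50 records per batch the existing backlog would take more than a day to read.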


On Thu, Apr 2, 2020 at 8:48 AM Waleed Fateem <waleed.fat...@gmail.com>
wrote:

> Well this is interesting. Not sure if this is the expected behavior. The
> log messages you have referenced are actually printed out by the Kafka
> Consumer itself (org.apache.kafka.clients.consumer.internals.Fetcher).
>
> That log message belongs to a new feature added starting with Kafka 1.1:
> https://issues.apache.org/jira/browse/KAFKA-6397
>
> I'm assuming then that you're using Spark 2.4?
>
> From Kafka's perspective, when you do a describe on your
> demandIngestion.SLTarget topic, does that look okay? Are all partitions
> available with a valid leader?
>
> The other thing I'm curious about, after you
> enabled spark.streaming.kafka.allowNonConsecutiveOffsets, did you try going
> back to the older group.id and do you see the same behavior? Was there a
> reason you chose to start reading again from the beginning by using a new
> consumer group rather than sticking to the same consumer group?
>
> In your application, are you manually committing offsets to Kafka?
>
> Regards,
>
> Waleed
>
> On Wed, Apr 1, 2020 at 1:31 AM Hrishikesh Mishra <sd.hri...@gmail.com>
> wrote:
>
>> Hi
>>
>> Our Spark streaming job was working fine, processing the expected number
>> of events per batch. But for some reason, we enabled compaction on the
>> Kafka topic and restarted the job. After the restart it started failing
>> for the reason below:
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in
>> stage 2.0 (TID 231, 10.34.29.38, executor 4):
>> java.lang.IllegalArgumentException: requirement failed: Got wrong record
>> for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39
>> even after seeking to offset 106847 got offset 199066 instead. If this is a
>> compacted topic, consider enabling
>> spark.streaming.kafka.allowNonConsecutiveOffsets
>>   at scala.Predef$.require(Predef.scala:224)
>>   at
>> org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
>>
>>
>>
>> So I added spark.streaming.kafka.allowNonConsecutiveOffsets: true to the
>> Spark config and changed the group name to consume from the beginning. Now
>> the problem is that it is reading only one message per partition. So if a
>> topic has 50 partitions, it reads 50 messages per batch (batch
>> duration is 5 sec).
>>
>> The topic has 1M records and the consumer has a huge lag.
>>
>>
>> Driver log, which shows 1 message fetched per partition per batch:
>>
>> 20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
>> 20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
>> 20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
>> 20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
>> 20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
>> 20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
>> 20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
>> 20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
>> 20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
>> 20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
>> 20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
>> 20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
>> 20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
>> 20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
>> 20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
>> 20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
>> 20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
>> 20/03/31 18:27:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211968.
>> 20/03/31 18:27:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211969.
>> 20/03/31 18:27:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4]
>> Resetting offset for partition demandIngestion.SLTarget-45 to offset 211970.
>>
>>
>>
>> Spark config (batch.duration: 5, using Spark Streaming):
>>
>>   spark.shuffle.service.enabled: "true"
>>
>>   spark.streaming.backpressure.enabled: "true"
>>
>>   spark.streaming.concurrentJobs: "1"
>>
>>   spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
>>
>>   spark.streaming.backpressure.pid.minRate: 1500
>>
>>   spark.streaming.backpressure.initialRate: 100
>>
>>   spark.streaming.kafka.allowNonConsecutiveOffsets: true
>>
>>
>>
>> Is there an issue with my configuration, or is something special required
>> for a compacted Kafka topic that I'm missing?
>>
>>
>>
>>
>> Regards
>> Hrishi
>>
>>
