Hi,

Our Spark Streaming job was working as expected: it processed the expected number of events per batch. For some reasons, we then enabled compaction on the Kafka topic and restarted the job. After the restart, it failed with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 16 in stage 2.0 failed 4 times, most recent failure: Lost task 16.3 in stage 2.0 (TID 231, 10.34.29.38, executor 4): java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-pc-nfr-loop-31-march-2020-4 demandIngestion.SLTarget-39 even after seeking to offset 106847 got offset 199066 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)

So I added spark.streaming.kafka.allowNonConsecutiveOffsets: true to the Spark config and changed the consumer group name so that the job consumes the topic from the beginning. Now the problem is that it reads only one message per partition per batch: if a topic has 50 partitions, it reads 50 messages per batch (the batch duration is 5 seconds). The topic holds 1M records and the consumer has a huge lag.

Driver log showing one message fetched per partition per batch (the offset of partition demandIngestion.SLTarget-45 advances by one every 5 seconds):

20/03/31 18:25:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211951.
20/03/31 18:26:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211952.
20/03/31 18:26:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211953.
20/03/31 18:26:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211954.
20/03/31 18:26:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211955.
20/03/31 18:26:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211956.
20/03/31 18:26:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211957.
20/03/31 18:26:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211958.
20/03/31 18:26:35 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211959.
20/03/31 18:26:40 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211960.
20/03/31 18:26:45 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211961.
20/03/31 18:26:50 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211962.
20/03/31 18:26:55 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211963.
20/03/31 18:27:00 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211964.
20/03/31 18:27:05 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211965.
20/03/31 18:27:10 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211966.
20/03/31 18:27:15 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211967.
20/03/31 18:27:20 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211968.
20/03/31 18:27:25 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211969.
20/03/31 18:27:30 INFO Fetcher: [groupId=pc-nfr-loop-31-march-2020-4] Resetting offset for partition demandIngestion.SLTarget-45 to offset 211970.

Spark config (batch.duration: 5, using Spark Streaming):

spark.shuffle.service.enabled: "true"
spark.streaming.backpressure.enabled: "true"
spark.streaming.concurrentJobs: "1"
spark.executor.extraJavaOptions: "-XX:+UseConcMarkSweepGC"
spark.streaming.backpressure.pid.minRate: 1500
spark.streaming.backpressure.initialRate: 100
spark.streaming.kafka.allowNonConsecutiveOffsets: true

Is there an issue in my configuration, or is something special required for a compacted Kafka topic that I'm missing?

Regards,
Hrishi
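To put the observed 50 records per batch in context, here is a rough back-of-the-envelope model of the per-batch cap that backpressure would be expected to allow with the settings listed in the Spark config. It is a simplified Python sketch, not Spark's code, and it rests on several assumptions: Spark 2.4-era logic in the direct Kafka stream (the estimated total rate is `initialRate` before the first estimate and never below `pid.minRate` afterwards, is split across partitions in proportion to their lag, is multiplied by the batch duration, and is floored at `spark.streaming.kafka.minRatePerPartition`, default 1); `spark.streaming.kafka.maxRatePerPartition` unset, since it is not in the listed config; and the 1M-record lag spread evenly over 50 partitions. The function name `max_messages_per_partition` is hypothetical, only loosely named after Spark's internal method.

```python
from typing import Dict


def max_messages_per_partition(
    total_rate_per_sec: float,
    lag_per_partition: Dict[int, int],
    batch_secs: float,
    min_per_partition: int = 1,
) -> Dict[int, int]:
    """Rough model of the per-partition record cap backpressure sets for one batch.

    Assumption: Spark 2.4-style lag-weighted split, maxRatePerPartition unset.
    """
    total_lag = sum(lag_per_partition.values())
    if total_lag == 0:
        # No partition is behind, so there is nothing to read.
        return {partition: 0 for partition in lag_per_partition}
    caps = {}
    for partition, lag in lag_per_partition.items():
        # Share of the total rate proportional to this partition's lag.
        partition_rate = lag * total_rate_per_sec / total_lag
        # Records/sec -> records per batch, never below the per-partition floor.
        caps[partition] = max(int(batch_secs * partition_rate), min_per_partition)
    return caps


BATCH_SECS = 5  # batch.duration from the post
PARTITIONS = 50  # partition count from the post
TOTAL_LAG = 1_000_000  # assumption: ~1M lagging records spread evenly

lags = {p: TOTAL_LAG // PARTITIONS for p in range(PARTITIONS)}

for label, rate in [("initialRate", 100), ("pid.minRate", 1500)]:
    caps = max_messages_per_partition(rate, lags, BATCH_SECS)
    print(label, caps[0], sum(caps.values()))
# initialRate 10 500
# pid.minRate 150 7500
```

Under these assumptions the cap would be about 10 records per partition (500 per batch) at the initial rate and at least 150 per partition (7,500 per batch) once the PID floor applies, compared with the 1 per partition (50 per batch) seen in the driver log.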