Madhav Madhu created KAFKA-14219:
------------------------------------

             Summary: Kafka messages are not read in certain bytes or message count in batches when read from Spark
                 Key: KAFKA-14219
                 URL: https://issues.apache.org/jira/browse/KAFKA-14219
             Project: Kafka
          Issue Type: Bug
          Components: config, consumer
    Affects Versions: 3.2.0
            Reporter: Madhav Madhu
The Spark Kafka consumer does not limit the messages it reads per batch to a certain byte size or record count. I have tried a few approaches as described in the Kafka docs, but with no success. Here is the link to the Stack Overflow question where I asked the same thing and received no response, which is why I think this may be a bug. The same configuration works fine when the consumer is plain Java code (see the standalone consumer sketch at the end of this description).

https://stackoverflow.com/questions/73398533/spark-streaming-context-kafka-consumer-read-messages-of-a-certain-byte-size-in

Here is the consumer code which fetches data from Kafka:
{code:java}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import sparkSession.implicits._

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test",
  "fetch.max.bytes" -> "65536",
  "max.partition.fetch.bytes" -> "8192",
  "max.poll.records" -> "100",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
  "sasl.mechanism" -> "PLAIN",
  "security.protocol" -> "SASL_PLAINTEXT"
)

val topics = Array("test.topic")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // Log the offset range of every partition in this micro-batch.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  println(offsetRanges.foreach(a =>
    println(a.topic + ":" + a.partition + ":" + a.fromOffset + ":" + a.untilOffset + ":" + a.count())))

  // `columns` holds the output column names and is defined elsewhere in the application.
  val df = rdd.map(a => a.value().split(",")).toDF()
  val selectCols = columns.indices.map(i => $"value"(i))
  var newDF = df.select(selectCols: _*).toDF(columns: _*)

  // Some business operations here, then write back to Kafka.
  newDF.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "topic.ouput")
    .option("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .save()

  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  sparkSession.catalog.clearCache()
}

streamingContext.start()
streamingContext.awaitTermination()
{code}

Output:
{code:java}
test.topic:6:1345075:4163058:2817983
test.topic:0:1339456:4144190:2804734
test.topic:3:1354266:4189336:2835070
test.topic:7:1353542:4186148:2832606
test.topic:5:1355140:4189071:2833931
test.topic:2:1351162:4173375:2822213
test.topic:1:1352801:4184073:2831272
test.topic:4:1348558:4166749:2818191
()
test.topic:6:4163058:4163058:0
test.topic:0:4144190:4144190:0
test.topic:3:4189336:4189336:0
test.topic:7:4186148:4186148:0
test.topic:5:4189071:4189071:0
test.topic:2:4173375:4173375:0
test.topic:1:4184073:4184073:0
test.topic:4:4166749:4166749:0
{code}

I tried the following options:

Option 1: topic partitions: 8, streaming context batch interval: 1 sec
"fetch.max.bytes" -> "65536"            // 64 KB
"max.partition.fetch.bytes" -> "8192"   // 8 KB
"max.poll.records" -> "100"
DataFrame count read from Kafka in the very first batch: 1200000

Option 2: topic partitions: 1, streaming context batch interval: 1 sec
"fetch.max.bytes" -> "65536"
"max.partition.fetch.bytes" -> "8192"
"max.poll.records" -> "100"
Kafka lag: 126360469
DataFrame count read from Kafka in the very first batch: 126360469

In both cases the very first micro-batch reads far more data than the configured fetch.max.bytes / max.partition.fetch.bytes / max.poll.records limits should allow.
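For comparison, the plain (non-Spark) consumer mentioned above looks roughly like the sketch below. This is illustrative only, not the exact code I ran: the object name is arbitrary, and the broker, topic, group id and credentials are the same placeholders as in the Spark job. With these settings each poll() is capped at max.poll.records records and the fetch byte limits are honored.

{code:java}
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

object PlainConsumerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // Same batching limits as in the Spark job above.
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "65536")           // 64 KB per fetch
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "8192")  // 8 KB per partition
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")            // at most 100 records per poll()
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.put("security.protocol", "SASL_PLAINTEXT")
    props.put("sasl.mechanism", "PLAIN")
    props.put("sasl.jaas.config",
      "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test.topic"))

    while (true) {
      // Each poll returns at most 100 records / ~64 KB here, unlike the Spark batches above.
      val records = consumer.poll(Duration.ofSeconds(1))
      println(s"polled ${records.count()} records")
      records.forEach(r => println(r.value()))
      consumer.commitSync()
    }
  }
}
{code}

The expectation is that the Spark direct stream would honor the same limits per micro-batch instead of pulling millions of records in the first batch as shown in Option 1 and Option 2.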