This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d69df596e512 [SPARK-47738][BUILD] Upgrade Kafka to 3.7.0
d69df596e512 is described below

commit d69df596e5124ef9ea744549b21c28c9d1d00704
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Sat Apr 6 10:48:21 2024 -0700

    [SPARK-47738][BUILD] Upgrade Kafka to 3.7.0
    
    ### What changes were proposed in this pull request?
    This PR aims to upgrade `Kafka` from `3.6.1` to `3.7.0`.
    
    ### Why are the changes needed?
    To pick up the fixes and improvements in the Kafka 3.7.0 release; see the release notes: https://downloads.apache.org/kafka/3.7.0/RELEASE_NOTES.html
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    Pass GA (GitHub Actions).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45893 from panbingkun/SPARK-47738.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala    | 3 +--
 .../scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala   | 3 +--
 pom.xml                                                                | 2 +-
 3 files changed, 3 insertions(+), 5 deletions(-)
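
As context for the Scala changes below: the two call sites that subscribe by topic pattern stop passing Kafka's internal `NoOpConsumerRebalanceListener` and use the single-argument `KafkaConsumer.subscribe(Pattern)` overload instead, which registers no user rebalance callback and is therefore behaviorally equivalent. A minimal standalone sketch of that overload follows (the bootstrap server, group id, and topic pattern here are illustrative placeholders, not values from this patch):

```scala
import java.util.Properties
import java.util.regex.Pattern

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object PatternSubscribeExample {
  def main(args: Array[String]): Unit = {
    // Placeholder connection settings, for illustration only.
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "example-group")
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    // Single-argument overload: no explicit ConsumerRebalanceListener is
    // supplied, matching the behavior previously obtained by passing the
    // internal NoOpConsumerRebalanceListener.
    consumer.subscribe(Pattern.compile("topic-.*"))
    consumer.close()
  }
}
```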

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index 10d5062848b5..ab41e53d8ffb 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -24,7 +24,6 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.internal.Logging
@@ -127,7 +126,7 @@ private[kafka010] case class SubscribePatternStrategy(topicPattern: String)
       kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]] = {
     val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
     val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](updatedKafkaParams)
-    consumer.subscribe(ju.regex.Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener())
+    consumer.subscribe(ju.regex.Pattern.compile(topicPattern))
     consumer
   }
 
diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index a0b0e92666eb..693ddd31d9a8 100644
--- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -23,7 +23,6 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.internal.Logging
@@ -147,7 +146,7 @@ private case class SubscribePattern[K, V](
   def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
     val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
     val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
-    consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
+    consumer.subscribe(pattern)
     val toSeek = if (currentOffsets.isEmpty) {
       offsets
     } else {
diff --git a/pom.xml b/pom.xml
index 5fe86dd80b2a..9b51548e1c0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.6.1</kafka.version>
+    <kafka.version>3.7.0</kafka.version>
     <!-- After 10.17.1.0, the minimum required version is JDK19 -->
     <derby.version>10.16.1.1</derby.version>
     <parquet.version>1.13.1</parquet.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
