This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b702f63bed27 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.1
b702f63bed27 is described below

commit b702f63bed27b73bae748e232236da2f2ed19dfb
Author: dengziming <dengziming1...@gmail.com>
AuthorDate: Sat Dec 16 14:17:39 2023 -0800

    [SPARK-45502][BUILD] Upgrade Kafka to 3.6.1
    
    ### What changes were proposed in this pull request?
    Upgrade Apache Kafka from 3.4.1 to 3.6.1
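    
    The upgrade also adapts the Kafka test helpers to internal API moves between Kafka 3.4 and 3.6, visible in the diff below: `kafka.api.Request.isValidBrokerId` is replaced by `org.apache.kafka.common.requests.FetchRequest.isValidBrokerId`; `LogConfig`, `CleanerConfig`, `ProducerStateManagerConfig` and `LogDirFailureChannel` now come from `org.apache.kafka.storage.internals.log`; and `kafka.utils.Scheduler` is superseded by `org.apache.kafka.server.util.Scheduler`.
    
    A minimal sketch (illustrative only, not part of the patch; the object and method names are made up) of building a compacted log config with the relocated classes, mirroring the `KafkaRDDSuite` change:
    
    ```scala
    import java.util.Properties
    
    import org.apache.kafka.common.config.TopicConfig
    import org.apache.kafka.storage.internals.log.LogConfig
    
    // Hypothetical helper: shows the Kafka 3.6-style construction used in the test changes.
    object CompactedLogConfigExample {
      def compactedLogConfig(): LogConfig = {
        val logProps = new Properties()
        // Topic-level keys now come from TopicConfig rather than kafka.log.LogConfig.
        logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
        logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
        // LogConfig is constructed directly from the Properties in Kafka 3.6.
        new LogConfig(logProps)
      }
    }
    ```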
    
    ### Why are the changes needed?
    
    - https://downloads.apache.org/kafka/3.6.1/RELEASE_NOTES.html
    - https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
    - https://downloads.apache.org/kafka/3.5.0/RELEASE_NOTES.html
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    GitHub CI.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43348 from dengziming/kafka-3.6.0.
    
    Authored-by: dengziming <dengziming1993@gmail.com>
    Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
    
    Closes #44312 from dengziming/kafka-3.6.1.
    
    Authored-by: dengziming <dengziming1...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  6 ++++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  4 ++--
 .../spark/streaming/kafka010/KafkaRDDSuite.scala   | 16 ++++++++------
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  4 ++--
 .../streaming/kafka010/mocks/MockScheduler.scala   | 25 +++++++++++-----------
 pom.xml                                            |  2 +-
 6 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 02e4e909734a..5b4567aa2881 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -154,7 +154,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
       }
 
       val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
-      logInfo(s"Added data, expected offset $offset")
+      logInfo(s"Added data to topic: $topic, expected offset: $offset")
       (kafkaSource, offset)
     }
 
@@ -2691,7 +2691,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
     start + Random.nextInt(start + end - 1)
   }
 
-  test("stress test with multiple topics and partitions")  {
+  override val brokerProps = Map("auto.create.topics.enable" -> "false")
+
+  test("stress test with multiple topics and partitions") {
     topics.foreach { topic =>
       testUtils.createTopic(topic, partitions = nextInt(1, 6))
       testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 1fa1dda9faf2..64e54ad63bdc 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -28,7 +28,6 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._
 
 import com.google.common.io.Files
-import kafka.api.Request
 import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.zk.KafkaZkClient
@@ -40,6 +39,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
@@ -603,7 +603,7 @@ class KafkaTestUtils(
         .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
         zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-          Request.isValidBrokerId(partitionState.leader) &&
+          FetchRequest.isValidBrokerId(partitionState.leader) &&
           !partitionState.replicas.isEmpty
 
       case _ =>
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 7994f46992c9..a97ee71ef4fe 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -24,12 +24,14 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.log.{LogCleaner, UnifiedLog}
+import kafka.server.BrokerTopicStats
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark._
@@ -90,13 +92,13 @@ class KafkaRDDSuite extends SparkFunSuite {
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
+    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
     val logDirFailureChannel = new LogDirFailureChannel(1)
     val topicPartition = new TopicPartition(topic, partition)
     val producerIdExpirationMs = Int.MaxValue
-    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
-    val logConfig = LogConfig(logProps)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
+    val logConfig = new LogConfig(logProps)
     val log = UnifiedLog(
       dir,
       logConfig,
@@ -120,7 +122,7 @@ class KafkaRDDSuite extends SparkFunSuite {
     log.roll()
     logs.put(topicPartition, log)
 
-    val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
+    val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
     cleaner.startup()
     cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)
 
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 46a6fbcf2c36..3916a84f1107 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -27,12 +27,12 @@ import scala.annotation.tailrec
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
-import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{Time => KTime}
 import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
         val leader = partitionState.leader
         val isr = partitionState.isr
         zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-          Request.isValidBrokerId(leader) && !isr.isEmpty
+          FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
       case _ =>
         false
     }
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
index c0724909bc35..1b7e92a03604 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks
 
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
-import kafka.utils.Scheduler
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.util.Scheduler
 import org.jmock.lib.concurrent.DeterministicScheduler
 
 /**
@@ -42,8 +42,6 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
 
   val scheduler = new DeterministicScheduler()
 
-  def isStarted: Boolean = true
-
   def startup(): Unit = {}
 
   def shutdown(): Unit = synchronized {
@@ -56,17 +54,18 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
 
   def schedule(
       name: String,
-      fun: () => Unit,
-      delay: Long = 0,
-      period: Long = -1,
-      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
-    val runnable = new Runnable {
-      override def run(): Unit = fun()
-    }
-    if (period >= 0) {
-      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
+      task: Runnable,
+      delayMs: Long = 0,
+      periodMs: Long = -1): ScheduledFuture[_] = synchronized {
+    if (periodMs >= 0) {
+      scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
     } else {
-      scheduler.schedule(runnable, delay, unit)
+      scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
     }
   }
+
+  override def resizeThreadPool(i: Int): Unit = {
+
+  }
+
 }
diff --git a/pom.xml b/pom.xml
index c97c74ce5707..b95f72f8a5c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.4.1</kafka.version>
+    <kafka.version>3.6.1</kafka.version>
     <!-- After 10.17.1.0, the minimum required version is JDK19 -->
     <derby.version>10.16.1.1</derby.version>
     <parquet.version>1.13.1</parquet.version>

