This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b702f63bed27 [SPARK-45502][BUILD] Upgrade Kafka to 3.6.1
b702f63bed27 is described below

commit b702f63bed27b73bae748e232236da2f2ed19dfb
Author: dengziming <dengziming1...@gmail.com>
AuthorDate: Sat Dec 16 14:17:39 2023 -0800

    [SPARK-45502][BUILD] Upgrade Kafka to 3.6.1

    ### What changes were proposed in this pull request?
    Upgrade Apache Kafka from 3.4.1 to 3.6.1.

    ### Why are the changes needed?
    To pick up the fixes and improvements in the 3.5.x and 3.6.x release lines:
    - https://downloads.apache.org/kafka/3.6.1/RELEASE_NOTES.html
    - https://downloads.apache.org/kafka/3.6.0/RELEASE_NOTES.html
    - https://downloads.apache.org/kafka/3.5.0/RELEASE_NOTES.html

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    GitHub CI.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #43348 from dengziming/kafka-3.6.0.

    Authored-by: dengziming <dengziming1993@gmail.com>
    Signed-off-by: Dongjoon Hyun <dhyun@apple.com>

    Closes #44312 from dengziming/kafka-3.6.1.

    Authored-by: dengziming <dengziming1...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  6 ++++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  4 ++--
 .../spark/streaming/kafka010/KafkaRDDSuite.scala   | 16 ++++++++------
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  4 ++--
 .../streaming/kafka010/mocks/MockScheduler.scala   | 25 +++++++++++-----------
 pom.xml                                            |  2 +-
 6 files changed, 30 insertions(+), 27 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 02e4e909734a..5b4567aa2881 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -154,7 +154,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K
      }

      val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
-      logInfo(s"Added data, expected offset $offset")
+      logInfo(s"Added data to topic: $topic, expected offset: $offset")
      (kafkaSource, offset)
    }

@@ -2691,7 +2691,9 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
     start + Random.nextInt(start + end - 1)
   }

-  test("stress test with multiple topics and partitions") {
+  override val brokerProps = Map("auto.create.topics.enable" -> "false")
+
+  test("stress test with multiple topics and partitions") {
     topics.foreach { topic =>
       testUtils.createTopic(topic, partitions = nextInt(1, 6))
       testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
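A note on the KafkaSourceStressSuite hunk above: with auto.create.topics.enable set to false, the embedded broker no longer creates topics on first use, so the suite must create each topic explicitly (it does so through testUtils.createTopic, as shown). As a hedged, standalone sketch of the same requirement against a plain broker using the public Admin client; the broker address and topic name here are made up and are not part of this patch:

    import java.util.Properties
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

    // Hypothetical broker address; the suite talks to its embedded test broker instead.
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = Admin.create(props)
    try {
      // With auto.create.topics.enable=false, a topic must exist before producing to it.
      admin.createTopics(Seq(new NewTopic("stress1", 4, 1.toShort)).asJava).all().get()
    } finally {
      admin.close()
    }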
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 1fa1dda9faf2..64e54ad63bdc 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -28,7 +28,6 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._

 import com.google.common.io.Files
-import kafka.api.Request
 import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.zk.KafkaZkClient
@@ -40,6 +39,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
@@ -603,7 +603,7 @@ class KafkaTestUtils(
       .getPartitionInfo(topic, partition) match {
         case Some(partitionState) =>
           zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-            Request.isValidBrokerId(partitionState.leader) &&
+            FetchRequest.isValidBrokerId(partitionState.leader) &&
             !partitionState.replicas.isEmpty

         case _ =>
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 7994f46992c9..a97ee71ef4fe 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -24,12 +24,14 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Random

-import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
-import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
+import kafka.log.{LogCleaner, UnifiedLog}
+import kafka.server.BrokerTopicStats
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

 import org.apache.spark._
@@ -90,13 +92,13 @@ class KafkaRDDSuite extends SparkFunSuite {
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
+    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
     val logDirFailureChannel = new LogDirFailureChannel(1)
     val topicPartition = new TopicPartition(topic, partition)
     val producerIdExpirationMs = Int.MaxValue
-    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
-    val logConfig = LogConfig(logProps)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
+    val logConfig = new LogConfig(logProps)
     val log = UnifiedLog(
       dir,
       logConfig,
@@ -120,7 +122,7 @@ class KafkaRDDSuite extends SparkFunSuite {
     log.roll()
     logs.put(topicPartition, log)

-    val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
+    val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
     cleaner.startup()
     cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)

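The KafkaRDDSuite churn above is almost entirely the Kafka 3.5/3.6 relocation of log-layer configuration into public classes: topic-level keys moved to org.apache.kafka.common.config.TopicConfig constants, while LogConfig, CleanerConfig, LogDirFailureChannel and ProducerStateManagerConfig now live in org.apache.kafka.storage.internals.log as plain Java classes. Condensed from the hunk above into a standalone sketch (assuming only that the Kafka 3.6.1 client and storage jars are on the classpath):

    import java.util.Properties
    import org.apache.kafka.common.config.TopicConfig
    import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}

    // Topic-level settings now come from public TopicConfig constants.
    val logProps = new Properties()
    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))

    // LogConfig is constructed directly rather than via a Scala apply(), and
    // ProducerStateManagerConfig takes a second, transaction-verification flag.
    val logConfig = new LogConfig(logProps)
    val producerStateManagerConfig = new ProducerStateManagerConfig(Int.MaxValue, false)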
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 46a6fbcf2c36..3916a84f1107 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -27,12 +27,12 @@ import scala.annotation.tailrec
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal

-import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{Time => KTime}
 import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
           val leader = partitionState.leader
           val isr = partitionState.isr
           zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-            Request.isValidBrokerId(leader) && !isr.isEmpty
+            FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
         case _ =>
           false
       }
diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
index c0724909bc35..1b7e92a03604 100644
--- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
+++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks

 import java.util.concurrent.{ScheduledFuture, TimeUnit}

-import kafka.utils.Scheduler
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.util.Scheduler
 import org.jmock.lib.concurrent.DeterministicScheduler

 /**
@@ -42,8 +42,6 @@

   val scheduler = new DeterministicScheduler()

-  def isStarted: Boolean = true
-
   def startup(): Unit = {}

   def shutdown(): Unit = synchronized {
@@ -56,17 +54,18 @@

   def schedule(
       name: String,
-      fun: () => Unit,
-      delay: Long = 0,
-      period: Long = -1,
-      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
-    val runnable = new Runnable {
-      override def run(): Unit = fun()
-    }
-    if (period >= 0) {
-      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
+      task: Runnable,
+      delayMs: Long = 0,
+      periodMs: Long = -1): ScheduledFuture[_] = synchronized {
+    if (periodMs >= 0) {
+      scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
     } else {
-      scheduler.schedule(runnable, delay, unit)
+      scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
     }
   }
+
+  override def resizeThreadPool(i: Int): Unit = {
+
+  }
+
 }
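MockScheduler now implements org.apache.kafka.server.util.Scheduler, whose schedule method takes a plain Runnable and millisecond-typed delay/period arguments instead of a () => Unit thunk plus a TimeUnit. A hypothetical usage sketch, not part of this patch (the demo object and task are invented, and since MockScheduler is private[kafka010] the caller has to sit in that package):

    package org.apache.spark.streaming.kafka010

    import java.util.concurrent.TimeUnit

    import org.apache.kafka.common.utils.SystemTime

    import org.apache.spark.streaming.kafka010.mocks.MockScheduler

    object MockSchedulerDemo {
      def main(args: Array[String]): Unit = {
        val mock = new MockScheduler(new SystemTime())
        // A Scala lambda converts to Runnable via SAM; periodMs < 0 means run once.
        mock.schedule("demo-task", () => println("task ran"), delayMs = 100, periodMs = -1)
        // Nothing fires until the DeterministicScheduler's virtual clock advances.
        mock.scheduler.tick(100, TimeUnit.MILLISECONDS) // prints "task ran"
      }
    }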
diff --git a/pom.xml b/pom.xml
index c97c74ce5707..b95f72f8a5c6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -137,7 +137,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.4.1</kafka.version>
+    <kafka.version>3.6.1</kafka.version>
     <!-- After 10.17.1.0, the minimum required version is JDK19 -->
     <derby.version>10.16.1.1</derby.version>
     <parquet.version>1.13.1</parquet.version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org