viirya commented on code in PR #52729:
URL: https://github.com/apache/spark/pull/52729#discussion_r2482116954
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -235,7 +345,30 @@ private[kafka010] class KafkaMicroBatchStream(
override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String,
String] = {
- KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
+ var rtmFetchLatestOffsetsTimeMs = Option.empty[Long]
+ val reCalculatedLatestPartitionOffsets =
+ if (inRealTimeMode) {
+ if (!latestConsumedOffset.isPresent) {
+ // this means a batch has no end offsets, which should not happen
+ None
+ } else {
+ Some {
+ val startTime = System.currentTimeMillis()
+ val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(
+
Some(latestConsumedOffset.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets))
+ val endTime = System.currentTimeMillis()
+ rtmFetchLatestOffsetsTimeMs = Some(endTime - startTime)
Review Comment:
Hmm, I'm not sure if I miss something. `rtmFetchLatestOffsetsTimeMs` is
assigned here, but it is not used anymore?
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -235,7 +345,30 @@ private[kafka010] class KafkaMicroBatchStream(
override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
override def metrics(latestConsumedOffset: Optional[Offset]): ju.Map[String,
String] = {
- KafkaMicroBatchStream.metrics(latestConsumedOffset, latestPartitionOffsets)
+ var rtmFetchLatestOffsetsTimeMs = Option.empty[Long]
+ val reCalculatedLatestPartitionOffsets =
+ if (inRealTimeMode) {
+ if (!latestConsumedOffset.isPresent) {
+ // this means a batch has no end offsets, which should not happen
+ None
+ } else {
+ Some {
+ val startTime = System.currentTimeMillis()
+ val latestOffsets = kafkaOffsetReader.fetchLatestOffsets(
+
Some(latestConsumedOffset.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets))
+ val endTime = System.currentTimeMillis()
+ rtmFetchLatestOffsetsTimeMs = Some(endTime - startTime)
+ latestOffsets
+ }
+ }
+ } else {
+ // If we are in micro-batch mode, we need to get the latest partition
offsets at the
+ // start of the batch and recalculate the latest offsets at the end
for backlog
+ // estimation.
+
Some(kafkaOffsetReader.fetchLatestOffsets(Some(latestPartitionOffsets)))
Review Comment:
This changes original behavior? Previously it just uses
`latestPartitionOffsets` without fetching latest offsets again.
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala:
##########
@@ -272,6 +292,82 @@ private[kafka010] class KafkaDataConsumer(
// Starting timestamp when the consumer is created.
private var startTimestampNano: Long = System.nanoTime()
+ /**
+ * Get an iterator that can return the next entry. It is used exclusively
for real-time
+ * mode.
+ *
+ * It is called by KafkaBatchPartitionReader.nextWithTimeout(). Unlike
get(), there is no
+ * out-of-bound check in this function. Since there is no endOffset given,
we assume anything
+ * record is valid to return as long as it is at or after `offset`.
+ *
+ * @param startOffsets, the starting positions to read from, inclusive.
+ */
+ def getIterator(offset: Long): KafkaDataConsumerIterator = {
Review Comment:
The param doc is `startOffsets` instead of `offset`.
```suggestion
def getIterator(startOffsets: Long): KafkaDataConsumerIterator = {
```
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRealTimeIntegrationSuite.scala:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.nio.file.Files
+import java.util.Properties
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kafka.clients.producer.{KafkaProducer, Producer,
ProducerRecord}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkContext, ThreadAudit}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.{OutputMode, ResultsCollector,
StreamingQuery, StreamRealTimeModeE2ESuiteBase, StreamRealTimeModeSuiteBase}
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class KafkaRealTimeModeE2ESuite extends KafkaSourceTest with
StreamRealTimeModeE2ESuiteBase {
+
+ override protected val defaultTrigger: RealTimeTrigger =
RealTimeTrigger.apply("5 seconds")
+
+ override protected def createSparkSession =
+ new TestSparkSession(
+ new SparkContext(
+ "local[15]",
+ "streaming-key-cuj"
+ )
+ )
+
+ override def beforeEach(): Unit = {
+ super[KafkaSourceTest].beforeEach()
+ super[StreamRealTimeModeE2ESuiteBase].beforeEach()
+ }
+
+ def getKafkaConsumerProperties: Properties = {
+ val props: Properties = new Properties()
+ props.put("bootstrap.servers", testUtils.brokerAddress)
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("compression.type", "snappy")
+
+ props
+ }
+
+ test("Union two kafka streams, for each write to sink") {
+ var q: StreamingQuery = null
+ try {
+ val topic1 = newTopic()
+ val topic2 = newTopic()
+ testUtils.createTopic(topic1, partitions = 2)
+ testUtils.createTopic(topic2, partitions = 2)
+
+ val props: Properties = getKafkaConsumerProperties
+ val producer1: Producer[String, String] = new KafkaProducer[String,
String](props)
+ val producer2: Producer[String, String] = new KafkaProducer[String,
String](props)
+
+ val readStream1 = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic1)
+ .load()
+
+ val readStream2 = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic2)
+ .load()
+
+ val df = readStream1
+ .union(readStream2)
+ .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS
value")
+ .selectExpr("key || ',' || value")
+ .toDF()
+
+ q = runStreamingQuery("union-kafka", df)
+
+ waitForTasksToStart(4)
+
+ val expectedResults = new mutable.ListBuffer[String]()
+ for (batch <- 0 until 3) {
+ (1 to 100).foreach(i => {
+ producer1
+ .send(
+ new ProducerRecord[String, String](
+ topic1,
+ java.lang.Long.toString(i),
+ s"input1-${batch}-${i}"
+ )
+ )
+ .get()
+ producer2
+ .send(
+ new ProducerRecord[String, String](
+ topic2,
+ java.lang.Long.toString(i),
+ s"input2-${batch}-${i}"
+ )
+ )
+ .get()
+ })
+ producer1.flush()
+ producer2.flush()
+
+ expectedResults ++= (1 to 100)
+ .flatMap(v => {
+ Seq(
+ s"${v},input1-${batch}-${v}",
+ s"${v},input2-${batch}-${v}"
+ )
+ })
+ .toList
+
+ eventually(timeout(60.seconds)) {
+ ResultsCollector
+ .get(sinkName)
+ .toArray(new Array[String](ResultsCollector.get(sinkName).size()))
+ .toList
+ .sorted should equal(expectedResults.sorted)
+ }
+ }
+ } finally {
+ if (q != null) {
+ q.stop()
+ }
+ }
+ }
+}
+
+
+/**
+ * Kafka Real-Time Integration test suite.
+ * Tests with a distributed spark cluster with
+ * separate executors processes deployed.
+ */
+class KafkaRealTimeIntegrationSuite
Review Comment:
The difference between this test suite and `KafkaRealTimeModeSuite` is that
this is specially for distributed spark cluster?
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRealTimeIntegrationSuite.scala:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.nio.file.Files
+import java.util.Properties
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kafka.clients.producer.{KafkaProducer, Producer,
ProducerRecord}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkContext, ThreadAudit}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.{OutputMode, ResultsCollector,
StreamingQuery, StreamRealTimeModeE2ESuiteBase, StreamRealTimeModeSuiteBase}
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class KafkaRealTimeModeE2ESuite extends KafkaSourceTest with
StreamRealTimeModeE2ESuiteBase {
+
+ override protected val defaultTrigger: RealTimeTrigger =
RealTimeTrigger.apply("5 seconds")
+
+ override protected def createSparkSession =
+ new TestSparkSession(
+ new SparkContext(
+ "local[15]",
+ "streaming-key-cuj"
+ )
+ )
+
+ override def beforeEach(): Unit = {
+ super[KafkaSourceTest].beforeEach()
+ super[StreamRealTimeModeE2ESuiteBase].beforeEach()
+ }
+
+ def getKafkaConsumerProperties: Properties = {
+ val props: Properties = new Properties()
+ props.put("bootstrap.servers", testUtils.brokerAddress)
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("compression.type", "snappy")
+
+ props
+ }
+
+ test("Union two kafka streams, for each write to sink") {
Review Comment:
Why to have this test separately in this `KafkaRealTimeModeE2ESuite` instead
`KafkaRealTimeModeSuite` or `KafkaRealTimeIntegrationSuite`?
`KafkaRealTimeModeE2ESuite` and `KafkaRealTimeIntegrationSuite` look like
both e2e test suites. Should we have just one e2e test suite?
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRealTimeIntegrationSuite.scala:
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.nio.file.Files
+import java.util.Properties
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kafka.clients.producer.{KafkaProducer, Producer,
ProducerRecord}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.{SparkContext, ThreadAudit}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.streaming.{OutputMode, ResultsCollector,
StreamingQuery, StreamRealTimeModeE2ESuiteBase, StreamRealTimeModeSuiteBase}
+import org.apache.spark.sql.test.TestSparkSession
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+class KafkaRealTimeModeE2ESuite extends KafkaSourceTest with
StreamRealTimeModeE2ESuiteBase {
+
+ override protected val defaultTrigger: RealTimeTrigger =
RealTimeTrigger.apply("5 seconds")
+
+ override protected def createSparkSession =
+ new TestSparkSession(
+ new SparkContext(
+ "local[15]",
+ "streaming-key-cuj"
+ )
+ )
+
+ override def beforeEach(): Unit = {
+ super[KafkaSourceTest].beforeEach()
+ super[StreamRealTimeModeE2ESuiteBase].beforeEach()
+ }
+
+ def getKafkaConsumerProperties: Properties = {
+ val props: Properties = new Properties()
+ props.put("bootstrap.servers", testUtils.brokerAddress)
+ props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put("compression.type", "snappy")
+
+ props
+ }
+
+ test("Union two kafka streams, for each write to sink") {
+ var q: StreamingQuery = null
+ try {
+ val topic1 = newTopic()
+ val topic2 = newTopic()
+ testUtils.createTopic(topic1, partitions = 2)
+ testUtils.createTopic(topic2, partitions = 2)
+
+ val props: Properties = getKafkaConsumerProperties
+ val producer1: Producer[String, String] = new KafkaProducer[String,
String](props)
+ val producer2: Producer[String, String] = new KafkaProducer[String,
String](props)
+
+ val readStream1 = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic1)
+ .load()
+
+ val readStream2 = spark.readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("subscribe", topic2)
+ .load()
+
+ val df = readStream1
+ .union(readStream2)
+ .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS
value")
+ .selectExpr("key || ',' || value")
+ .toDF()
+
+ q = runStreamingQuery("union-kafka", df)
+
+ waitForTasksToStart(4)
+
+ val expectedResults = new mutable.ListBuffer[String]()
+ for (batch <- 0 until 3) {
+ (1 to 100).foreach(i => {
+ producer1
+ .send(
+ new ProducerRecord[String, String](
+ topic1,
+ java.lang.Long.toString(i),
+ s"input1-${batch}-${i}"
+ )
+ )
+ .get()
+ producer2
+ .send(
+ new ProducerRecord[String, String](
+ topic2,
+ java.lang.Long.toString(i),
+ s"input2-${batch}-${i}"
+ )
+ )
+ .get()
+ })
+ producer1.flush()
+ producer2.flush()
+
+ expectedResults ++= (1 to 100)
+ .flatMap(v => {
+ Seq(
+ s"${v},input1-${batch}-${v}",
+ s"${v},input2-${batch}-${v}"
+ )
+ })
+ .toList
+
+ eventually(timeout(60.seconds)) {
+ ResultsCollector
+ .get(sinkName)
+ .toArray(new Array[String](ResultsCollector.get(sinkName).size()))
+ .toList
+ .sorted should equal(expectedResults.sorted)
+ }
+ }
+ } finally {
+ if (q != null) {
+ q.stop()
+ }
+ }
+ }
+}
+
+
+/**
+ * Kafka Real-Time Integration test suite.
+ * Tests with a distributed spark cluster with
+ * separate executors processes deployed.
+ */
+class KafkaRealTimeIntegrationSuite
+ extends KafkaSourceTest
+ with StreamRealTimeModeSuiteBase
Review Comment:
Hmm, I saw you created two suite base classes:
`StreamRealTimeModeE2ESuiteBase` and `StreamRealTimeModeSuiteBase`. This test
suite looks like for e2e tests too, why it doesn't use
`StreamRealTimeModeE2ESuiteBase` but `StreamRealTimeModeSuiteBase`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]