HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r642793811



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Arrays;
+
+/**
+ /**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2

Review comment:
The target version will be 3.2.0 in the best case. We don't add new functionality in bugfix versions.



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,

Review comment:
Please use CheckNewAnswer here and below to assert only the new output, if that works for the test. If CheckNewAnswer works without waitUntilBatchProcessed, even better.
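For example, something along these lines might work. This is only a rough sketch that reuses the setup from the diff above (`mapped`, `clock`, `topic`, `testUtils`); I haven't run it against this test, and the per-batch split of expected values is my assumption based on the data sent in each step:

```
testStream(mapped)(
  StartStream(Trigger.ProcessingTime(100), clock),
  waitUntilBatchProcessed,
  // First batch is always processed: assert only what this batch emitted
  CheckNewAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, 11, 108, 109, 12, 13, 14),
  // Adding more data but less than minOffsetsPerTrigger
  Assert {
    testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
    true
  },
  AdvanceManualClock(100),
  waitUntilBatchProcessed,
  // Less data than minOffsetsPerTrigger and maxTriggerDelay not expired: no new rows expected
  CheckNewAnswer()
)
```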


##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>

Review comment:
Let's extract this to a `private def` and reuse it across tests in this file.
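Roughly something like this (a sketch only; it takes the test's manual clock as a parameter since each test creates its own):

```
private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
  eventually(Timeout(streamingTimeout)) {
    if (q.exception.isEmpty) {
      assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
    }
  }
  if (q.exception.isDefined) {
    throw q.exception.get
  }
  true
}
```

Each test would then just call `waitUntilBatchProcessed(clock)` in its action list.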


##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")

Review comment:
Let's pick a smaller value, like 5 seconds or so. It won't introduce flakiness and the test will take less time to run.



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/ReadMinRows.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2

Review comment:
Same here.



##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      Assert {
+        testUtils.sendMessages(topic, (110 to 120).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, Array("2"), Some(2))
+        true
+      },
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // Running batch now as number of records is greater than minOffsetsPerTrigger
+      // 2 from smallest, 10 more from middle, 20 more from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Testing maxTriggerDelay
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (121 to 125).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, (21 to 25).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch till maxTriggerDelay is expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120, 13, 14, 15, 16, 17, 18, 19, 2, 20),
+      // Sleeping for 10s to let maxTriggerDelay expire
+      Assert {
+        Thread.sleep(10 * 1000)
+        true
+      },
+      AdvanceManualClock(100),
+      // Running batch as maxTriggerDelay is expired
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
+        12, 117, 118, 119, 120,
+        121, 122, 123, 124, 125,
+        13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25)
+    )
+    // When Trigger.Once() is used, the read limit should be ignored
+    val allData = Seq(1, 2) ++ (10 to 25) ++ (100 to 125)
+    withTempDir { dir =>
+      testStream(mapped)(
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer(allData: _*),
+        StopStream,
+
+        AddKafkaData(Set(topic), 1000 to 1010: _*),
+        StartStream(Trigger.Once(), checkpointLocation = dir.getCanonicalPath),
+        AssertOnQuery { q =>
+          q.processAllAvailable()
+          true
+        },
+        CheckAnswer((allData ++ 1000.to(1010)): _*)
+      )
+    }
+  }
+
+  test("compositeReadLimit") {

Review comment:
Same comments here.



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##########
@@ -139,26 +156,78 @@ private[kafka010] class KafkaSource(
 
   override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
     // Make sure initialPartitionOffsets is initialized
     initialPartitionOffsets
-
-    val latest = kafkaReader.fetchLatestOffsets(
-      currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
+    val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
+    val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
+    var skipBatch = false

Review comment:
Something like:

```
  override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = {
    // Make sure initialPartitionOffsets is initialized
    initialPartitionOffsets
    val currentOffsets = currentPartitionOffsets.orElse(Some(initialPartitionOffsets))
    val latest = kafkaReader.fetchLatestOffsets(currentOffsets)
    latestPartitionOffsets = Some(latest)

    val limits: Seq[ReadLimit] = limit match {
      case rows: CompositeReadLimit => rows.getReadLimits
      case rows => Seq(rows)
    }

    val offsets = if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
      // ReadAllAvailable has the highest priority
      latest
    } else {
      val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
      val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])

      lowerLimit.flatMap { limit =>
        // checking if we need to skip batch based on minOffsetPerTrigger criteria
        val skipBatch = delayBatch(
          limit.minRows, latest, currentOffsets.get, limit.maxTriggerDelayMs)
        if (skipBatch) {
          logDebug(
            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
          // Pass same current offsets as output to skip trigger
          Some(currentOffsets.get)
        } else {
          None
        }
      }.orElse {
        // checking if we need to adjust a range of offsets based on maxOffsetPerTrigger criteria
        upperLimit.map { limit =>
          rateLimit(limit.maxRows, currentPartitionOffsets.getOrElse(initialPartitionOffsets), latest)
        }
      }.getOrElse(latest)
    }

    currentPartitionOffsets = Some(offsets)
    logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
    KafkaSourceOffset(offsets)
  }
```



##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +114,62 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false

Review comment:
Btw I guess we can extract the logic of calculating offsets (including delayBatch, of course) based on ReadLimit across DSv1 and DSv2. This code is mostly duplicated. Maybe have a new object `KafkaSourceOffsetHelper` and extract the logic into method(s) there? The content of `getDefaultReadLimit` is another good candidate to extract as well.
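To illustrate the shape, something like the following could work. This is a sketch only: the object name comes from the comment above, but the method name, the `Map`-based signature, and passing `delayBatch` / `rateLimit` in as functions are assumptions, not existing Spark code.

```
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.streaming.{
  CompositeReadLimit, ReadAllAvailable, ReadLimit, ReadMaxRows, ReadMinRows}

private[kafka010] object KafkaSourceOffsetHelper extends Logging {

  /**
   * Picks the end offsets for the next batch from the current and latest offsets, applying
   * ReadAllAvailable / ReadMinRows / ReadMaxRows the same way for DSv1 and DSv2.
   */
  def calculateLatestOffsets(
      limit: ReadLimit,
      currentOffsets: Map[TopicPartition, Long],
      latestOffsets: Map[TopicPartition, Long],
      delayBatch: (Long, Map[TopicPartition, Long], Map[TopicPartition, Long], Long) => Boolean,
      rateLimit: (Long, Map[TopicPartition, Long], Map[TopicPartition, Long])
        => Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
    val limits: Seq[ReadLimit] = limit match {
      case composite: CompositeReadLimit => composite.getReadLimits
      case single => Seq(single)
    }
    if (limits.exists(_.isInstanceOf[ReadAllAvailable])) {
      // ReadAllAvailable has the highest priority
      latestOffsets
    } else {
      val lowerLimit = limits.find(_.isInstanceOf[ReadMinRows]).map(_.asInstanceOf[ReadMinRows])
      val upperLimit = limits.find(_.isInstanceOf[ReadMaxRows]).map(_.asInstanceOf[ReadMaxRows])
      lowerLimit.flatMap { minLimit =>
        // Skip the trigger if fewer than minRows are available and maxTriggerDelay has not expired
        if (delayBatch(minLimit.minRows, latestOffsets, currentOffsets, minLimit.maxTriggerDelayMs)) {
          logDebug("Delaying batch as number of records available is less than minOffsetsPerTrigger")
          Some(currentOffsets)
        } else {
          None
        }
      }.orElse {
        // Otherwise cap the offset range according to the max-rows limit, if set
        upperLimit.map(maxLimit => rateLimit(maxLimit.maxRows, currentOffsets, latestOffsets))
      }.getOrElse(latestOffsets)
    }
  }
}
```

Both `KafkaSource.latestOffset` and `KafkaMicroBatchStream.latestOffset` could then delegate to it and keep only their source-specific bookkeeping.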


##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -321,6 +321,231 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
   }
 
+  test("minOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 109).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 14).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("minOffsetsPerTrigger", 15)
+      .option("maxTriggerDelay", "10s")
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+    testStream(mapped)(
+      StartStream(Trigger.ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // First Batch is always processed
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      // Adding more data but less than minOffsetsPerTrigger
+      Assert {
+        testUtils.sendMessages(topic, (15 to 20).map(_.toString).toArray, Some(1))
+        true
+      },
+      // No data is processed for next batch as data is less than minOffsetsPerTrigger
+      // and maxTriggerDelay is not expired
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
+        11, 108, 109, 12, 13, 14),
+      Assert {
+        testUtils.sendMessages(topic, (110 to 120).map(_.toString).toArray, Some(0))
+        testUtils.sendMessages(topic, Array("2"), Some(2))
+        true
+      },
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // Running batch now as number of records is greater than minOffsetsPerTrigger
+      // 2 from smallest, 10 more from middle, 20 more from biggest

Review comment:
Once CheckNewAnswer works, we don't need the code comment for the expected values.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
