HeartSaVioR commented on a change in pull request #32653:
URL: https://github.com/apache/spark/pull/32653#discussion_r638448022
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -64,6 +66,15 @@ private[kafka010] class KafkaMicroBatchStream(
   private[kafka010] val maxOffsetsPerTrigger = Option(options.get(
     KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong)
+  private[kafka010] val minOffsetPerTrigger = Option(options.get(
+    KafkaSourceProvider.MIN_OFFSET_PER_TRIGGER)).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =
Review comment:
I'd like to see this as part of `ReadMinRows`, so that other data source implementations leveraging it would also respect the maximum wait time. But let's hear others' voices as well.
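As a rough illustration only (this is not the actual Spark API; the trait and method names below are made up for this comment), folding the max wait time into the min-rows limit could look something like:

```scala
import org.apache.spark.sql.connector.read.streaming.ReadLimit

// Hypothetical sketch: if the minimum-rows limit itself carried the maximum
// wait time, any source honoring the minimum-rows admission control would
// also have to honor the timeout. Names are illustrative, not Spark's API.
trait ReadMinRowsWithMaxDelay extends ReadLimit {
  def minRows(): Long            // minimum number of new rows required to trigger a batch
  def maxTriggerDelayMs(): Long  // upper bound on how long to wait before triggering anyway
}
```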
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
##########
@@ -524,6 +528,9 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
   private[kafka010] val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
   private[kafka010] val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"
+  private[kafka010] val MIN_OFFSET_PER_TRIGGER = "minOffsetsPerTrigger"
+ private[kafka010] val MIN_OFFSET_PER_TRIGGER = "minOffsetsPerTrigger"
Review comment:
We use all lowercase here to reflect the fact that the option name is case-insensitive.
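For example (sketch only, mirroring the existing keys above):

```scala
// Sketch: keep the key all-lowercase, like the neighboring option keys, to
// signal that option names are matched case-insensitively.
object OptionKeySketch {
  val MAX_OFFSET_PER_TRIGGER = "maxoffsetspertrigger"  // existing convention
  val MIN_OFFSET_PER_TRIGGER = "minoffsetspertrigger"  // suggested spelling
}
```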
##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/streaming/CompositeReadLimit.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read.streaming;
+
+import org.apache.spark.annotation.Evolving;
+
+import java.util.Objects;
+
+/**
+ * Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan approximately
+ * the given maximum number of rows with at least the given minimum number of rows.
+ *
+ * @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
+ * @since 3.1.2
+ */
+@Evolving
+public final class CompositeReadLimit implements ReadLimit {
Review comment:
I'd like to see this be a general container of ReadLimits holding `ReadLimit[]` or `List<ReadLimit>`, instead of sticking with minRows and maxRows; e.g., once we want to do the same for ReadMinFiles, we would have to add yet another class.
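Something along these lines, just as a sketch (the class and accessor names are hypothetical, not the actual API):

```scala
import org.apache.spark.sql.connector.read.streaming.ReadLimit

// Hypothetical composite that simply carries an arbitrary set of limits, so a
// future limit such as a ReadMinFiles would not require yet another dedicated
// composite class. Names are illustrative only.
final case class GeneralCompositeReadLimit(limits: Seq[ReadLimit]) extends ReadLimit {
  def readLimits(): Seq[ReadLimit] = limits
}
```

Callers could then pattern match on the concrete limits inside `readLimits()` instead of hard-coding min/max rows into the composite itself.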
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
##########
@@ -87,9 +88,17 @@ private[kafka010] class KafkaSource(
   private val maxOffsetsPerTrigger =
     sourceOptions.get(MAX_OFFSET_PER_TRIGGER).map(_.toLong)
+  private[kafka010] val minOffsetPerTrigger =
+    sourceOptions.get(MIN_OFFSET_PER_TRIGGER).map(_.toLong)
+
+  private[kafka010] val maxTriggerDelayMs =
Review comment:
Same here.
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
##########
@@ -95,15 +112,51 @@ private[kafka010] class KafkaMicroBatchStream(
   override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
     latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    var skipBatch = false
     endPartitionOffsets = KafkaSourceOffset(readLimit match {
+      case rows: ReadMinRows =>
+        // checking if we need to skip batch based on minOffsetPerTrigger criteria
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
+          startPartitionOffsets
+        } else latestPartitionOffsets
       case rows: ReadMaxRows =>
         rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+      case rows: CompositeReadLimit =>
+        skipBatch = delayBatch(rows.minRows(), latestPartitionOffsets, startPartitionOffsets)
+        if (skipBatch) {
+          logDebug(
+            s"Delaying batch as number of records available is less than minOffsetsPerTrigger")
+          startPartitionOffsets
+        } else {
+          rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets)
+        }
       case _: ReadAllAvailable =>
         latestPartitionOffsets
     })
+    if (!skipBatch) lastTriggerMillis = System.currentTimeMillis()
     endPartitionOffsets
   }
+  /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
+  private def delayBatch(minLimit: Long,
Review comment:
Indentation looks to be off: please refer to the style guide on the indentation of method parameters: https://github.com/databricks/scala-style-guide
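For reference, this is roughly the parameter indentation the guide asks for (only `minLimit` comes from the diff; the other parameter names are guesses from the call sites above, and the body is elided):

```scala
import org.apache.kafka.common.TopicPartition

// Sketch of the 4-space continuation indent for wrapped parameter lists.
// Parameter names other than minLimit are assumptions based on the arguments
// passed at the call sites (latest/start partition offsets).
object DelayBatchIndentSketch {
  private def delayBatch(
      minLimit: Long,
      latestOffsets: Map[TopicPartition, Long],
      currentOffsets: Map[TopicPartition, Long]): Boolean = {
    ??? // the actual skip/delay decision lives in the PR
  }
}
```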
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]