viirya commented on code in PR #52729:
URL: https://github.com/apache/spark/pull/52729#discussion_r2476747793
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
}.toArray
}
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode")
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping between a
+    // topic partition and an input partition.
+    // We are skipping the partition range check for performance reasons. We can always try to
+    // do it in tasks if needed.
+    val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Here we check previous topic partitions against the latest partition offsets to see if we
+    // need to update the partition list. The updated partition list doesn't need to be absolutely
+    // up to date, because there might already be minutes of delay since a new partition was
+    // created. latestPartitionOffsets should have been fetched not long ago anyway.
+    // If the topic partitions change, we fetch the earliest offsets for all new partitions
+    // and add them to the list.
+    assert(latestPartitionOffsets != null, "latestPartitionOffsets should be set in latestOffset")
+    val latestTopicPartitions = latestPartitionOffsets.keySet
+    val newStartPartitionOffsets = if (startPartitionOffsets.keySet == latestTopicPartitions) {
+      startPartitionOffsets
+    } else {
+      val newPartitions = latestTopicPartitions.diff(startPartitionOffsets.keySet)
+      // Instead of fetching earliest offsets, we could fill in offset 0 here and avoid this
+      // extra admin call. But we consider new partitions rare, and getting the earliest offset
+      // aligns with what we do in micro-batch mode and can potentially enable more sanity
+      // checks on the executor side.
+      val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+      assert(
+        newPartitionOffsets.keys.forall(!startPartitionOffsets.contains(_)),
+        "startPartitionOffsets should not contain any key in newPartitionOffsets")
+
+      // Filter out new partition offsets that are not 0 and log a warning
+      val nonZeroNewPartitionOffsets = newPartitionOffsets.filter {
+        case (_, offset) => offset != 0
+      }
+      // Log the non-zero new partition offsets
+      if (nonZeroNewPartitionOffsets.nonEmpty) {
+        logWarning(log"new partitions should start from offset 0: " +
+          log"${MDC(OFFSETS, nonZeroNewPartitionOffsets)}")
+      }
Review Comment:
For the non-zero-offset case of new partitions, `getOffsetRangesFromResolvedOffsets`
delegates to the `reportDataLoss` closure. Should we do the same here?
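If that direction is taken, here is a minimal standalone sketch of routing the non-zero offsets through a data-loss handler rather than only logging; the handler shape and message are assumptions for illustration, not the actual `reportDataLoss` signature:
```scala
import org.apache.kafka.common.TopicPartition

object ReportDataLossSketch {
  // Assumed handler: escalate to an error when failOnDataLoss is set,
  // otherwise warn, mirroring the usual Kafka source semantics.
  def reportDataLoss(failOnDataLoss: Boolean, message: String): Unit = {
    if (failOnDataLoss) {
      throw new IllegalStateException(s"$message. Some data may have been lost.")
    } else {
      println(s"WARN: $message. Some data may have been lost.")
    }
  }

  // New partitions whose earliest offset is no longer 0 indicate records
  // that were already aged out before the stream saw the partition.
  def checkNewPartitionOffsets(
      newPartitionOffsets: Map[TopicPartition, Long],
      failOnDataLoss: Boolean): Unit = {
    val nonZero = newPartitionOffsets.filter { case (_, offset) => offset != 0L }
    if (nonZero.nonEmpty) {
      reportDataLoss(failOnDataLoss, s"Added partitions do not start from offset 0: $nonZero")
    }
  }
}
```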
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -93,6 +104,38 @@ private case class KafkaBatchPartitionReader(
}
}
+  override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
+    if (!iteratorForRealTimeMode.isDefined) {
+      logInfo(s"Getting a new kafka consuming iterator for ${offsetRange.topicPartition} " +
+        s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
+      iteratorForRealTimeMode = Some(consumer.getIterator(nextOffset))
+    }
+    assert(iteratorForRealTimeMode.isDefined)
+    val nextRecord = iteratorForRealTimeMode.get.nextWithTimeout(timeoutMs)
+    nextRecord.foreach { record =>
+      nextRow = unsafeRowProjector(record)
+      nextOffset = record.offset + 1
+      if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
+          record.timestampType() == TimestampType.CREATE_TIME) {
+        if (!timestampTypeLogged) {
+          logInfo(log"Kafka source record timestamp type is " +
+            log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
+          timestampTypeLogged = true
+        }
Review Comment:
Could you explain this logging behavior in more detail? Why do we need this logging?
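For reference, the pattern in question reduced to a standalone sketch: a per-reader flag keeps this info line out of the per-record hot path, so the timestamp type is logged at most once per reader rather than once per record (the names here are illustrative, not the PR's actual fields):
```scala
// Minimal sketch of a log-once guard; println stands in for logInfo.
class TimestampTypeLogger {
  private var timestampTypeLogged = false

  def maybeLogTimestampType(timestampType: String): Unit = {
    if (!timestampTypeLogged) {
      println(s"Kafka source record timestamp type is $timestampType")
      timestampTypeLogged = true
    }
  }
}
```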
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
}.toArray
}
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode")
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping between a
Review Comment:
```suggestion
    // This function is used by real time mode, where we expect 1:1 mapping between a
```
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
}.toArray
}
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode")
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode")
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping between a
+    // topic partition and an input partition.
+    // We are skipping the partition range check for performance reasons. We can always try to
+    // do it in tasks if needed.
+    val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Here we check previous topic partitions against the latest partition offsets to see if we
+    // need to update the partition list. The updated partition list doesn't need to be absolutely
+    // up to date, because there might already be minutes of delay since a new partition was
+    // created. latestPartitionOffsets should have been fetched not long ago anyway.
+    // If the topic partitions change, we fetch the earliest offsets for all new partitions
+    // and add them to the list.
+    assert(latestPartitionOffsets != null, "latestPartitionOffsets should be set in latestOffset")
+    val latestTopicPartitions = latestPartitionOffsets.keySet
+    val newStartPartitionOffsets = if (startPartitionOffsets.keySet == latestTopicPartitions) {
+      startPartitionOffsets
+    } else {
+      val newPartitions = latestTopicPartitions.diff(startPartitionOffsets.keySet)
+      // Instead of fetching earliest offsets, we could fill in offset 0 here and avoid this
+      // extra admin call. But we consider new partitions rare, and getting the earliest offset
+      // aligns with what we do in micro-batch mode and can potentially enable more sanity
+      // checks on the executor side.
+      val newPartitionOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
Review Comment:
`KafkaMicroBatchStream`'s existing `planInputPartitions` calls
`kafkaOffsetReader.getOffsetRangesFromResolvedOffsets` to handle partition offsets.
It handles the deleted-partitions case, but this new `planInputPartitions` doesn't.
Should we do the same here?
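A standalone sketch of what that deleted-partitions check could look like, in the spirit of `getOffsetRangesFromResolvedOffsets`; the handler shape and the message are assumptions:
```scala
import org.apache.kafka.common.TopicPartition

object DeletedPartitionsSketch {
  // Partitions present in the start offsets but missing from the latest
  // metadata are treated as deleted, which is a data-loss condition.
  def checkDeletedPartitions(
      startPartitionOffsets: Map[TopicPartition, Long],
      latestTopicPartitions: Set[TopicPartition],
      reportDataLoss: String => Unit): Unit = {
    val deletedPartitions = startPartitionOffsets.keySet.diff(latestTopicPartitions)
    if (deletedPartitions.nonEmpty) {
      reportDataLoss(s"Partitions $deletedPartitions have been deleted. " +
        "Some data may have been missed.")
    }
  }
}
```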
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala:
##########
@@ -284,6 +284,8 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
   case class WaitUntilBatchProcessed(batchId: Long) extends StreamAction with StreamMustBeRunning
+  case object WaitUntilCurrentBatchProcessed extends StreamAction with StreamMustBeRunning
Review Comment:
Why is this needed? Can't we use `WaitUntilBatchProcessed`?
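For context, an illustrative sketch of the behavioral difference (not the `StreamTest` implementation; the `BatchProgress` surface is assumed): `WaitUntilBatchProcessed(batchId)` waits for a batch id fixed when the test is written, while a current-batch variant waits for whichever batch is in flight when the action executes, which matters when a test cannot predict batch ids.
```scala
object WaitActionSketch {
  // Assumed query surface for the sketch.
  trait BatchProgress {
    def lastCompletedBatchId: Long
    def currentBatchId: Long
  }

  def waitUntilBatchProcessed(q: BatchProgress, batchId: Long): Unit = {
    // Poll until the given batch id has completed.
    while (q.lastCompletedBatchId < batchId) Thread.sleep(10)
  }

  def waitUntilCurrentBatchProcessed(q: BatchProgress): Unit = {
    // The target id is captured when the action runs, not when the test is built.
    waitUntilBatchProcessed(q, q.currentBatchId)
  }
}
```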
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]