viirya commented on code in PR #52729:
URL: https://github.com/apache/spark/pull/52729#discussion_r2476747793


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
     }.toArray
   }
 
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be 
supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode"
+      )
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping 
between a
+    // topic partition and an input partition.
+    // We are skipping partition range check for performance reason. We can 
always try to do
+    // it in tasks if needed.
+    val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Here we check previous topic partitions with latest partition offsets 
to see if we need to
+    // update the partition list. Here we don't need the updated partition 
topic to be absolutely
+    // up to date, because there might already be minutes' delay since new 
partition is created.
+    // latestPartitionOffsets should be fetched not long ago anyway.
+    // If the topic partitions change, we fetch the earliest offsets for all 
new partitions
+    // and add them to the list.
+    assert(latestPartitionOffsets != null, "latestPartitionOffsets should be 
set in latestOffset")
+    val latestTopicPartitions = latestPartitionOffsets.keySet
+    val newStartPartitionOffsets = if (startPartitionOffsets.keySet == 
latestTopicPartitions) {
+      startPartitionOffsets
+    } else {
+      val newPartitions = 
latestTopicPartitions.diff(startPartitionOffsets.keySet)
+      // Instead of fetching earliest offsets, we could fill offset 0 here and 
avoid this extra
+      // admin function call. But we consider new partition is rare and 
getting earliest offset
+      // aligns with what we do in micro-batch mode and can potentially enable 
more sanity checks
+      // in executor side.
+      val newPartitionOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+      assert(
+        newPartitionOffsets.keys.forall(!startPartitionOffsets.contains(_)),
+        "startPartitionOffsets should not contain any key in 
newPartitionOffsets")
+
+      // Filter out new partition offsets that are not 0 and log a warning
+      val nonZeroNewPartitionOffsets = newPartitionOffsets.filter {
+        case (_, offset) => offset != 0
+      }
+      // Log the non-zero new partition offsets
+      if (nonZeroNewPartitionOffsets.nonEmpty) {
+        logWarning(log"new partitions should start from offset 0: " +
+          log"${MDC(OFFSETS, nonZeroNewPartitionOffsets)}")
+      }

Review Comment:
   For non zero offset new partitions case, 
`getOffsetRangesFromResolvedOffsets` delegates to `reportDataLoss` closure. 
Should we do the same?



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -93,6 +104,38 @@ private case class KafkaBatchPartitionReader(
     }
   }
 
+  override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
+    if (!iteratorForRealTimeMode.isDefined) {
+      logInfo(s"Getting a new kafka consuming iterator for 
${offsetRange.topicPartition} " +
+        s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
+      iteratorForRealTimeMode = Some(consumer.getIterator(nextOffset))
+    }
+    assert(iteratorForRealTimeMode.isDefined)
+    val nextRecord = iteratorForRealTimeMode.get.nextWithTimeout(timeoutMs)
+    nextRecord.foreach { record =>
+
+      nextRow = unsafeRowProjector(record)
+      nextOffset = record.offset + 1
+      if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
+        record.timestampType() == TimestampType.CREATE_TIME) {
+        if (!timestampTypeLogged) {
+          logInfo(log"Kafka source record timestamp type is " +
+            log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
+          timestampTypeLogged = true
+        }

Review Comment:
   Could you explain more on this logging behavior? Why we need to do this 
logging?



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
     }.toArray
   }
 
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be 
supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode"
+      )
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping 
between a

Review Comment:
   ```suggestion
       // This function is used by real time mode, where we expect 1:1 mapping 
between a
   ```



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
     }.toArray
   }
 
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be 
supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode"
+      )
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping 
between a
+    // topic partition and an input partition.
+    // We are skipping partition range check for performance reason. We can 
always try to do
+    // it in tasks if needed.
+    val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Here we check previous topic partitions with latest partition offsets 
to see if we need to
+    // update the partition list. Here we don't need the updated partition 
topic to be absolutely
+    // up to date, because there might already be minutes' delay since new 
partition is created.
+    // latestPartitionOffsets should be fetched not long ago anyway.
+    // If the topic partitions change, we fetch the earliest offsets for all 
new partitions
+    // and add them to the list.
+    assert(latestPartitionOffsets != null, "latestPartitionOffsets should be 
set in latestOffset")
+    val latestTopicPartitions = latestPartitionOffsets.keySet
+    val newStartPartitionOffsets = if (startPartitionOffsets.keySet == 
latestTopicPartitions) {
+      startPartitionOffsets
+    } else {
+      val newPartitions = 
latestTopicPartitions.diff(startPartitionOffsets.keySet)
+      // Instead of fetching earliest offsets, we could fill offset 0 here and 
avoid this extra
+      // admin function call. But we consider new partition is rare and 
getting earliest offset
+      // aligns with what we do in micro-batch mode and can potentially enable 
more sanity checks
+      // in executor side.
+      val newPartitionOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)

Review Comment:
   `KafkaMicroBatchStream`'s existing `planInputPartitions` calls ` 
kafkaOffsetReader.getOffsetRangesFromResolvedOffsets` to handle partition 
offsets. 
   
   It handles deleted partitions cases but this new `planInputPartitions` 
doesn't, should we also do the same?
   
   



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala:
##########
@@ -284,6 +284,8 @@ trait StreamTest extends QueryTest with SharedSparkSession 
with TimeLimits with
 
   case class WaitUntilBatchProcessed(batchId: Long) extends StreamAction with 
StreamMustBeRunning
 
+  case object WaitUntilCurrentBatchProcessed extends StreamAction with 
StreamMustBeRunning

Review Comment:
   Why need this? Can't we use `WaitUntilBatchProcessed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to