viirya commented on code in PR #52729:
URL: https://github.com/apache/spark/pull/52729#discussion_r2479530617


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -218,6 +227,93 @@ private[kafka010] class KafkaMicroBatchStream(
     }.toArray
   }
 
+  override def planInputPartitions(start: Offset): Array[InputPartition] = {
+    // This function is used for real time mode. Trigger restrictions won't be 
supported.
+    if (maxOffsetsPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "maxOffsetsPerTrigger is not compatible with real time mode")
+    }
+    if (minOffsetPerTrigger.isDefined) {
+      throw new UnsupportedOperationException(
+        "minOffsetsPerTrigger is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MIN_PARTITIONS_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "minpartitions is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.ENDING_TIMESTAMP_OPTION_KEY)) {
+      throw new UnsupportedOperationException(
+        "endingtimestamp is not compatible with real time mode"
+      )
+    }
+    if (options.containsKey(KafkaSourceProvider.MAX_TRIGGER_DELAY)) {
+      throw new UnsupportedOperationException(
+        "maxtriggerdelay is not compatible with real time mode"
+      )
+    }
+
+    // This function is used by Low Latency Mode, where we expect 1:1 mapping 
between a
+    // topic partition and an input partition.
+    // We are skipping partition range check for performance reason. We can 
always try to do
+    // it in tasks if needed.
+    val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Here we check previous topic partitions with latest partition offsets 
to see if we need to
+    // update the partition list. Here we don't need the updated partition 
topic to be absolutely
+    // up to date, because there might already be minutes' delay since new 
partition is created.
+    // latestPartitionOffsets should be fetched not long ago anyway.
+    // If the topic partitions change, we fetch the earliest offsets for all 
new partitions
+    // and add them to the list.
+    assert(latestPartitionOffsets != null, "latestPartitionOffsets should be 
set in latestOffset")
+    val latestTopicPartitions = latestPartitionOffsets.keySet
+    val newStartPartitionOffsets = if (startPartitionOffsets.keySet == 
latestTopicPartitions) {
+      startPartitionOffsets
+    } else {
+      val newPartitions = 
latestTopicPartitions.diff(startPartitionOffsets.keySet)
+      // Instead of fetching earliest offsets, we could fill offset 0 here and 
avoid this extra
+      // admin function call. But we consider new partition is rare and 
getting earliest offset
+      // aligns with what we do in micro-batch mode and can potentially enable 
more sanity checks
+      // in executor side.
+      val newPartitionOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)

Review Comment:
   Hmm, this is what currently in `getOffsetRangesFromResolvedOffsets` called 
by `KafkaMicroBatchStream.planInputPartitions`:
   
   ```scala
   if (newPartitionInitialOffsets.keySet != newPartitions) {
     // We cannot get from offsets for some partitions. It means they got 
deleted.
     val deletedPartitions = 
newPartitions.diff(newPartitionInitialOffsets.keySet)
     reportDataLoss(
       s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed",
       () => 
KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
   }
   ```
   
   The behavior of `reportDataLoss` is configurable. It can be a failure like 
what you did here, or a log warning.
   
   I would suggest to follow existing behavior instead of two different 
behaviors.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to