viirya commented on a change in pull request #32747:
URL: https://github.com/apache/spark/pull/32747#discussion_r655123411



##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -66,30 +67,40 @@ private[kafka010] trait KafkaOffsetReader {
   /**
    * Resolves the specific offsets based on timestamp per topic-partition.
    * The returned offset for each partition is the earliest offset whose 
timestamp is greater
-   * than or equal to the given timestamp in the corresponding partition. If 
the matched offset
-   * doesn't exist, depending on `failsOnNoMatchingOffset` parameter, the 
offset will be set to
-   * latest or this method throws an error.
+   * than or equal to the given timestamp in the corresponding partition.
+   *
+   * If the matched offset doesn't exist, the behavior depends on the 
destination and the option:
+   *
+   * - isStartingOffsets = false => implementation should provide the offset 
same as 'latest'
+   * - isStartingOffsets = true  => implementation should follow the strategy 
on non-matching
+   *                                starting offset, passed as 
`strategyOnNoMatchStartingOffset`
    *
    * @param partitionTimestamps the timestamp per topic-partition.
-   * @param failsOnNoMatchingOffset whether to fail the query when no matched 
offset can be found.
    */
   def fetchSpecificTimestampBasedOffsets(
       partitionTimestamps: Map[TopicPartition, Long],
-      failsOnNoMatchingOffset: Boolean): KafkaSourceOffset
+      isStartingOffsets: Boolean,

Review comment:
       `isStartingOffsets` here actually means the starting offset of the 
query, right? We have another concept that is the start offset for current 
batch (e.g. `isStartingOffsets` in `fetchPartitionOffsets`), do you think it is 
better to rename this as `isQueryStartingOffsets`?

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -135,12 +136,12 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       }.toMap
       case SpecificOffsetRangeLimit(partitionOffsets) =>
         validateTopicPartitions(partitions, partitionOffsets)
-      case SpecificTimestampRangeLimit(partitionTimestamps) =>
-        fetchSpecificTimestampBasedOffsets(partitionTimestamps,
-          failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
-      case GlobalTimestampRangeLimit(timestamp) =>
-        fetchGlobalTimestampBasedOffsets(timestamp,
-          failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
+      case SpecificTimestampRangeLimit(partitionTimestamps, 
strategyOnNoMatchingStartingOffset) =>
+        fetchSpecificTimestampBasedOffsets(partitionTimestamps, 
isStartingOffsets,
+          strategyOnNoMatchingStartingOffset).partitionToOffsets
+      case GlobalTimestampRangeLimit(timestamp, 
strategyOnNoMatchingStartingOffset) =>
+        fetchGlobalTimestampBasedOffsets(timestamp, isStartingOffsets,
+          strategyOnNoMatchingStartingOffset).partitionToOffsets

Review comment:
       Hmm, but the `isStartingOffsets` here is actually the starting offset of 
batch?

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -208,25 +205,47 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       val listOffsetsParams = tps.asScala.map { tp =>
         tp -> OffsetSpec.forTimestamp(timestamp)
       }.toMap.asJava
-      admin.listOffsets(listOffsetsParams, 
listOffsetsOptions).all().get().asScala.map {
-        case (tp, offsetSpec) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, 
"No offset " +
-              s"matched from request of topic-partition $tp and timestamp " +
-              s"$timestamp.")
-          }
 
-          if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetSpec.offset()
-          }
-      }.toMap
+      readTimestampOffsets(
+        admin.listOffsets(listOffsetsParams, 
listOffsetsOptions).all().get().asScala.toMap,
+        isStartingOffsets,
+        strategyOnNoMatchStartingOffset,
+        _ => timestamp
+      )
     }
 
     fetchSpecificOffsets0(fnAssertParametersWithPartitions, 
fnRetrievePartitionOffsets)
   }
 
+  private def readTimestampOffsets(
+      tpToOffsetMap: Map[TopicPartition, 
ListOffsetsResult.ListOffsetsResultInfo],
+      isStartingOffsets: Boolean,
+      strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
+      partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] 
= {
+
+    tpToOffsetMap.map { case (tp, offsetSpec) =>
+      val offset = if (offsetSpec.offset() == 
OffsetFetchResponse.INVALID_OFFSET) {
+        if (isStartingOffsets) {
+          strategyOnNoMatchStartingOffset match {
+            case StrategyOnNoMatchStartingOffset.ERROR =>
+              throw new IllegalArgumentException("No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestampFn(tp)}.")
+
+            case StrategyOnNoMatchStartingOffset.LATEST =>
+              KafkaOffsetRangeLimit.LATEST
+          }

Review comment:
       Looks like this checks the strategy for each start offsets per batch? 
I'm a bit confused. Isn't the `startingOffsetsByTimestamp` only for `The start 
point of timestamp when a query is started ...`?

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -135,12 +136,12 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       }.toMap
       case SpecificOffsetRangeLimit(partitionOffsets) =>
         validateTopicPartitions(partitions, partitionOffsets)
-      case SpecificTimestampRangeLimit(partitionTimestamps) =>
-        fetchSpecificTimestampBasedOffsets(partitionTimestamps,
-          failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
-      case GlobalTimestampRangeLimit(timestamp) =>
-        fetchGlobalTimestampBasedOffsets(timestamp,
-          failsOnNoMatchingOffset = isStartingOffsets).partitionToOffsets
+      case SpecificTimestampRangeLimit(partitionTimestamps, 
strategyOnNoMatchingStartingOffset) =>
+        fetchSpecificTimestampBasedOffsets(partitionTimestamps, 
isStartingOffsets,
+          strategyOnNoMatchingStartingOffset).partitionToOffsets
+      case GlobalTimestampRangeLimit(timestamp, 
strategyOnNoMatchingStartingOffset) =>
+        fetchGlobalTimestampBasedOffsets(timestamp, isStartingOffsets,
+          strategyOnNoMatchingStartingOffset).partitionToOffsets

Review comment:
       Oh, I see. `fetchPartitionOffsets` is only called for batch query.

##########
File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
##########
@@ -208,25 +205,47 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       val listOffsetsParams = tps.asScala.map { tp =>
         tp -> OffsetSpec.forTimestamp(timestamp)
       }.toMap.asJava
-      admin.listOffsets(listOffsetsParams, 
listOffsetsOptions).all().get().asScala.map {
-        case (tp, offsetSpec) =>
-          if (failsOnNoMatchingOffset) {
-            assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, 
"No offset " +
-              s"matched from request of topic-partition $tp and timestamp " +
-              s"$timestamp.")
-          }
 
-          if (offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET) {
-            tp -> KafkaOffsetRangeLimit.LATEST
-          } else {
-            tp -> offsetSpec.offset()
-          }
-      }.toMap
+      readTimestampOffsets(
+        admin.listOffsets(listOffsetsParams, 
listOffsetsOptions).all().get().asScala.toMap,
+        isStartingOffsets,
+        strategyOnNoMatchStartingOffset,
+        _ => timestamp
+      )
     }
 
     fetchSpecificOffsets0(fnAssertParametersWithPartitions, 
fnRetrievePartitionOffsets)
   }
 
+  private def readTimestampOffsets(
+      tpToOffsetMap: Map[TopicPartition, 
ListOffsetsResult.ListOffsetsResultInfo],
+      isStartingOffsets: Boolean,
+      strategyOnNoMatchStartingOffset: StrategyOnNoMatchStartingOffset.Value,
+      partitionTimestampFn: TopicPartition => Long): Map[TopicPartition, Long] 
= {
+
+    tpToOffsetMap.map { case (tp, offsetSpec) =>
+      val offset = if (offsetSpec.offset() == 
OffsetFetchResponse.INVALID_OFFSET) {
+        if (isStartingOffsets) {
+          strategyOnNoMatchStartingOffset match {
+            case StrategyOnNoMatchStartingOffset.ERROR =>
+              throw new IllegalArgumentException("No offset " +
+                s"matched from request of topic-partition $tp and timestamp " +
+                s"${partitionTimestampFn(tp)}.")
+
+            case StrategyOnNoMatchStartingOffset.LATEST =>
+              KafkaOffsetRangeLimit.LATEST
+          }

Review comment:
       This is the same. nvm. Although `fetchSpecificTimestampBasedOffsets` is 
called in batch, and streaming batch queries, for streaming batch it is only 
called for initial partition offsets.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to