gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r487981062
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -213,64 +180,70 @@ private[kafka010] class KafkaOffsetReader(
assert(partitions.asScala == partitionTimestamps.keySet,
"If starting/endingOffsetsByTimestamp contains specific offsets, you must specify all " +
s"topics. Specified: ${partitionTimestamps.keySet} Assigned: ${partitions.asScala}")
- logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionTimestamps")
+ logDebug(s"Assigned partitions: $partitions. Seeking to $partitionTimestamps")
}
val fnRetrievePartitionOffsets: ju.Set[TopicPartition] => Map[TopicPartition, Long] = { _ => {
- val converted = partitionTimestamps.map { case (tp, timestamp) =>
- tp -> java.lang.Long.valueOf(timestamp)
+ val listOffsetsParams = partitionTimestamps.map { p =>
+ p._1 -> OffsetSpec.forTimestamp(p._2)
}.asJava
+ admin.listOffsets(listOffsetsParams, listOffsetsOptions()).all().get().asScala.map {
+ case (tp, offsetSpec) =>
+ if (failsOnNoMatchingOffset) {
+ assert(offsetSpec.offset() != OffsetFetchResponse.INVALID_OFFSET, "No offset " +
+ s"matched from request of topic-partition $tp and timestamp " +
+ s"${partitionTimestamps(tp)}.")
+ }
- val offsetForTime: ju.Map[TopicPartition, OffsetAndTimestamp] =
- consumer.offsetsForTimes(converted)
-
- offsetForTime.asScala.map { case (tp, offsetAndTimestamp) =>
- if (failsOnNoMatchingOffset) {
- assert(offsetAndTimestamp != null, "No offset matched from request of " +
- s"topic-partition $tp and timestamp ${partitionTimestamps(tp)}.")
- }
-
- if (offsetAndTimestamp == null) {
- tp -> KafkaOffsetRangeLimit.LATEST
- } else {
- tp -> offsetAndTimestamp.offset()
- }
+ if (offsetSpec == null) {
Review comment:
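This is what happens when I don't write a test application, or at least a gist,
whenever I see an API difference. Here is an example test showing how the new API
behaves: https://gist.github.com/gaborgsomogyi/69cfac7c6d22f766d89d4b46e5ffec75
To correct my initial comment: different API, different result. With the old
consumer API, `offsetAndTimestamp` can be null. With the new admin API, the
per-partition result is not null when nothing matches; its offset is
`OffsetFetchResponse.INVALID_OFFSET` (-1) instead.
All in all, `offsetSpec.offset() == OffsetFetchResponse.INVALID_OFFSET` must
be checked rather than `offsetSpec == null`.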
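To make the difference concrete without a Kafka dependency, here is a minimal Scala sketch. `TP`, `OffsetAndTimestampLike`, `ListOffsetsResultInfoLike`, `Resolved`/`AtOffset`/`Latest` and `OffsetResolution` are hypothetical stand-ins (not Kafka or Spark classes) for `TopicPartition`, `OffsetAndTimestamp`, the admin API's `ListOffsetsResultInfo`, and `KafkaOffsetRangeLimit.LATEST`. It assumes, as the gist above demonstrates, that the admin path signals "no match" with offset -1 rather than a null entry.

```scala
// Hypothetical stand-ins so the sketch runs without kafka-clients on the classpath.
// TP ~ TopicPartition, OffsetAndTimestampLike ~ OffsetAndTimestamp (consumer API),
// ListOffsetsResultInfoLike ~ ListOffsetsResult.ListOffsetsResultInfo (admin API).
final case class TP(topic: String, partition: Int)
final case class OffsetAndTimestampLike(offset: Long, timestamp: Long)
final case class ListOffsetsResultInfoLike(offset: Long, timestamp: Long)

// Resolved position per partition; Latest stands in for KafkaOffsetRangeLimit.LATEST.
sealed trait Resolved
final case class AtOffset(offset: Long) extends Resolved
case object Latest extends Resolved

object OffsetResolution {
  // Same value as OffsetFetchResponse.INVALID_OFFSET.
  val InvalidOffset: Long = -1L

  // Old consumer path: a null value means no offset matched the timestamp.
  def fromConsumer(
      results: Map[TP, OffsetAndTimestampLike],
      failOnNoMatch: Boolean): Map[TP, Resolved] =
    results.map { case (tp, oat) =>
      if (failOnNoMatch) require(oat != null, s"No offset matched for $tp")
      val resolved: Resolved = if (oat == null) Latest else AtOffset(oat.offset)
      tp -> resolved
    }

  // New admin path: values are assumed never null, so the sentinel offset is checked.
  // An `info == null` test here would never fire and -1 would leak through as an offset.
  def fromAdmin(
      results: Map[TP, ListOffsetsResultInfoLike],
      failOnNoMatch: Boolean): Map[TP, Resolved] =
    results.map { case (tp, info) =>
      if (failOnNoMatch) require(info.offset != InvalidOffset, s"No offset matched for $tp")
      val resolved: Resolved =
        if (info.offset == InvalidOffset) Latest else AtOffset(info.offset)
      tp -> resolved
    }
}
```

For example, given `Map(TP("t", 0) -> ListOffsetsResultInfoLike(-1L, -1L))`, `fromAdmin(..., failOnNoMatch = false)` yields `Map(TP("t", 0) -> Latest)`, whereas a null check would have passed -1 through as a real offset. Keying both the failure check and the fallback on the same condition keeps the two branches from disagreeing.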
----------------------------------------------------------------
This is an automated message from the Apache Git Service.