tomasbartalos commented on a change in pull request #23749: [SPARK-26841][SQL] Kafka timestamp pushdown
URL: https://github.com/apache/spark/pull/23749#discussion_r266492737
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
##########
@@ -144,6 +180,81 @@ private[kafka010] class KafkaRelation(
}
}
+  private val TIMESTAMP_ATTR = "timestamp"
+
+  private def getStartingPartitionOffsetsByFilter(
+      kafkaReader: KafkaOffsetReader,
+      limitOffsets: Map[TopicPartition, Long],
+      filters: Array[Filter]): Map[TopicPartition, Long] = {
+
+    val timeOffsets: Map[TopicPartition, Long] = filters.flatMap {
+      // timestamp > t: start at the first record strictly after t, hence t + 1
+      case op: GreaterThan if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1) }
+        kafkaReader.fetchOffsetsByTime(times)
+      // timestamp = t and timestamp >= t: start at the first record at or after t
+      case op: EqualTo if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime }
+        kafkaReader.fetchOffsetsByTime(times)
+      case op: GreaterThanOrEqual if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime }
+        kafkaReader.fetchOffsetsByTime(times)
+      case _ => None
+    }.toMap
+
+    limitOffsets.map { case (tp, offset) =>
+      // The later of the configured start and the time-derived start wins;
+      // EMPTY_OFFSET signals that no record matched the timestamp filter.
+      val timeOffset = timeOffsets.getOrElse(tp, offset)
+      tp -> (if (timeOffset != EMPTY_OFFSET) math.max(offset, timeOffset) else EMPTY_OFFSET)
+    }
+  }
+
+  private def getEndingPartitionOffsetsByFilter(
+      kafkaReader: KafkaOffsetReader,
+      limitOffsets: Map[TopicPartition, Long],
+      filters: Array[Filter]): Map[TopicPartition, Long] = {
+
+    val timeOffsets: Map[TopicPartition, Long] = filters.flatMap {
+      // timestamp < t: the exclusive ending offset is the first record at or after t
+      case op: LessThan if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime }
+        kafkaReader.fetchOffsetsByTime(times)
+      // timestamp <= t and timestamp = t: end just past t, hence t + 1
+      case op: LessThanOrEqual if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1) }
+        kafkaReader.fetchOffsetsByTime(times)
+      case op: EqualTo if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1) }
+        kafkaReader.fetchOffsetsByTime(times)
+      case _ => None
+    }.toMap
+
+    limitOffsets.map { case (tp, offset) =>
+      var newOffset = timeOffsets.getOrElse(tp, offset)
+      // Clamp only when the configured ending offset is a concrete bound,
+      // not one of the LATEST/EARLIEST sentinels.
+      if (isLimitSpecified(offset)) {
+        newOffset = if (newOffset != EMPTY_OFFSET) math.min(offset, newOffset) else EMPTY_OFFSET
+      }
+      tp -> newOffset
+    }
+  }
+
+ private def isLimitSpecified(offset: Long): Boolean = {
Review comment:
Yep, it may seem confusing. The thing is, a Kafka offset can carry one of the special sentinel values `LATEST = -1` or `EARLIEST = -2`. The method checks whether the offset is bound to a specific position, as opposed to unbound (latest/earliest). Honestly, I've renamed it about three times and I'm still not satisfied, but I can't come up with anything meaningful. Maybe `isNotLatestOrEarliest` would be better?
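
For readers following along, here is a minimal sketch of what the check boils down to, assuming Kafka's sentinel offsets as described above (the object name is illustrative, not the PR's):

object OffsetSentinels {
  // Kafka's special offsets: resolved at runtime rather than pointing at a record.
  val LATEST: Long = -1L
  val EARLIEST: Long = -2L

  // The candidate rename from the comment above: true only for a concrete,
  // bound offset, false for the unbound sentinels.
  def isNotLatestOrEarliest(offset: Long): Boolean =
    offset != LATEST && offset != EARLIEST
}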
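
And for context on where the time-derived offsets in the diff come from: `fetchOffsetsByTime` presumably delegates to Kafka's `Consumer.offsetsForTimes`, which returns, per partition, the earliest offset whose record timestamp is at or after the requested time, or null when no record qualifies. A hedged sketch (the helper name and the `emptyOffset` parameter are illustrative):

import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

def offsetsByTime(
    consumer: Consumer[Array[Byte], Array[Byte]],
    times: Map[TopicPartition, Long],
    emptyOffset: Long): Map[TopicPartition, Long] = {
  val jTimes: ju.Map[TopicPartition, jl.Long] =
    times.map { case (tp, t) => tp -> jl.Long.valueOf(t) }.asJava
  consumer.offsetsForTimes(jTimes).asScala.map { case (tp, ot) =>
    // offsetsForTimes yields null when no record has timestamp >= the request
    tp -> (if (ot == null) emptyOffset else ot.offset())
  }.toMap
}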