jose-torres commented on a change in pull request #23749: [SPARK-26841][SQL] Kafka timestamp pushdown
URL: https://github.com/apache/spark/pull/23749#discussion_r266097445
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
##########
@@ -144,6 +180,81 @@ private[kafka010] class KafkaRelation(
}
}
+  private val TIMESTAMP_ATTR = "timestamp"
+
+  private def getStartingPartitionOffsetsByFilter(
+      kafkaReader: KafkaOffsetReader,
+      limitOffsets: Map[TopicPartition, Long],
+      filters: Array[Filter]): Map[TopicPartition, Long] = {
+
+    val timeOffsets: Map[TopicPartition, Long] = filters.flatMap {
+      // Strict lower bound: ask for the first offset whose timestamp is
+      // >= t + 1, so records with timestamp exactly t are excluded.
+      case op: GreaterThan if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      case op: EqualTo if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      case op: GreaterThanOrEqual if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      case _ => None
+    }.toMap
+
+    limitOffsets.map { case (tp, offset) =>
+      val timeOffset = timeOffsets.getOrElse(tp, offset)
+      tp -> (if (timeOffset != EMPTY_OFFSET) math.max(offset, timeOffset) else EMPTY_OFFSET)
+    }
+  }
+
+  private def getEndingPartitionOffsetsByFilter(
+      kafkaReader: KafkaOffsetReader,
+      limitOffsets: Map[TopicPartition, Long],
+      filters: Array[Filter]): Map[TopicPartition, Long] = {
+
+    val timeOffsets: Map[TopicPartition, Long] = filters.flatMap {
+      case op: LessThan if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> op.value.asInstanceOf[Timestamp].getTime
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      // Inclusive upper bound: the (exclusive) ending offset is the first
+      // offset whose timestamp is >= t + 1.
+      case op: LessThanOrEqual if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      case op: EqualTo if op.attribute == TIMESTAMP_ATTR =>
+        val times = limitOffsets.map { case (tp, _) =>
+          tp -> (op.value.asInstanceOf[Timestamp].getTime + 1)
+        }
+        kafkaReader.fetchOffsetsByTime(times)
+      case _ => None
+    }.toMap
+
+    limitOffsets.map { case (tp, offset) =>
+      var newOffset = timeOffsets.getOrElse(tp, offset)
+      if (isLimitSpecified(offset)) {
+        newOffset = if (newOffset != EMPTY_OFFSET) math.min(offset, newOffset) else EMPTY_OFFSET
+      }
+      tp -> newOffset
+    }
+  }
+
+  private def isLimitSpecified(offset: Long): Boolean = {
##########
 Review comment:
   I don't see the connection between the name and implementation here.
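
   For illustration only (the hunk cuts off before the method body, so the
   actual implementation isn't shown here): if the helper merely
   distinguishes a real offset from a sentinel such as EMPTY_OFFSET, a name
   tied to that check would read more directly. A hypothetical sketch:

       // Hypothetical rename, not the PR's code; assumes the check is
       // against the EMPTY_OFFSET sentinel used in the methods above.
       private def isValidOffset(offset: Long): Boolean = offset != EMPTY_OFFSET

   For context, this is the kind of batch query whose timestamp predicate
   the change above would translate into per-partition Kafka offsets
   (broker address, topic, and cutoff time below are placeholders):

       import java.sql.Timestamp
       import org.apache.spark.sql.SparkSession
       import org.apache.spark.sql.functions.{col, lit}

       val spark = SparkSession.builder().appName("kafka-ts-pushdown").getOrCreate()

       // A GreaterThanOrEqual filter on the Kafka source's "timestamp"
       // column is what getStartingPartitionOffsetsByFilter maps to
       // per-partition starting offsets.
       val df = spark.read
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")
         .option("subscribe", "events")
         .load()
         .where(col("timestamp") >= lit(Timestamp.valueOf("2019-03-01 00:00:00")))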