tomasbartalos commented on a change in pull request #23749: [SPARK-26841][SQL]
Kafka timestamp pushdown
URL: https://github.com/apache/spark/pull/23749#discussion_r266487112
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
##########
@@ -106,19 +112,49 @@ private[kafka010] class KafkaRelation(
val rdd = new KafkaSourceRDD(
sqlContext.sparkContext, executorKafkaParams, offsetRanges,
pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
- InternalRow(
- cr.key,
- cr.value,
- UTF8String.fromString(cr.topic),
- cr.partition,
- cr.offset,
- DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
- cr.timestampType.id)
+ val columns = requiredColumns.map{KafkaRelation.columnToValueExtractor(_)(cr)}
+ InternalRow.fromSeq(columns)
+ }
+ val schemaProjected = StructType(requiredColumns.map{schema(_)})
+ sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schemaProjected).rdd
+ }
+
+ def invalidateEmptyOffsets(
Review comment:
First, let me explain why we need an empty offset. When we ask Kafka to map a
timestamp to an offset, Kafka may return null for some partitions. This means
that the partition contains no record whose timestamp is greater than or equal
to the given timestamp. I need to handle this case and turn the null into
something meaningful (currently a constant; it will be changed to None, as
you've proposed).
This can happen for the calculated startingOffsets as well as for the
endingOffsets. As a result, we have to invalidate every partition whose start
or end offset is empty by setting its offset range to (0,0).
Should I just add a comment to the method?
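
For reference, the behaviour described above can be sketched roughly as follows.
This is only an illustration, not the code in the PR: TopicPartition,
OffsetRange, EmptyOffsets and toOption are simplified, hypothetical stand-ins,
and the sketch assumes the lookup result has already been collected into
per-partition maps. The only Kafka behaviour it relies on is that the
timestamp lookup can yield null for a partition.

```scala
// Hypothetical, simplified stand-ins; not the types used in the PR.
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

object EmptyOffsets {
  // The timestamp lookup may yield null for a partition; wrap it in an Option
  // so that "no matching record" becomes None instead of a magic constant.
  def toOption(offset: java.lang.Long): Option[Long] =
    Option(offset).map(_.longValue)

  // A partition whose start or end offset is missing gets the empty range (0, 0),
  // so reading it yields no records. Results are sorted for a stable order.
  def invalidateEmptyOffsets(
      starts: Map[TopicPartition, Option[Long]],
      ends: Map[TopicPartition, Option[Long]]): Seq[OffsetRange] =
    starts.keys.toSeq.sortBy(tp => (tp.topic, tp.partition)).map { tp =>
      (starts(tp), ends.getOrElse(tp, None)) match {
        case (Some(from), Some(until)) => OffsetRange(tp, from, until)
        case _ => OffsetRange(tp, 0L, 0L)
      }
    }
}
```

With this shape, a partition that has both offsets keeps its range, while a
partition for which either lookup returned null ends up with (0,0).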