tomasbartalos commented on a change in pull request #23749: [SPARK-26841][SQL] Kafka timestamp pushdown
URL: https://github.com/apache/spark/pull/23749#discussion_r266487112
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ##########
 @@ -106,19 +112,49 @@ private[kafka010] class KafkaRelation(
     val rdd = new KafkaSourceRDD(
       sqlContext.sparkContext, executorKafkaParams, offsetRanges,
       pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
-      InternalRow(
-        cr.key,
-        cr.value,
-        UTF8String.fromString(cr.topic),
-        cr.partition,
-        cr.offset,
-        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-        cr.timestampType.id)
+        val columns = requiredColumns.map{KafkaRelation.columnToValueExtractor(_)(cr)}
+        InternalRow.fromSeq(columns)
+      }
+    val schemaProjected = StructType(requiredColumns.map{schema(_)})
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schemaProjected).rdd
+  }
+
+  def invalidateEmptyOffsets(
 
 Review comment:
   First, let me explain why we need an empty offset. When we ask Kafka to map a timestamp to an offset, it may return null for some partitions. This means that the partition doesn't contain any record whose timestamp is equal to or greater than the given timestamp. I need to handle this situation and transform null into something meaningful (currently a constant; this will be changed to None, as you proposed).
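   A minimal sketch of the null-to-None conversion, assuming a hypothetical `TopicPartition` case class standing in for Kafka's and a plain Scala `Map` whose values are boxed `java.lang.Long` offsets, with null meaning "no record at or after the timestamp". This mirrors the null convention of Kafka's `KafkaConsumer.offsetsForTimes`, although that method returns `OffsetAndTimestamp` values rather than bare offsets:

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// so the sketch compiles without the Kafka client on the classpath.
final case class TopicPartition(topic: String, partition: Int)

object TimestampOffsets {
  // `lookedUp` mimics a timestamp-to-offset lookup result: a partition
  // maps to null when it has no record whose timestamp is greater than
  // or equal to the requested one (assumption: values are raw offsets).
  def toOptionalOffsets(
      lookedUp: Map[TopicPartition, java.lang.Long]): Map[TopicPartition, Option[Long]] =
    lookedUp.map { case (tp, offset) =>
      // Option(null) is None, so "no matching record" becomes None
      // instead of a sentinel constant.
      tp -> Option(offset).map(_.longValue)
    }
}
```

Using `Option` makes the "no record" case explicit in the type, so callers must handle it rather than compare against a magic value.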
   
   This can happen for the computed startingOffsets as well as for the computed endingOffsets. As a result, we have to invalidate every partition whose startOffsets or endOffsets entry is empty by setting its offset range to (0, 0).
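   The invalidation step could look roughly like the sketch below. `TopicPartition`, `OffsetRange`, and `buildOffsetRanges` are hypothetical simplified names for illustration; they are not the PR's actual `invalidateEmptyOffsets` signature or Spark's internal offset-range class:

```scala
// Hypothetical simplified types, not Spark's or Kafka's real classes.
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(tp: TopicPartition, fromOffset: Long, untilOffset: Long)

object EmptyOffsetInvalidation {
  // Builds one range per partition present in `startOffsets`.
  // If either the start or the end offset is empty (None, or the partition
  // is missing from `endOffsets`), the partition gets the empty range (0, 0).
  def buildOffsetRanges(
      startOffsets: Map[TopicPartition, Option[Long]],
      endOffsets: Map[TopicPartition, Option[Long]]): Seq[OffsetRange] =
    startOffsets.keys.toSeq.sortBy(tp => (tp.topic, tp.partition)).map { tp =>
      (startOffsets(tp), endOffsets.getOrElse(tp, None)) match {
        case (Some(from), Some(until)) => OffsetRange(tp, from, until)
        // No record in the requested time window: read nothing.
        case _ => OffsetRange(tp, 0L, 0L)
      }
    }
}
```

Keeping an empty (0, 0) range instead of dropping the partition means every partition still has exactly one range, which may keep downstream code free of special cases.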
   
   Should I just add a comment to the method?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
