tomasbartalos commented on a change in pull request #23749: [SPARK-26841][SQL] Kafka timestamp pushdown
URL: https://github.com/apache/spark/pull/23749#discussion_r266552202
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ##########
 @@ -106,19 +112,49 @@ private[kafka010] class KafkaRelation(
     val rdd = new KafkaSourceRDD(
       sqlContext.sparkContext, executorKafkaParams, offsetRanges,
       pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer = false).map { cr =>
-      InternalRow(
-        cr.key,
-        cr.value,
-        UTF8String.fromString(cr.topic),
-        cr.partition,
-        cr.offset,
-        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
-        cr.timestampType.id)
+        val columns = requiredColumns.map{KafkaRelation.columnToValueExtractor(_)(cr)}
 
 Review comment:
   I'm not sure I understand what you mean by:
   > I'd prefer just having the test harness create its own extractor if it needs one

   Could you please elaborate on that?
   
   The relation type was changed from `TableScan` to `PrunedFilteredScan`, so I need to return only the required columns (rather than all columns, as before). That means extracting only the specific fields needed from each `ConsumerRecord`.
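
   For illustration, a minimal sketch of the lookup-table idea behind the `columnToValueExtractor` call in the diff (not the actual implementation in this PR). It assumes a hypothetical `SimpleRecord` case class standing in for `ConsumerRecord[Array[Byte], Array[Byte]]`, and uses plain `String` and `Long` values instead of `UTF8String` and `DateTimeUtils` so it compiles without Spark or Kafka:

   ```scala
   // Hypothetical stand-in for ConsumerRecord[Array[Byte], Array[Byte]] (illustration only).
   final case class SimpleRecord(
       key: Array[Byte],
       value: Array[Byte],
       topic: String,
       partition: Int,
       offset: Long,
       timestamp: Long, // epoch milliseconds, like ConsumerRecord.timestamp
       timestampType: Int)

   object ColumnExtractorSketch {
     // Hypothetical lookup table: one extractor per column of the Kafka source schema.
     val extractors: Map[String, SimpleRecord => Any] = Map(
       "key" -> ((r: SimpleRecord) => r.key),
       "value" -> ((r: SimpleRecord) => r.value),
       "topic" -> ((r: SimpleRecord) => r.topic),
       "partition" -> ((r: SimpleRecord) => r.partition),
       "offset" -> ((r: SimpleRecord) => r.offset),
       // Catalyst represents TimestampType values as microseconds since the epoch.
       "timestamp" -> ((r: SimpleRecord) => r.timestamp * 1000L),
       "timestampType" -> ((r: SimpleRecord) => r.timestampType))

     // Returns values for the requested columns only, in the requested order;
     // fails fast on a column name outside the schema.
     def project(requiredColumns: Seq[String], r: SimpleRecord): Seq[Any] =
       requiredColumns.map { name =>
         val extract = extractors.getOrElse(
           name, throw new IllegalArgumentException(s"Unknown column: $name"))
         extract(r)
       }
   }
   ```

   Only the requested fields are read, and the output follows the order of `requiredColumns`, which is my understanding of what `PrunedFilteredScan.buildScan` expects.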
   
   I came up with an alternative solution:
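   ```
     import org.apache.kafka.clients.consumer.ConsumerRecord
     import org.apache.spark.sql.catalyst.util.DateTimeUtils
     import org.apache.spark.unsafe.types.UTF8String

     // Extracts only the requested Kafka source columns from a record, in order.
     class ConsumerRecordInspector(cr: ConsumerRecord[Array[Byte], Array[Byte]]) {
       def getValues(requiredColumns: List[String]): Seq[Any] = {
         requiredColumns match {
           case "key" :: rest => cr.key +: getValues(rest)
           case "value" :: rest => cr.value +: getValues(rest)
           case "topic" :: rest => UTF8String.fromString(cr.topic) +: getValues(rest)
           case "partition" :: rest => cr.partition +: getValues(rest)
           case "offset" :: rest => cr.offset +: getValues(rest)
           case "timestamp" :: rest =>
             DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)) +:
               getValues(rest)
           case "timestampType" :: rest => cr.timestampType.id +: getValues(rest)
           case Nil => Seq.empty
           // Without this case an unexpected column name would surface as a MatchError.
           case unknown :: _ =>
             throw new IllegalArgumentException(s"Unknown column: $unknown")
         }
       }
     }
   ```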
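
   The recursion is not essential: the same pattern match can be applied to one column at a time and mapped over the list. A hedged sketch of that variant, again using a hypothetical `KafkaRecordStandIn` in place of `ConsumerRecord` and plain values instead of `UTF8String`/`DateTimeUtils` so it compiles on its own:

   ```scala
   // Hypothetical stand-in for ConsumerRecord[Array[Byte], Array[Byte]] (illustration only).
   final case class KafkaRecordStandIn(
       key: Array[Byte],
       value: Array[Byte],
       topic: String,
       partition: Int,
       offset: Long,
       timestamp: Long, // epoch milliseconds
       timestampType: Int)

   final class RecordInspectorSketch(cr: KafkaRecordStandIn) {
     // Value of a single column; fails fast on a name outside the Kafka schema.
     private def valueOf(column: String): Any = column match {
       case "key"           => cr.key
       case "value"         => cr.value
       case "topic"         => cr.topic
       case "partition"     => cr.partition
       case "offset"        => cr.offset
       case "timestamp"     => cr.timestamp * 1000L // microseconds, as Catalyst stores TimestampType
       case "timestampType" => cr.timestampType
       case other           => throw new IllegalArgumentException(s"Unknown column: $other")
     }

     // Same values, in the same order, as the recursive getValues.
     def getValues(requiredColumns: Seq[String]): Seq[Any] = requiredColumns.map(valueOf)
   }
   ```

   Keeping the per-column match separate from the traversal avoids building the result through non-tail recursion and keeps each case to a single expression.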

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
