jerrypeng commented on code in PR #52729:
URL: https://github.com/apache/spark/pull/52729#discussion_r2479085963


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -93,6 +104,38 @@ private case class KafkaBatchPartitionReader(
     }
   }
 
+  override def nextWithTimeout(timeoutMs: java.lang.Long): RecordStatus = {
+    if (!iteratorForRealTimeMode.isDefined) {
+      logInfo(s"Getting a new kafka consuming iterator for ${offsetRange.topicPartition} " +
+        s"starting from ${nextOffset}, timeoutMs ${timeoutMs}")
+      iteratorForRealTimeMode = Some(consumer.getIterator(nextOffset))
+    }
+    assert(iteratorForRealTimeMode.isDefined)
+    val nextRecord = iteratorForRealTimeMode.get.nextWithTimeout(timeoutMs)
+    nextRecord.foreach { record =>
+
+      nextRow = unsafeRowProjector(record)
+      nextOffset = record.offset + 1
+      if (record.timestampType() == TimestampType.LOG_APPEND_TIME ||
+        record.timestampType() == TimestampType.CREATE_TIME) {
+        if (!timestampTypeLogged) {
+          logInfo(log"Kafka source record timestamp type is " +
+            log"${MDC(LogKeys.TIMESTAMP_COLUMN_NAME, record.timestampType())}")
+          timestampTypeLogged = true
+        }

Review Comment:
   This tells us the semantics of the timestamp column of a Kafka record.
That is, whether the timestamp for records from this topic is set to log append
time (when the record is persisted by the Kafka brokers) or create time, which
is either when the record is produced by a Kafka producer or a user-defined
value. This information is used when calculating latency, to understand which
journey we are actually measuring.
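
As a rough illustration of why the timestamp type matters for latency, here is
a minimal sketch. It is not code from this PR: `TsType`, `measuredJourney`, and
`latencyMs` are hypothetical names, and `TsType` is a local stand-in for Kafka's
`org.apache.kafka.common.record.TimestampType` so the snippet runs without the
Kafka client on the classpath.

```scala
// Local stand-in for org.apache.kafka.common.record.TimestampType
// (assumption: used only to keep this sketch dependency-free).
sealed trait TsType
case object CreateTime extends TsType
case object LogAppendTime extends TsType
case object NoTimestampType extends TsType

object LatencySketch {
  // Hypothetical helper: names the journey that a latency number covers,
  // depending on how the topic assigns record timestamps.
  def measuredJourney(tsType: TsType): String = tsType match {
    case CreateTime      => "producer (or user-defined) timestamp to processing"
    case LogAppendTime   => "broker append to processing"
    case NoTimestampType => "unknown"
  }

  // Hypothetical helper: latency in milliseconds, or None when the record
  // carries no usable timestamp.
  def latencyMs(tsType: TsType, recordTsMs: Long, processedAtMs: Long): Option[Long] =
    tsType match {
      case NoTimestampType => None
      case _               => Some(processedAtMs - recordTsMs)
    }
}
```

The same subtraction yields a different measurement depending on the timestamp
type, which is why logging the type once per reader helps when interpreting
latency numbers.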



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

