HeartSaVioR commented on code in PR #40905:
URL: https://github.com/apache/spark/pull/40905#discussion_r1174729496


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val queryId = TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)

Review Comment:
   nit: shall we simply add a new local variable for `TaskContext.get()`? 
Something like `val taskCtx = TaskContext.get()` or even just `ctx`.
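
   A minimal sketch of that suggestion, assuming the imports below match the Spark version this PR targets. The wrapper object `TaskContextSketch` and the helper `queryAndBatchId` are hypothetical names used only for illustration; they are not part of the PR.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}

object TaskContextSketch {
  // Hypothetical helper (not in the PR): reads both local properties
  // through a single TaskContext reference instead of repeated get() calls.
  def queryAndBatchId(): (String, String) = {
    // Bound once; TaskContext.get() returns null outside a running task.
    val ctx = TaskContext.get()
    val queryId = ctx.getLocalProperty(StreamExecution.QUERY_ID_KEY)
    val batchId = ctx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
    (queryId, batchId)
  }
}
```

   The same `ctx` reference could then also serve later calls such as `ctx.partitionId()`.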



##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
     failOnDataLoss: Boolean,
     includeHeaders: Boolean) extends InputPartition
 
-private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends PartitionReaderFactory with Logging {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+    val queryId = TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)
+    val batchId = TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+    logInfo(s"Creating Kafka reader partitionId=${TaskContext.get().partitionId()} " +

Review Comment:
   minor: placing partitionId next to topicPartition may give the impression 
that partitionId refers to Kafka's partition (and is therefore redundant info). 
It is actually Spark's partition ID. 
   
   I'd suggest placing it after taskId, and maybe adding ` for the query ` 
between `untilOffset` and `queryId` to separate the Kafka-related info from the 
Spark/query-related info.
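
   A rough sketch of the ordering described above, written as a pure function so it runs without Spark. The object `KafkaReaderLogSketch`, the helper `readerLogMessage`, and the `fromOffset` field are assumptions made for illustration; the PR's actual log line may contain different fields.

```scala
object KafkaReaderLogSketch {
  // Hypothetical helper (not in the PR): Kafka-related fields come first,
  // then " for the query " as a separator, then Spark/query-related fields
  // with partitionId placed after taskId.
  def readerLogMessage(
      topicPartition: String,
      fromOffset: Long, // assumed field, not named in the review
      untilOffset: Long,
      queryId: String,
      batchId: String,
      taskId: Long,
      partitionId: Int): String = {
    s"Creating Kafka reader topicPartition=$topicPartition " +
      s"fromOffset=$fromOffset untilOffset=$untilOffset, " +
      s"for the query queryId=$queryId batchId=$batchId " +
      s"taskId=$taskId partitionId=$partitionId"
  }
}
```

   With this layout, Spark's partitionId sits next to taskId and is less likely to be read as the Kafka partition.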



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

