HeartSaVioR commented on code in PR #40905:
URL: https://github.com/apache/spark/pull/40905#discussion_r1174729496
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
failOnDataLoss: Boolean,
includeHeaders: Boolean) extends InputPartition
-private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory with Logging {
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+ val queryId =
TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)
Review Comment:
nit: shall we simply add a new local variable for `TaskContext.get()`?
Something like `val taskCtx = TaskContext.get()` or even just `ctx`.
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala:
##########
@@ -34,9 +36,17 @@ private[kafka010] case class KafkaBatchInputPartition(
failOnDataLoss: Boolean,
includeHeaders: Boolean) extends InputPartition
-private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory {
+private[kafka010] object KafkaBatchReaderFactory extends
PartitionReaderFactory with Logging {
override def createReader(partition: InputPartition):
PartitionReader[InternalRow] = {
val p = partition.asInstanceOf[KafkaBatchInputPartition]
+
+ val queryId =
TaskContext.get().getLocalProperty(StreamExecution.QUERY_ID_KEY)
+ val batchId =
TaskContext.get().getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)
+ logInfo(s"Creating Kafka reader
partitionId=${TaskContext.get().partitionId()} " +
Review Comment:
minor: placing partitionId near topicPartition may lead to the misunderstanding
that partitionId refers to Kafka's partition (and is hence redundant info). It's
Spark's partition ID.
I'd suggest placing it after taskId, and maybe adding ` for the query ` between
`untilOffset` and `queryId` to differentiate Kafka-related info from Spark/query-related
info.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]