Nishanth28 opened a new pull request, #52745:
URL: https://github.com/apache/spark/pull/52745

   ## Summary
   Add taskId and partitionId to KafkaDataConsumer release() log messages for better correlation with Spark executor logs.
   
   ## Issue Type
   **Improvement / Enhancement**
   
   ## Components
   - Structured Streaming
   - Kafka
   
   ## Affects Version
   - 4.1.0 (and earlier versions)
   
   ---
   
   ## Description
   
   ### Problem Statement
   
   The log message emitted by `KafkaDataConsumer.release()` does not include task context information (taskId and partitionId), so Kafka read durations cannot be correlated with the specific Spark tasks that performed the reads. This makes it hard to see which tasks spent the most time reading from Kafka, hindering performance analysis and debugging in large-scale streaming applications.
   
   ### Current Behavior
   
   When examining executor logs, there is a disconnect between task execution 
logs and Kafka consumer logs:
   
   ```
   14:30:16 INFO Executor: Running task 83.0 in stage 32.0 (TID 4188)

   14:30:16 INFO KafkaBatchReaderFactoryWithRowBytesAccumulator: Creating Kafka reader...  taskId=4188 partitionId=83

   14:35:19 INFO KafkaDataConsumer: From Kafka topicPartition=my-topic-0 groupId=spark-kafka-consumer read 12922 records through 80 polls (polled out 12979 records), taking 299911.706198 ms, during time span of 303617.473973 ms.
   ❌ NO TASK CONTEXT - Cannot correlate with task 4188

   14:35:19 INFO Executor: Finished task 83.0 in stage 32.0 (TID 4188)
   ```
   
   **Issues with Current Logging:**
   1. Cannot determine which task ID corresponds to which Kafka read operation
   2. Difficult to identify slow tasks reading from Kafka
   3. Hard to correlate Kafka consumer performance with specific partitions 
being processed
   4. Limited ability to diagnose partition-level performance issues in 
streaming jobs
   
   ### Expected Behavior
   
   After this enhancement, logs should include task context information:
   
   ```
   14:30:16 INFO Executor: Running task 83.0 in stage 32.0 (TID 4188)

   14:30:16 INFO KafkaBatchReaderFactoryWithRowBytesAccumulator: Creating Kafka reader...  taskId=4188 partitionId=83

   14:35:19 INFO KafkaDataConsumer: From Kafka topicPartition=my-topic-0 groupId=spark-kafka-consumer read 12922 records through 80 polls (polled out 12979 records), taking 299911.706198 ms, during time span of 303617.473973 ms for [taskId: 4188, partitionId: 83]
   ✅ INCLUDES TASK CONTEXT - Easy to correlate

   14:35:19 INFO Executor: Finished task 83.0 in stage 32.0 (TID 4188)
   ```
   
   ---
   
   ## Motivation
   
   ### Use Cases
   
   1. **Performance Debugging**: Quickly identify which Spark tasks are 
spending excessive time reading from Kafka
   2. **Partition Skew Analysis**: Correlate slow Kafka reads with specific 
partition IDs to identify data skew issues
   3. **End-to-End Task Tracking**: Enable seamless correlation between task 
execution start/end and Kafka consumer operations
   4. **Production Monitoring**: Facilitate better alerting and monitoring by 
tracking Kafka read performance per task
   5. **Troubleshooting**: Easier root cause analysis when investigating 
streaming job failures or slowness
   
   ### Benefits
   
   - **Improved Observability**: Complete visibility into Kafka consumer 
behavior at task granularity
   - **Faster Debugging**: Reduce time to identify performance bottlenecks in 
streaming applications
   - **Better Monitoring**: Enable more accurate SLAs and alerting based on 
per-task Kafka read metrics
   - **Consistency**: Aligns KafkaDataConsumer logging with other Spark 
components that already include task context
   
   ---
   
   ## Proposed Solution
   
   ### Implementation Details
   
   Modify the `KafkaDataConsumer.release()` method to capture and log task 
context information:
   
   **File**: 
`connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala`
   
   **Changes**:
   
   1. Extract TaskContext information when available
   2. Format task context as structured log entries using MDC (Mapped 
Diagnostic Context)
   3. Append task context to existing log message
   
   ```scala
   def release(): Unit = {
     val kafkaMeta = _consumer
       .map(c => s"topicPartition=${c.topicPartition} groupId=${c.groupId}")
       .getOrElse("")
     val walTime = System.nanoTime() - startTimestampNano

     // Get task context information for correlation with executor logs
     val taskCtx = TaskContext.get()
     val taskContextInfo = if (taskCtx != null) {
       log" for [taskId: ${MDC(TASK_ATTEMPT_ID, taskCtx.taskAttemptId())}, " +
         log"partitionId: ${MDC(PARTITION_ID, taskCtx.partitionId())}]"
     } else {
       log""
     }

     logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " +
       log"${MDC(NUM_RECORDS_READ, totalRecordsRead)} records through " +
       log"${MDC(NUM_KAFKA_PULLS, numPolls)} polls " +
       log"(polled out ${MDC(NUM_KAFKA_RECORDS_PULLED, numRecordsPolled)} records), " +
       log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " +
       log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms" +
       taskContextInfo  // ✅ ADDED
     )

     releaseConsumer()
     releaseFetchedData()
   }
   ```
   
   ### Key Implementation Notes
   
   1. **Graceful Handling**: Uses `TaskContext.get()` which returns `null` when 
not in task context (e.g., during driver-side operations)
   2. **Structured Logging**: Leverages existing MDC (Mapped Diagnostic 
Context) for consistent structured logging
   3. **Backward Compatible**: Does not break existing log parsing tools; the task context is appended at the end of the message (see the parsing sketch after this list)
   4. **Negligible Overhead**: Adds only a small amount of string formatting, and only when a task context is available
   5. **Consistent Format**: Uses the same MDC keys (`TASK_ATTEMPT_ID`, `PARTITION_ID`) as other Spark components
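   
   As an illustration of point 3, a hypothetical log-processing helper (not part of this change; the object name and regex are only a sketch) could recover the task context from the new plain-text suffix:
   
   ```scala
   // Hypothetical helper for plain-text executor logs; illustrative only, not part of this PR.
   object KafkaReadLogCorrelator {
     // Matches the suffix this change appends, e.g. "for [taskId: 4188, partitionId: 83]".
     private val TaskCtxPattern = """for \[taskId: (\d+), partitionId: (\d+)\]""".r

     /** Returns (taskId, partitionId) when the release() log line carries task context. */
     def taskContext(logLine: String): Option[(Long, Int)] =
       TaskCtxPattern.findFirstMatchIn(logLine).map(m => (m.group(1).toLong, m.group(2).toInt))
   }

   // Example (numbers taken from the sample log above):
   // KafkaReadLogCorrelator.taskContext(
   //   "... during time span of 303617.473973 ms for [taskId: 4188, partitionId: 83]")
   // => Some((4188L, 83))
   ```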
   
   ---
   
   ## Testing
   
   ### Test Cases
   
   1. **Unit Tests**: Verify log output includes task context when TaskContext is available (see the sketch after this list)
   2. **Null Safety**: Verify graceful handling when TaskContext is null 
(driver-side operations)
   3. **Integration Tests**: Validate task context correlation in end-to-end 
streaming scenarios
   4. **Format Validation**: Ensure structured logging format is maintained
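   
   A minimal unit-test sketch for cases 1 and 2, assuming a Mockito-mocked `TaskContext` installed via `TaskContext.setTaskContext` (the suite name, package, and assertions are illustrative; the real test would capture the emitted log line and check its suffix):
   
   ```scala
   package org.apache.spark.sql.kafka010.consumer

   import org.apache.spark.{SparkFunSuite, TaskContext}
   import org.mockito.Mockito.{mock, when}

   class KafkaDataConsumerTaskContextSuite extends SparkFunSuite {

     test("task context is visible to release() when a TaskContext is set") {
       // Install a mocked TaskContext so TaskContext.get() returns it on this thread.
       val taskCtx = mock(classOf[TaskContext])
       when(taskCtx.taskAttemptId()).thenReturn(4188L)
       when(taskCtx.partitionId()).thenReturn(83)
       TaskContext.setTaskContext(taskCtx)
       try {
         // ... exercise KafkaDataConsumer.release() here, capture the emitted log line,
         // and assert it ends with "for [taskId: 4188, partitionId: 83]" ...
         assert(TaskContext.get().taskAttemptId() === 4188L)
         assert(TaskContext.get().partitionId() === 83)
       } finally {
         TaskContext.unset()  // do not leak the mocked context into other tests
       }
     }

     test("release() path is null-safe when no TaskContext is set") {
       // Driver-side call sites have no TaskContext; the suffix should simply be omitted.
       assert(TaskContext.get() == null)
     }
   }
   ```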
   
   ### Test Validation
   
   Run existing test suite:
   ```bash
   ./build/mvn -pl connector/kafka-0-10-sql -am test \
     -DwildcardSuites=org.apache.spark.sql.kafka010.consumer.KafkaDataConsumerSuite
   ```
   
   Expected output in test logs:
   ```
   From Kafka topicPartition=test-topic-0 groupId=test-group read 1000 records through 2 polls (polled out 1000 records), taking 148.63 ms, during time span of 557.43 ms for [taskId: 0, partitionId: 0]
   ```
   
   ---
   
   ## Impact Analysis
   
   ### Scope
   - **Minimal Code Change**: Single method modification in 
`KafkaDataConsumer.scala`
   - **No API Changes**: Internal logging enhancement, no public API 
modifications
   - **Negligible Performance Impact**: Only simple string formatting is added
   - **Backward Compatible**: Existing log parsing tools will continue to work
   
   ### Risk Assessment
   - **Low Risk**: Changes are isolated to logging statements
   - **No Breaking Changes**: Additive enhancement only
   - **Easy to Revert**: Can be rolled back if issues arise
   

