Nishanth28 opened a new pull request, #52745:
URL: https://github.com/apache/spark/pull/52745
## Summary
Add `taskId` and `partitionId` to `KafkaDataConsumer.release()` log messages for
better correlation with Spark executor logs.
## Issue Type
**Improvement / Enhancement**
## Components
- Structured Streaming
- Kafka
## Affects Version
- 4.1.0 (and earlier versions)
---
## Description
### Problem Statement
The current Kafka consumer logging in `KafkaDataConsumer.release()` does not
include task context information (taskId and partitionId), making it difficult
to correlate Kafka read durations with the specific Spark tasks that performed the reads.
This limitation reduces visibility into which tasks spent the most time
reading from Kafka, hindering performance analysis and debugging in large-scale
streaming applications.
### Current Behavior
When examining executor logs, there is a disconnect between task execution
logs and Kafka consumer logs:
```
14:30:16 INFO Executor: Running task 83.0 in stage 32.0 (TID 4188)
14:30:16 INFO KafkaBatchReaderFactoryWithRowBytesAccumulator: Creating Kafka reader... taskId=4188 partitionId=83
14:35:19 INFO KafkaDataConsumer: From Kafka topicPartition=my-topic-0 groupId=spark-kafka-consumer read 12922 records through 80 polls (polled out 12979 records), taking 299911.706198 ms, during time span of 303617.473973 ms.
❌ NO TASK CONTEXT - Cannot correlate with task 4188
14:35:19 INFO Executor: Finished task 83.0 in stage 32.0 (TID 4188)
```
**Issues with Current Logging:**
1. Cannot determine which task ID corresponds to which Kafka read operation
2. Difficult to identify slow tasks reading from Kafka
3. Hard to correlate Kafka consumer performance with specific partitions
being processed
4. Limited ability to diagnose partition-level performance issues in
streaming jobs
### Expected Behavior
After this enhancement, logs should include task context information:
```
14:30:16 INFO Executor: Running task 83.0 in stage 32.0 (TID 4188)
14:30:16 INFO KafkaBatchReaderFactoryWithRowBytesAccumulator: Creating Kafka reader... taskId=4188 partitionId=83
14:35:19 INFO KafkaDataConsumer: From Kafka topicPartition=my-topic-0 groupId=spark-kafka-consumer read 12922 records through 80 polls (polled out 12979 records), taking 299911.706198 ms, during time span of 303617.473973 ms for [taskId: 4188, partitionId: 83]
✅ INCLUDES TASK CONTEXT - Easy to correlate
14:35:19 INFO Executor: Finished task 83.0 in stage 32.0 (TID 4188)
```
---
## Motivation
### Use Cases
1. **Performance Debugging**: Quickly identify which Spark tasks are
spending excessive time reading from Kafka
2. **Partition Skew Analysis**: Correlate slow Kafka reads with specific
partition IDs to identify data skew issues
3. **End-to-End Task Tracking**: Enable seamless correlation between task
execution start/end and Kafka consumer operations
4. **Production Monitoring**: Facilitate better alerting and monitoring by
tracking Kafka read performance per task
5. **Troubleshooting**: Easier root cause analysis when investigating
streaming job failures or slowness
### Benefits
- **Improved Observability**: Complete visibility into Kafka consumer
behavior at task granularity
- **Faster Debugging**: Reduce time to identify performance bottlenecks in
streaming applications
- **Better Monitoring**: Enable more accurate SLAs and alerting based on
per-task Kafka read metrics
- **Consistency**: Aligns KafkaDataConsumer logging with other Spark
components that already include task context
---
## Proposed Solution
### Implementation Details
Modify the `KafkaDataConsumer.release()` method to capture and log task
context information:
**File**:
`connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala`
**Changes**:
1. Extract TaskContext information when available
2. Format task context as structured log entries using MDC (Mapped
Diagnostic Context)
3. Append task context to existing log message
```scala
def release(): Unit = {
  val kafkaMeta = _consumer
    .map(c => s"topicPartition=${c.topicPartition} groupId=${c.groupId}")
    .getOrElse("")
  val walTime = System.nanoTime() - startTimestampNano
  // Get task context information for correlation with executor logs
  val taskCtx = TaskContext.get()
  val taskContextInfo = if (taskCtx != null) {
    log" for [taskId: ${MDC(TASK_ATTEMPT_ID, taskCtx.taskAttemptId())}, " +
      log"partitionId: ${MDC(PARTITION_ID, taskCtx.partitionId())}]"
  } else {
    log""
  }
  logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " +
    log"${MDC(NUM_RECORDS_READ, totalRecordsRead)} records through " +
    log"${MDC(NUM_KAFKA_PULLS, numPolls)} polls " +
    log"(polled out ${MDC(NUM_KAFKA_RECORDS_PULLED, numRecordsPolled)} records), " +
    log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " +
    log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms" +
    taskContextInfo // ✅ ADDED
  )
  releaseConsumer()
  releaseFetchedData()
}
```
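For reference, the snippet above assumes the structured-logging helpers that the existing message in `KafkaDataConsumer.scala` already relies on; roughly the imports below, though the exact `LogKeys` members and package paths should be double-checked against the file's current import list:

```scala
// Assumed imports for the snippet above (to be confirmed against the file's
// existing imports). MDC and the LogKeys entries come from Spark's structured
// logging framework; TASK_ATTEMPT_ID and PARTITION_ID are the keys the new
// suffix relies on.
import org.apache.spark.TaskContext
import org.apache.spark.internal.MDC
import org.apache.spark.internal.LogKeys.{CONSUMER, NUM_KAFKA_PULLS,
  NUM_KAFKA_RECORDS_PULLED, NUM_RECORDS_READ, PARTITION_ID, TASK_ATTEMPT_ID,
  TIME, TOTAL_TIME_READ}
```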
### Key Implementation Notes
1. **Graceful Handling**: Uses `TaskContext.get()`, which returns `null` when no
task is running (e.g., during driver-side operations)
2. **Structured Logging**: Leverages the existing MDC (Mapped Diagnostic
Context) keys for consistent structured logging
3. **Backward Compatible**: Does not break existing log parsing tools, since the
task context is appended at the end of the message (see the sketch after this list)
4. **Negligible Overhead**: Adds only lightweight string formatting, and only
when a task context is available
5. **Consistent Format**: Uses the same MDC keys (`TASK_ATTEMPT_ID`,
`PARTITION_ID`) as other Spark components
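To make the backward-compatibility note concrete, below is a purely illustrative sketch (not part of this change; the pattern and object name are hypothetical, not taken from any existing tool). A parser written against the old message keeps matching because the task-context suffix is strictly additive, and an optional group can pick up the new fields when they are present:

```scala
// Illustrative only: a hypothetical parser for the release() log line.
// A pattern written against the old message still matches the new one because
// the "[taskId: ..., partitionId: ...]" suffix is purely additive; an optional
// group captures it when present.
object ReleaseLogParser {
  private val pattern =
    ("""From Kafka topicPartition=(\S+) groupId=(\S+) read (\d+) records through (\d+) polls""" +
      """.*taking ([\d.]+) ms, during time span of ([\d.]+) ms""" +
      """(?: for \[taskId: (\d+), partitionId: (\d+)\])?""").r

  def main(args: Array[String]): Unit = {
    val withoutSuffix = "From Kafka topicPartition=my-topic-0 groupId=spark-kafka-consumer " +
      "read 12922 records through 80 polls (polled out 12979 records), " +
      "taking 299911.706198 ms, during time span of 303617.473973 ms"
    val withSuffix = withoutSuffix + " for [taskId: 4188, partitionId: 83]"

    Seq(withoutSuffix, withSuffix).foreach { line =>
      pattern.findFirstMatchIn(line) match {
        case Some(m) =>
          // group(7) is null when the suffix is absent (old-style message).
          val taskId = Option(m.group(7)).getOrElse("n/a")
          println(s"topicPartition=${m.group(1)} records=${m.group(3)} taskId=$taskId")
        case None =>
          println("no match")
      }
    }
  }
}
```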
---
## Testing
### Test Cases
1. **Unit Tests**: Verify log output includes task context when TaskContext
is available
2. **Null Safety**: Verify graceful handling when `TaskContext.get()` returns
null (driver-side operations); see the sketch after this list
3. **Integration Tests**: Validate task context correlation in end-to-end
streaming scenarios
4. **Format Validation**: Ensure structured logging format is maintained
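As a rough, self-contained illustration of what test cases 1 and 2 exercise (a sketch only, not code from `KafkaDataConsumerSuite`; the object name is made up): outside a running task `TaskContext.get()` returns `null`, which is the branch `release()` must tolerate on the driver, while inside a task it exposes the `taskAttemptId`/`partitionId` values the new suffix logs.

```scala
// Standalone sketch (not from KafkaDataConsumerSuite): demonstrates that
// TaskContext.get() is null on the driver and populated inside running tasks,
// which is exactly the distinction the new log suffix depends on.
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskContextAvailabilityCheck {
  def main(args: Array[String]): Unit = {
    // Driver side: no task is running, so the null branch of release() applies.
    assert(TaskContext.get() == null)

    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("task-context-check"))
    try {
      // Executor side: every partition sees a non-null TaskContext carrying the
      // taskAttemptId/partitionId that the enhanced log message would emit.
      val ids = sc.parallelize(1 to 4, numSlices = 2).mapPartitions { _ =>
        val ctx = TaskContext.get()
        assert(ctx != null)
        Iterator((ctx.taskAttemptId(), ctx.partitionId()))
      }.collect()
      ids.foreach { case (taskId, partitionId) =>
        println(s"taskId=$taskId partitionId=$partitionId")
      }
    } finally {
      sc.stop()
    }
  }
}
```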
### Test Validation
Run existing test suite:
```bash
./build/mvn -pl connector/kafka-0-10-sql -am test \
  -DwildcardSuites=org.apache.spark.sql.kafka010.consumer.KafkaDataConsumerSuite
```
Expected output in test logs:
```
From Kafka topicPartition=test-topic-0 groupId=test-group read 1000 records through 2 polls (polled out 1000 records), taking 148.63 ms, during time span of 557.43 ms for [taskId: 0, partitionId: 0]
```
---
## Impact Analysis
### Scope
- **Minimal Code Change**: Single method modification in
`KafkaDataConsumer.scala`
- **No API Changes**: Internal logging enhancement, no public API
modifications
- **Negligible Performance Impact**: Only simple string formatting, executed once per `release()` call
- **Backward Compatible**: Existing log parsing tools will continue to work
### Risk Assessment
- **Low Risk**: Changes are isolated to logging statements
- **No Breaking Changes**: Additive enhancement only
- **Easy to Revert**: Can be rolled back if issues arise