GitHub user tzulitai opened a pull request:
https://github.com/apache/flink/pull/5329
[FLINK-8409] [kafka] Fix potential NPE in KafkaConsumerThread
## What is the purpose of the change
This PR fixes a race condition that may lead to an NPE in the asynchronous
callback for Kafka offset commits.
The following lines in `KafkaConsumerThread::setOffsetsToCommit(...)`
suggest a race condition with the asynchronous callback that fires when an
offset commit to Kafka completes:
```
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
    log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
        "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
        "This does not compromise Flink's checkpoint integrity.");
}

this.offsetCommitCallback = commitCallback;
```
The consumer thread's main loop checks `nextOffsetsToCommit` for offsets to
commit and, if any are present, starts an asynchronous offset commit. The NPE
occurs when that commit completes and its callback runs before
`this.offsetCommitCallback = commitCallback;` has been reached, so the callback
field may still be `null`.
This PR fixes the race by setting `commitCallback` and `nextOffsetsToCommit`
in a single atomic operation.
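
As a rough illustration of the idea (not the code in this PR), the offsets and the callback can be wrapped in one immutable holder and published through a single `AtomicReference`, so the consumer thread never sees offsets without their callback. The class `AtomicCommitSketch`, the `PendingCommit` holder, the simplified `CommitCallback` interface, and the `Map<String, Long>` offset type are all hypothetical stand-ins chosen for this sketch.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch only: all names here are hypothetical, not taken from the Flink code base. */
public class AtomicCommitSketch {

    /** Hypothetical, simplified stand-in for the connector's commit callback. */
    interface CommitCallback {
        void onSuccess();
    }

    /** Immutable holder so that offsets and callback are always published together. */
    static final class PendingCommit {
        final Map<String, Long> offsets;
        final CommitCallback callback;

        PendingCommit(Map<String, Long> offsets, CommitCallback callback) {
            this.offsets = offsets;
            this.callback = callback;
        }
    }

    /** Single reference shared by the checkpointing thread and the consumer thread. */
    private final AtomicReference<PendingCommit> nextCommit = new AtomicReference<>();

    /**
     * Called from the checkpointing thread.
     * Returns true if an older, not yet picked up commit request was replaced.
     */
    boolean setOffsetsToCommit(Map<String, Long> offsets, CommitCallback callback) {
        return nextCommit.getAndSet(new PendingCommit(offsets, callback)) != null;
    }

    /**
     * Called from the consumer thread's main loop.
     * Returns the pending request (offsets plus callback) and clears it, or null if none.
     */
    PendingCommit pollPendingCommit() {
        return nextCommit.getAndSet(null);
    }
}
```

With this shape, the consumer thread would pass `pending.callback` along when it starts the asynchronous commit, instead of reading a separately assigned field that might not have been written yet.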
## Verifying this change
No new tests were added.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)
## Documentation
- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tzulitai/flink FLINK-8409
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5329.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5329
commit c8081acaf51b3407af7d425063572f03a68021ac
Author: Tzu-Li (Gordon) Tai
Date: 2018-01-13T12:13:20Z
[FLINK-8409] [kafka] Fix offset committing race condition in
KafkaConsumerThread
commit a1bf7af3a806871b1177cb58c127c94bd75c
Author: Tzu-Li (Gordon) Tai
Date: 2018-01-13T12:35:03Z
[hotfix] [kafka] Make AbortedReassignmentException a static class
---