[GitHub] flink pull request #5329: [FLINK-8409] [kafka] Fix potential NPE in KafkaCon...

2018-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5329


---


[GitHub] flink pull request #5329: [FLINK-8409] [kafka] Fix potential NPE in KafkaCon...

2018-01-21 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5329

[FLINK-8409] [kafka] Fix potential NPE in KafkaConsumerThread

## What is the purpose of the change

This PR fixes a race condition that may lead to a NPE in the async 
callbacks for Kafka offset committing.

The following lines in the KafkaConsumerThread::setOffsetsToCommit(...) 
suggests a race condition with the asynchronous callback from committing 
offsets to Kafka:

```
// record the work to be committed by the main consumer thread and make 
sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint 
interval. " +
"Skipping commit of previous offsets because newer complete 
checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
```

In the main consumer thread's main loop, `nextOffsetsToCommit` will be 
checked if there are any offsets to commit. If so, an asynchronous offset 
commit operation will be performed. The NPE happens in the case when the commit 
completes, but `this.offsetCommitCallback = commitCallback;` is not yet reached.

This PR fixes this by making setting the `commitCallback` and 
`nextOffsetsToCommit` an atomic operation.

## Verifying this change

No new tests were added.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8409

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5329


commit c8081acaf51b3407af7d425063572f03a68021ac
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-13T12:13:20Z

[FLINK-8409] [kafka] Fix offset committing race condition in 
KafkaConsumerThread

commit a1bf7af3a806871b1177cb58c127c94bd75c
Author: Tzu-Li (Gordon) Tai 
Date:   2018-01-13T12:35:03Z

[hotfix] [kafka] Make AbortedReassignmentException a static class




---