[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689450#comment-17689450 ]

Chris Egerton commented on KAFKA-5756:
--

I've merged Greg's fix and updated the fix version to 3.5.0, since this issue was not fully addressed before that fix.

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Oleg Kuznetsov
> Assignee: Greg Harris
> Priority: Major
> Fix For: 3.5.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()*
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684977#comment-17684977 ]

Greg Harris commented on KAFKA-5756:

[~mimaison] I have opened a PR that I think may alleviate this failure mode.

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Oleg Kuznetsov
> Priority: Major
> Fix For: 0.11.0.1, 1.0.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()*
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683888#comment-17683888 ]

Mickael Maison commented on KAFKA-5756:
---

As far as I can tell this is still an issue. [~gharris1727] Were you planning to propose a fix?
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161666#comment-17161666 ]

Greg Harris commented on KAFKA-5756:

I was able to reproduce this race condition with the following setup:
* ConnectDistributedTest.test_bounce
* clean bounces = True
* tests/kafkatest/tests/connect/templates/connect-distributed.properties edited to include offset.flush.interval.ms=1

This setup makes collisions between the periodic commitOffsets and the commitOffsets call from stop()/close() extremely likely. In a single run of 9 bounces, I managed to have 7 instances of duplication (12 records in 7 non-consecutive groups).

Below is an example log from one occurrence of this race condition. I've interspersed the VerifiableSourceTask's stdout messages to indicate when the records are produced and committed.

{noformat}
[2020-07-20 22:45:47,619] DEBUG Submitting 1 entries to backing store. The offsets are: {{id=1}={seqno=15709}
[2020-07-20 22:45:52,622] DEBUG Submitting 1 entries to backing store. The offsets are: {{id=1}={seqno=16209}}
[2020-07-20 22:45:52.622] {"task":1,"seqno":16210,"time_ms":1595285152622,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,623] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1395) and timeout 2147483647 to node 1: {acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,623] INFO Stopping task verifiable-source-1
[2020-07-20 22:45:52.627] {"task":1,"seqno":16211,"time_ms":1595285152627,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,627] INFO WorkerSourceTask{id=verifiable-source-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,627] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1395): org.apache.kafka.common.requests.ProduceResponse@464a5bec (org.apache.kafka.clients.NetworkClient)
[2020-07-20 22:45:52,627] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:490)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:274)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-07-20 22:45:52.630] {"committed":true,"task":1,"seqno":16210,"time_ms":1595285152630,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,630] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1396) and timeout 2147483647 to node 1: {acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-20 22:45:52,631] INFO WorkerSourceTask{id=verifiable-source-1} Finished commitOffsets successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,631] INFO [Producer clientId=connector-producer-verifiable-source-1] Closing the Kafka producer with timeoutMillis = 3 ms.
[2020-07-20 22:45:52,632] DEBUG [Producer clientId=connector-producer-verifiable-source-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, clientId=connector-producer-verifiable-source-1, correlationId=1396): org.apache.kafka.common.requests.ProduceResponse@6f98d76
[2020-07-20 22:45:52,632] DEBUG [Producer
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999547#comment-16999547 ]

Chris Egerton commented on KAFKA-5756:
--

[~olkuznsmith] [~rhauch] [~hachikuji] do you agree with the analysis above?
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999546#comment-16999546 ]

Chris Egerton commented on KAFKA-5756:
--

I believe a very similar issue may still be possible and that the synchronization added in https://github.com/apache/kafka/pull/3702, while an improvement, still doesn't prevent all possible errors caused by concurrent calls to {{WorkerSourceTask::commitOffsets}} (which, as noted earlier in the ticket, can come from both the periodic offset commit from the {{SourceTaskOffsetCommitter}} class and the end-of-life offset commit from the {{WorkerSourceTask}} itself).

The {{WorkerSourceTask}} class takes care to ensure that {{OffsetStorageWriter::beginFlush}} isn't invoked concurrently, which was implemented as part of https://github.com/apache/kafka/pull/3702. However, there doesn't appear to be anything in place to prevent that method from being invoked again before an in-progress flush has completed (either via a call to {{OffsetStorageWriter::cancelFlush}} or to {{OffsetStorageWriter::doFlush::get}}). If this occurs, [an exception is thrown|https://github.com/apache/kafka/blob/c6e25bb362899f4e6335ac5578b1cae31b7f2575/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L108-L112] stating that "the framework should not allow this".

Reopening this issue until the above scenario has been addressed.
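The overlap described in the comment above can be sketched in a few lines of Java. This is a simplified illustration under stated assumptions, not the Kafka Connect code: `SketchOffsetWriter`, `FlushOverlapSketch`, `completeFlush()` and `isFlushing()` are hypothetical names, the offsets are illustrative values, and the two callers are replayed sequentially on one thread so that the interleaving is deterministic. It shows only that a second `beginFlush()` issued before the first flush has completed or been cancelled gets rejected, even though each individual method is synchronized.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for an offset writer. This is NOT the real
// org.apache.kafka.connect.storage.OffsetStorageWriter.
final class SketchOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null; // non-null while a flush is in flight

    synchronized void offset(String partition, long seqno) {
        data.put(partition, seqno);
    }

    // Snapshots pending offsets; rejects a second call while a flush is outstanding.
    synchronized boolean beginFlush() {
        if (toFlush != null) {
            throw new IllegalStateException("already flushing");
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Marks the in-flight flush as finished (stands in for the write completing).
    synchronized void completeFlush() {
        toFlush = null;
    }

    // Abandons the in-flight flush and puts its offsets back for the next attempt.
    synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data); // newer offsets win over the cancelled snapshot
            data = toFlush;
            toFlush = null;
        }
    }

    synchronized boolean isFlushing() {
        return toFlush != null;
    }
}

final class FlushOverlapSketch {
    // Replays the interleaving: the periodic commit starts a flush, then the
    // end-of-life commit calls beginFlush() before the first flush has finished.
    static String replay() {
        SketchOffsetWriter writer = new SketchOffsetWriter();
        writer.offset("id=1", 16209L);
        writer.beginFlush();          // caller A: periodic commit, flush now in flight
        writer.offset("id=1", 16210L);
        try {
            writer.beginFlush();      // caller B: commit on stop, flush A still pending
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay());
    }
}
```

In this sketch the method-level locks keep the maps consistent but do not stop the second caller; avoiding the rejection would require the callers to serialize the whole begin-to-complete sequence, or the second caller to wait for or cancel the pending flush.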
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155816#comment-16155816 ]

ASF GitHub Bot commented on KAFKA-5756:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3702

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.2.0
> Reporter: Oleg Kuznetsov
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()*
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141672#comment-16141672 ]

Randall Hauch commented on KAFKA-5756:
--

[~olkuznsmith], thanks for the fix. As mentioned on the PR, the code change looks good. However, this PR needs to be against the {{trunk}} branch rather than an older release branch. This is the case for all Apache Kafka PRs.

To have a change backported, you simply need to ask for it in the PR and to mention the specific branches. If the PR is approved and the backporting is approved, the committer will merge the changes and backport to those branches. If there are any issues with backporting, the committer may ask you to create a specific PR against that branch.