[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2023-02-15 Thread Chris Egerton (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17689450#comment-17689450 ]

Chris Egerton commented on KAFKA-5756:
--

I've merged Greg's fix and updated the fix version to 3.5.0, since this issue 
was not fully addressed before that fix.

> Synchronization issue on flush
> --
>
> Key: KAFKA-5756
> URL: https://issues.apache.org/jira/browse/KAFKA-5756
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.5.0
>
>
> Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* 
> method, whereas this collection can be accessed from 2 different threads:
> - *WorkerSourceTask.execute()*, finally block
> - *SourceTaskOffsetCommitter*, from periodic flush task
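To make the description above concrete, here is a minimal sketch of the kind of synchronization it asks for. `SimpleOffsetWriter` and its methods are hypothetical and greatly simplified; they are not Kafka's `OffsetStorageWriter` API. The only point is that every method which reads or swaps `toFlush` holds the same monitor, so the task thread and the periodic committer thread never see it half-updated.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified offset writer (NOT Kafka's OffsetStorageWriter).
 * Illustrates guarding the shared toFlush field with one monitor.
 */
public class SimpleOffsetWriter {
    // Offsets recorded since the last flush began.
    private Map<String, Long> data = new HashMap<>();
    // Snapshot currently being flushed; null when no flush is in progress.
    private Map<String, Long> toFlush = null;

    // Called from the task thread as records are produced.
    public synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Moves pending offsets into toFlush; returns false if there is nothing to flush.
    public synchronized boolean beginFlush() {
        if (toFlush != null) {
            throw new IllegalStateException("already flushing");
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Reads toFlush under the same lock, so another thread cannot swap it mid-read.
    public synchronized Map<String, Long> doFlush() {
        if (toFlush == null) {
            throw new IllegalStateException("no flush in progress");
        }
        return new HashMap<>(toFlush); // copy handed to a (hypothetical) backing store
    }

    // Marks the in-progress flush as finished.
    public synchronized void completeFlush() {
        toFlush = null;
    }

    // Abandons the flush; newer offsets in data win over the older snapshot.
    public synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
        }
    }

    public static void main(String[] args) {
        SimpleOffsetWriter writer = new SimpleOffsetWriter();
        writer.offset("partition-0", 42L);
        if (writer.beginFlush()) {
            System.out.println("flushing " + writer.doFlush());
            writer.completeFlush();
        }
    }
}
```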



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2023-02-06 Thread Greg Harris (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17684977#comment-17684977 ]

Greg Harris commented on KAFKA-5756:


[~mimaison] I have opened a PR that I think may alleviate this failure mode.



[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2023-02-03 Thread Mickael Maison (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17683888#comment-17683888 ]

Mickael Maison commented on KAFKA-5756:
---

As far as I can tell this is still an issue. [~gharris1727] Were you planning 
to propose a fix?



[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2020-07-20 Thread Greg Harris (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17161666#comment-17161666 ]

Greg Harris commented on KAFKA-5756:


I was able to reproduce this race condition with the following setup:
 * ConnectDistributedTest.test_bounce
 * clean bounces = True
 * tests/kafkatest/tests/connect/templates/connect-distributed.properties edited to include offset.flush.interval.ms=1

This setup makes collisions between the periodic commitOffsets and the 
commitOffsets call from stop()/close() extremely likely. In a single run of 9 
bounces, I observed 7 instances of duplication (12 records in 7 
non-consecutive groups).

Below is an example log from a run in which one of these race conditions 
occurred. I've interspersed the VerifiableSourceTask's stdout messages to 
indicate when the records are produced and committed.
{noformat}
[2020-07-20 22:45:47,619] DEBUG Submitting 1 entries to backing store. The 
offsets are: {{id=1}={seqno=15709}
[2020-07-20 22:45:52,622] DEBUG Submitting 1 entries to backing store. The 
offsets are: {{id=1}={seqno=16209}}
[2020-07-20 22:45:52.622] 
{"task":1,"seqno":16210,"time_ms":1595285152622,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,623] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with 
header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1395) and 
timeout 2147483647 to node 1: 
{acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,623] INFO Stopping task verifiable-source-1
[2020-07-20 22:45:52.627] 
{"task":1,"seqno":16211,"time_ms":1595285152627,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,627] INFO WorkerSourceTask{id=verifiable-source-1} 
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,627] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Received PRODUCE response from 
node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1395): 
org.apache.kafka.common.requests.ProduceResponse@464a5bec 
(org.apache.kafka.clients.NetworkClient)
[2020-07-20 22:45:52,627] ERROR Invalid call to OffsetStorageWriter flush() 
while already flushing, the framework should not allow this 
(org.apache.kafka.connect.storage.OffsetStorageWriter)
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is 
already flushing
at 
org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:490)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:274)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-07-20 22:45:52.630] 
{"committed":true,"task":1,"seqno":16210,"time_ms":1595285152630,"name":"verifiable-source","topic":"test"}
[2020-07-20 22:45:52,630] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Sending PRODUCE request with 
header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1396) and 
timeout 2147483647 to node 1: 
{acks=-1,timeout=2147483647,partitionSizes=[test-0=185]}
[2020-07-20 22:45:52,630] ERROR WorkerSourceTask{id=verifiable-source-1} Task 
is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-20 22:45:52,631] INFO WorkerSourceTask{id=verifiable-source-1} 
Finished commitOffsets successfully in 9 ms 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-07-20 22:45:52,631] INFO [Producer 
clientId=connector-producer-verifiable-source-1] Closing the Kafka producer 
with timeoutMillis = 3 ms.
[2020-07-20 22:45:52,632] DEBUG [Producer 
clientId=connector-producer-verifiable-source-1] Received PRODUCE response from 
node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=8, 
clientId=connector-producer-verifiable-source-1, correlationId=1396): 
org.apache.kafka.common.requests.ProduceResponse@6f98d76
[2020-07-20 22:45:52,632] DEBUG [Producer 
{noformat}

[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2019-12-18 Thread Chris Egerton (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999547#comment-16999547 ]

Chris Egerton commented on KAFKA-5756:
--

[~olkuznsmith] [~rhauch] [~hachikuji] do you agree with the analysis above?



[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2019-12-18 Thread Chris Egerton (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999546#comment-16999546 ]

Chris Egerton commented on KAFKA-5756:
--

I believe a very similar issue may still be possible and that the 
synchronization added in https://github.com/apache/kafka/pull/3702, while an 
improvement, still doesn't prevent all possible errors caused by concurrent 
calls to {{WorkerSourceTask::commitOffsets}} (which, as noted earlier in the 
ticket, can come from both the periodic offset commit from the 
{{SourceTaskOffsetCommitter}} class and the end-of-life offset commit from the 
{{WorkerSourceTask}} itself).
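The two call paths mentioned above can be sketched with a `ScheduledExecutorService` standing in for the periodic committer and a `finally` block standing in for the end-of-life commit. `TwoCommitters`, its `commitOffsets` method, and the timings are hypothetical. In this sketch the whole commit routine, including the wait for the flush, is `synchronized`, so the two callers can never overlap; that is a coarse, illustrative choice that trades some latency for safety, not a description of what Kafka Connect does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical model of a source task whose offsets are committed from two
 * threads: a periodic committer and the task's own final commit.
 * Illustrative names only; these are not Kafka Connect classes.
 */
public class TwoCommitters {
    private final AtomicInteger inFlight = new AtomicInteger();    // commits running right now
    private final AtomicInteger maxInFlight = new AtomicInteger(); // highest overlap observed
    private final AtomicInteger commits = new AtomicInteger();     // total commits finished

    // synchronized covers begin, write, AND the wait for completion, so a
    // second caller blocks until the first cycle has fully finished.
    public synchronized void commitOffsets() {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(1); // stand-in for beginFlush, doFlush, and waiting on the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
            commits.incrementAndGet();
        }
    }

    public int maxInFlight() { return maxInFlight.get(); }
    public int commits() { return commits.get(); }

    // Runs a short "task": the scheduler commits every 2 ms while the task
    // works, then the task commits once more from its finally block.
    public static TwoCommitters runTask() throws InterruptedException {
        TwoCommitters task = new TwoCommitters();
        ScheduledExecutorService committer = Executors.newSingleThreadScheduledExecutor();
        committer.scheduleAtFixedRate(task::commitOffsets, 0, 2, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(50); // the task's main loop
        } finally {
            task.commitOffsets(); // end-of-life commit, may collide with a periodic one
            committer.shutdown();
            committer.awaitTermination(1, TimeUnit.SECONDS);
        }
        return task;
    }

    public static void main(String[] args) throws InterruptedException {
        TwoCommitters t = runTask();
        System.out.println("commits=" + t.commits() + ", maxInFlight=" + t.maxInFlight());
    }
}
```

Removing `synchronized` from `commitOffsets` can let `maxInFlight` exceed 1 when the final commit lands while a periodic commit is still running, which is the kind of overlap described above.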

The {{WorkerSourceTask}} class takes care to ensure that 
{{OffsetStorageWriter::beginFlush}} isn't invoked concurrently, which was 
implemented as part of https://github.com/apache/kafka/pull/3702. However, 
there doesn't appear to be anything in place to prevent that method from being 
invoked before a flush has completed (either via a call to 
{{OffsetStorageWriter::cancelFlush}} or to 
{{OffsetStorageWriter::doFlush::get}}). If this occurs, [an exception is 
thrown|https://github.com/apache/kafka/blob/c6e25bb362899f4e6335ac5578b1cae31b7f2575/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java#L108-L112]
 stating that "the framework should not allow this".

Reopening this issue until the above scenario has been addressed.
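One way to close the gap described in this comment (a `beginFlush` call arriving before the previous flush has completed or been cancelled) is to make the second caller wait for the outstanding flush, up to a timeout, rather than throw. The sketch below assumes a hypothetical `FlushGuard` built on `java.util.concurrent.Semaphore`; it is illustrative only and is not claimed to match the fix that was eventually merged.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical guard allowing at most one outstanding flush cycle
 * (begin, write, then complete or cancel). A second caller waits up to a
 * timeout for the first cycle to end instead of failing immediately.
 * Illustrative sketch only; not Kafka Connect's OffsetStorageWriter.
 */
public class FlushGuard {
    // A single permit: held from beginFlush until endFlush.
    private final Semaphore permit = new Semaphore(1);

    /** Starts a flush cycle, waiting for any previous cycle to end first. */
    public void beginFlush(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!permit.tryAcquire(timeout, unit)) {
            throw new TimeoutException("previous flush still in progress");
        }
    }

    /** Ends the current cycle; call once after the flush completes or is cancelled. */
    public void endFlush() {
        permit.release();
    }

    public static void main(String[] args) throws Exception {
        FlushGuard guard = new FlushGuard();
        guard.beginFlush(1, TimeUnit.SECONDS); // periodic commit starts a flush

        // End-of-life commit arrives while that flush is still outstanding.
        Thread finalCommit = new Thread(() -> {
            try {
                guard.beginFlush(5, TimeUnit.SECONDS); // blocks instead of throwing
                System.out.println("final commit began after the periodic flush ended");
                guard.endFlush();
            } catch (InterruptedException | TimeoutException e) {
                throw new RuntimeException(e);
            }
        });
        finalCommit.start();

        Thread.sleep(100); // periodic flush still in progress
        guard.endFlush();  // periodic flush completes, releasing the waiter
        finalCommit.join();
    }
}
```

Because `Semaphore.release()` does not track ownership, `endFlush()` must be called exactly once per successful `beginFlush`, on both the completion and cancellation paths; an extra call would admit two concurrent flushes.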



[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2017-09-06 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16155816#comment-16155816 ]

ASF GitHub Bot commented on KAFKA-5756:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3702




[jira] [Commented] (KAFKA-5756) Synchronization issue on flush

2017-08-25 Thread Randall Hauch (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16141672#comment-16141672 ]

Randall Hauch commented on KAFKA-5756:
--

[~olkuznsmith], thanks for the fix. As mentioned on the PR, the code change 
looks good. 

However, this PR needs to be against the {{trunk}} branch rather than an older 
release branch. This is the case for all Apache Kafka PRs. 

To have a change backported, you simply need to ask for it in the PR and to 
mention the specific branches. If the PR is approved and the backporting is 
approved, the committer will merge the changes and backport to those branches. 
If there are any issues with backporting, the committer may ask you to create a 
specific PR against that branch.
