[ https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hanlin Liu updated KAFKA-6595:
------------------------------
    Description: 
Version: Confluent Platform 3.2.0 (Apache Kafka 0.10.2.0)

SourceTaskOffsetCommitter periodically calls commitOffsets() and waits for all 
incomplete records to be sent. If the task is stopped while that commit is still 
waiting, commitOffsets() is called again from the finally block in 
WorkerSourceTask.execute(). Because the first flush has not finished, 
OffsetStorageWriter logs {{Invalid call to OffsetStorageWriter flush() while 
already flushing, the framework should not allow this}} and throws 
{{ConnectException: OffsetStorageWriter is already flushing}}. The task then 
closes its producer without waiting for the offset flush timeout.
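
A simplified, hypothetical model of the overlap: {{OffsetFlushRaceDemo}} and {{SimplifiedOffsetWriter}} are illustrative names, not Kafka classes. The only behaviour carried over from this report is that a second {{beginFlush()}} issued while an earlier flush is still in progress throws. According to the stack trace in this report, Kafka throws {{ConnectException}} at that point; the sketch throws {{IllegalStateException}} so that it needs no Kafka dependency.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of two overlapping offset commits; not Kafka code.
class OffsetFlushRaceDemo {
    /** Returns the outcome of a second commit that starts before the first has finished. */
    static String simulate() {
        SimplifiedOffsetWriter writer = new SimplifiedOffsetWriter();
        writer.offset("example-partition", 42L);

        // 1. The periodic committer begins a flush, then blocks waiting for outstanding records.
        writer.beginFlush();

        // 2. The task is stopped; the finally block of execute() commits again before step 1 ends.
        try {
            writer.beginFlush();
            return "second flush started";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}

// Stages offsets and guards against overlapping flushes (illustrative only).
class SimplifiedOffsetWriter {
    private Map<String, Long> data = new HashMap<>(); // offsets staged since the last flush
    private Map<String, Long> toFlush = null;         // non-null while a flush is in progress

    synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    synchronized boolean flushing() {
        return toFlush != null;
    }

    synchronized boolean beginFlush() {
        if (flushing()) {
            // Per the stack trace, Kafka throws ConnectException("OffsetStorageWriter is already flushing") here.
            throw new IllegalStateException("OffsetStorageWriter is already flushing");
        }
        if (data.isEmpty()) {
            return false; // nothing to flush
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Called once the staged offsets have been written (or the flush is abandoned).
    synchronized void completeFlush() {
        toFlush = null;
    }
}
{code}

Running {{main}} prints {{OffsetStorageWriter is already flushing}}: the finally-block commit fails as soon as it overlaps the periodic one, whereas two commits separated by {{completeFlush()}} both succeed.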

After 30 seconds, the producer is closed forcefully and all incomplete records 
are aborted. If {{offset.flush.timeout.ms}} is configured larger than 30 seconds, 
the commit that is still waiting in WorkerSourceTask treats those aborted records 
as if they had been sent within the flush timeout, and so incorrectly flushes 
source offsets for records that were never written.
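
The sketch below models why the still-waiting commit succeeds. It rests on two assumptions that are consistent with the log but not stated elsewhere in this report: a record's source offset is staged for commit when the record is handed to the producer, and the send callback removes a record from the outstanding set whether or not the send succeeded (the failure is only logged, as at WorkerSourceTask.java:228). {{AbortedRecordCommitDemo}} and all of its members are hypothetical.

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the source-task bookkeeping; not Kafka code.
class AbortedRecordCommitDemo {
    private final Set<Long> outstanding = new HashSet<>(); // records awaiting a callback
    private long stagedOffset = -1;    // latest source offset staged for the next commit
    private long committedOffset = -1; // source offset actually committed
    private int writtenRecords = 0;    // records the broker acknowledged

    // ASSUMPTION: the offset is staged as soon as the record is handed to the producer.
    synchronized void send(long sourceOffset) {
        outstanding.add(sourceOffset);
        stagedOffset = sourceOffset;
    }

    // Producer callback, invoked for successful and aborted sends alike.
    synchronized void onCompletion(long sourceOffset, Exception error) {
        if (error == null) {
            writtenRecords++;
        }
        // ASSUMPTION: a failed record leaves the outstanding set just like a successful one.
        outstanding.remove(sourceOffset);
        notifyAll();
    }

    // Waits until nothing is outstanding or the flush timeout expires, then commits.
    synchronized boolean commitOffsets(long flushTimeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + flushTimeoutMs;
        while (!outstanding.isEmpty()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out: nothing is committed
            }
            wait(remaining);
        }
        committedOffset = stagedOffset;
        return true;
    }

    synchronized long getCommittedOffset() { return committedOffset; }

    synchronized int getWrittenRecords() { return writtenRecords; }

    static AbortedRecordCommitDemo simulate() {
        AbortedRecordCommitDemo task = new AbortedRecordCommitDemo();
        for (long offset = 1; offset <= 3; offset++) {
            task.send(offset);
        }
        task.onCompletion(1, null); // record 1 is written successfully

        // Stand-in for the forced producer close, which aborts the remaining records.
        Thread forceClose = new Thread(() -> {
            Exception aborted = new IllegalStateException("Producer is closed forcefully.");
            task.onCompletion(2, aborted);
            task.onCompletion(3, aborted);
        });
        forceClose.start();
        try {
            // A long flush timeout, like offset.flush.timeout.ms > 30 s, outlives the aborts.
            if (!task.commitOffsets(60_000)) {
                throw new IllegalStateException("commit unexpectedly timed out");
            }
            forceClose.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task;
    }

    public static void main(String[] args) {
        AbortedRecordCommitDemo task = simulate();
        System.out.println("written=" + task.getWrittenRecords()
                + ", committed offset=" + task.getCommittedOffset());
    }
}
{code}

Running {{main}} prints {{written=1, committed offset=3}}: because the commit only waits for the outstanding set to drain, offsets 2 and 3 are committed even though those records were aborted.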

 
{code:java}
2018-02-27 02:59:33,134 INFO  [] Stopping connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
2018-02-27 02:59:33,134 INFO  [] Stopped connector dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this   [pool-1-thread-13][OffsetStorageWriter.java:110]
2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 threw an uncaught and unrecoverable exception   [pool-1-thread-13][WorkerTask.java:141]
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
        at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:00,734 ERROR [] Graceful stop of task dp-sqlserver-connector-dptask_455-0 failed.   [pool-3-thread-1][Worker.java:405]
2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since pending requests could not be completed within timeout 30 ms.   [pool-1-thread-13][KafkaProducer.java:713]
2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
java.lang.IllegalStateException: Producer is closed forcefully.
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
        at java.lang.Thread.run(Thread.java:745)
2018-02-27 03:00:09,920 INFO  [] Finished WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
{code}

> Kafka connect commit offset incorrectly.
> ----------------------------------------
>
>                 Key: KAFKA-6595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6595
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.0
>            Reporter: Hanlin Liu
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
