[ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-4632.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.10.0.1
                   0.10.1.0

I'm going to close this as fixed in 0.10.0.1. [~ScottReynolds], if you 
disagree, please feel free to reopen with more detail.

> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4632
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
>            Reporter: Scott Reynolds
>            Priority: Major
>             Fix For: 0.10.1.0, 0.10.0.1
>
>
> WorkerSinkTask's closePartitions method does not handle the WakeupException
> that commitSync can throw.
> {code}
> org.apache.kafka.common.errors.WakeupException
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
> at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
> at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe closePartitions should catch the exception and ignore it, since
> that is what the poll method does when isStopping() returns true:
> {code:java}
>         } catch (WakeupException we) {
>             log.trace("{} consumer woken up", id);
>             if (isStopping())
>                 return;
>             if (shouldPause()) {
>                 pauseAll();
>             } else if (!pausedForRedelivery) {
>                 resumeAll();
>             }
>         }
> {code}
> But I'm unsure and would love some insight into this.
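
The reporter's suggestion can be sketched in isolation. The following is a minimal, self-contained illustration and not necessarily the change that was committed for this issue. Everything in it is a hypothetical stand-in: `SinkTaskSketch` is not the real `WorkerSinkTask`, `OffsetCommitter` replaces `KafkaConsumer`, and the nested `WakeupException` replaces `org.apache.kafka.common.errors.WakeupException`, so that the example compiles without the Kafka client library.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ClosePartitionsSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException (assumption). */
    static class WakeupException extends RuntimeException {}

    /** Hypothetical minimal view of the consumer: only the call that matters here. */
    interface OffsetCommitter {
        void commitSync();
    }

    /** Hypothetical, simplified sink task; not the real WorkerSinkTask. */
    static class SinkTaskSketch {
        private final OffsetCommitter consumer;
        private final AtomicBoolean stopping = new AtomicBoolean(false);

        SinkTaskSketch(OffsetCommitter consumer) {
            this.consumer = consumer;
        }

        void stop() {
            stopping.set(true);
        }

        boolean isStopping() {
            return stopping.get();
        }

        void closePartitions() {
            try {
                consumer.commitSync();
            } catch (WakeupException we) {
                // Mirror the quoted poll() handler: a wakeup that arrives while
                // the task is stopping is expected, so it is swallowed here.
                // Otherwise it is rethrown so the caller still sees it.
                if (!isStopping())
                    throw we;
            }
        }
    }

    public static void main(String[] args) {
        // A committer that always behaves as if wakeup() had been called.
        SinkTaskSketch task = new SinkTaskSketch(() -> {
            throw new WakeupException();
        });
        task.stop();
        task.closePartitions();
        System.out.println("closePartitions returned normally while stopping");
    }
}
```

One caveat with this approach: if the wakeup is swallowed, the interrupted commit may not have completed, so some records could be redelivered after a restart. Retrying the commit before giving up is another option worth considering.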



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
