[ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-4632.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 0.10.0.1
                   0.10.1.0

I'm going to close this as fixed in 0.10.0.1. [~ScottReynolds], if you disagree, please feel free to reopen with more detail.

> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4632
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
>            Reporter: Scott Reynolds
>            Priority: Major
>             Fix For: 0.10.1.0, 0.10.0.1
>
>
> WorkerSinkTask's closePartitions method isn't handling WakeupException that
> can be thrown from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
>     at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
>     at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>     at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe it should catch it and ignore it as that is what the poll method
> does when isStopping is true
> {code:java}
>         } catch (WakeupException we) {
>             log.trace("{} consumer woken up", id);
>             if (isStopping())
>                 return;
>             if (shouldPause()) {
>                 pauseAll();
>             } else if (!pausedForRedelivery) {
>                 resumeAll();
>             }
>         }
> {code}
> But unsure, love some insight into this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
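
For illustration only, the handling proposed in the description (catch the WakeupException around the commit and ignore it when the task is stopping) might look roughly like the sketch below. This is not the change that was committed for this issue. To keep it runnable without the Kafka jars, WakeupException here is a local stand-in for org.apache.kafka.common.errors.WakeupException, and the OffsetCommitter interface, the class name, and the boolean return value are all hypothetical names introduced for the example.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupOnCloseSketch {

    /** Local stand-in for org.apache.kafka.common.errors.WakeupException (assumption: no Kafka dependency). */
    static class WakeupException extends RuntimeException { }

    /** Hypothetical abstraction over the consumer's synchronous offset commit. */
    interface OffsetCommitter {
        void commitSync();
    }

    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final OffsetCommitter committer;

    WakeupOnCloseSketch(OffsetCommitter committer) {
        this.committer = committer;
    }

    /** Marks the task as stopping, as a shutdown request would. */
    void stop() {
        stopping.set(true);
    }

    boolean isStopping() {
        return stopping.get();
    }

    /**
     * Commits offsets while closing partitions.
     * Returns true if the commit completed, false if a wakeup interrupted it
     * while the task was stopping. A wakeup at any other time is rethrown.
     */
    boolean closePartitions() {
        try {
            committer.commitSync();
            return true;
        } catch (WakeupException we) {
            if (isStopping()) {
                // Same idea as the poll path quoted above: a wakeup during shutdown is expected.
                return false;
            }
            throw we;
        }
    }

    public static void main(String[] args) {
        // Simulate a commit that is interrupted by a wakeup.
        WakeupOnCloseSketch task = new WakeupOnCloseSketch(() -> {
            throw new WakeupException();
        });
        task.stop();
        System.out.println("commit completed: " + task.closePartitions());
    }
}
```

One caveat with simply ignoring the exception is that the interrupted commit is skipped, so offsets for the closed partitions may not be pushed; retrying the commit before returning would be an alternative worth considering.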