Hi

I'm running Kafka Connect in distributed mode with the confluent HDFS
sink connector.

But the WorkerSinkTask constantly gets interfered with rebalancing
requests from the broker (onPartitionsRevoked) [1] and gets stuck in a
recovery state where the brokers constantly logs "Preparing to
restabilize group ... for generation xx" around every 30 seconds.

I have configured the connector with very high session timeouts and
low max poll records but it doesn't help. The topic have 100
partitions and there are 3 brokers. Kafka connect runs two single core
machines.

request.timeout.ms=310000
heartbeat.interval.ms=60000
session.timeout.ms=300000
max.poll.records=1
tasks.max=64

I'm not sure what else to tweak in order to make the problem go away.

Cheers,
-Kristoffer


[1]

[2016-07-16 12:52:52,668] ERROR Commit of
WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
unexpected exception:
(org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
session.timeout.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2016-07-16 12:52:52,676] INFO Revoking previously assigned partitions
[sting_actions_impression-23, sting_actions_impression-21,
sting_actions_impression-22] for group
connect-hdfs-sink-sting-impression
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
[2016-07-16 12:52:52,679] INFO
WorkerSinkTask{id=hdfs-sink-sting-impression-18} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2016-07-16 12:52:52,686] ERROR Commit of
WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
unexpected exception:
(org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
be completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between
subsequent calls to poll() was longer than the configured
session.timeout.ms, which typically implies that the poll loop is
spending too much time message processing. You can address this either
by increasing the session timeout or by reducing the maximum size of
batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Reply via email to