The workers seem happier when we reduce the number of partitions per
worker. And when we add more topics they eventually get stuck in a
rebalancing state.

May I ask what's a good configuration? At the moment we have...

- 2 Docker instances with 4 cores and a 4 GB heap
- each instance reads 4000 kB/s and writes 300 kB/s on the network
- 3 topics with 100 partitions
- each topic has tens of millions of messages (so the connector
needs to catch up when first started)
- tasks.max=8
- rotate.interval.ms=600000
- flush.size=4096
- request.timeout.ms=610000
- heartbeat.interval.ms=120000
- session.timeout.ms=300000
- max.poll.records=10000
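
For reference, here's roughly how I've applied these (a sketch; I'm
assuming the consumer settings belong in the worker properties with the
standard consumer. prefix, which is how the sink tasks' consumers pick
them up, while the rest goes in the connector config):

connector config (submitted via the REST API):

  name=hdfs-sink-sting-impression
  connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
  tasks.max=8
  flush.size=4096
  rotate.interval.ms=600000

worker properties:

  consumer.request.timeout.ms=610000
  consumer.heartbeat.interval.ms=120000
  consumer.session.timeout.ms=300000
  consumer.max.poll.records=10000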

Please let me know if anything stands out as bad, imbalanced, or
under-provisioned.
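
My own back-of-the-envelope math, in case I'm misreading the settings:

  3 topics x 100 partitions = 300 partitions
  300 partitions / 8 tasks = ~38 partitions per task

and a single poll() may return up to max.poll.records=10000 records,
which (with flush.size=4096, if I understand it as records-per-file per
topic partition) can mean several HDFS flushes per poll loop. As far as
I understand, all of that has to finish within session.timeout.ms
(300000 ms = 5 min) between polls, or the group rebalances.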

Cheers,
-Kristoffer

On Tue, Jul 26, 2016 at 12:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote:
> We found very high CPU usage, which might be causing the problem. It seems
> to be spending a lot of cycles querying and parsing HDFS paths?
>
> Den 24 jul 2016 02:40 skrev "Ewen Cheslack-Postava" <e...@confluent.io>:
>
> That definitely sounds unusual -- rebalancing normally only happens either
> when a) there are new workers or b) there are connectivity issues/failures.
> Is it possible there's something causing large latencies?
>
> -Ewen
>
> On Sat, Jul 16, 2016 at 6:09 AM, Kristoffer Sjögren <sto...@gmail.com>
> wrote:
>
>> Hi
>>
>> I'm running Kafka Connect in distributed mode with the Confluent HDFS
>> sink connector.
>>
>> But the WorkerSinkTask is constantly interrupted by rebalance
>> requests from the broker (onPartitionsRevoked) [1] and gets stuck in a
>> recovery state where the brokers log "Preparing to
>> restabilize group ... for generation xx" roughly every 30 seconds.
>>
>> I have configured the connector with a very high session timeout and
>> a low max.poll.records, but it doesn't help. The topic has 100
>> partitions and there are 3 brokers. Kafka Connect runs on two
>> single-core machines.
>>
>> request.timeout.ms=310000
>> heartbeat.interval.ms=60000
>> session.timeout.ms=300000
>> max.poll.records=1
>> tasks.max=64
>>
>> I'm not sure what else to tweak in order to make the problem go away.
>>
>> Cheers,
>> -Kristoffer
>>
>>
>> [1]
>>
>> [2016-07-16 12:52:52,668] ERROR Commit of
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>> unexpected exception:
>> (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between
>> subsequent calls to poll() was longer than the configured
>> session.timeout.ms, which typically implies that the poll loop is
>> spending too much time message processing. You can address this either
>> by increasing the session timeout or by reducing the maximum size of
>> batches returned in poll() with max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> [2016-07-16 12:52:52,676] INFO Revoking previously assigned partitions
>> [sting_actions_impression-23, sting_actions_impression-21,
>> sting_actions_impression-22] for group
>> connect-hdfs-sink-sting-impression
>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
>> [2016-07-16 12:52:52,679] INFO
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} Committing offsets
>> (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
>> [2016-07-16 12:52:52,686] ERROR Commit of
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>> unexpected exception:
>> (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between
>> subsequent calls to poll() was longer than the configured
>> session.timeout.ms, which typically implies that the poll loop is
>> spending too much time message processing. You can address this either
>> by increasing the session timeout or by reducing the maximum size of
>> batches returned in poll() with max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>
>
>
> --
> Thanks,
> Ewen
