The workers seem happier when we reduce the number of partitions per worker. But when we add more topics they eventually get stuck in a rebalancing state and die.
May I ask what's a good configuration? At the moment we have:

- 2 Docker instances with 4 cores and a 4 GB heap each
- each instance reads 4000 kB/s and writes 300 kB/s on the network
- 3 topics with 100 partitions
- each topic holds tens of millions of messages (so the connector needs to catch up when first started)
- tasks.max=8
- rotate.interval.ms=600000
- flush.size=4096
- request.timeout.ms=610000
- heartbeat.interval.ms=120000
- session.timeout.ms=300000
- max.poll.records=10000
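For reference, this is roughly how we wire those settings up (a sketch; host names and the group id are placeholders). The consumer settings sit in the distributed worker config, since Connect forwards any consumer.-prefixed property to the sink task consumers:

  # connect-distributed.properties (sketch, placeholder hosts/ids)
  bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
  group.id=connect-cluster
  # forwarded to the sink task consumers via the consumer. prefix
  consumer.request.timeout.ms=610000
  consumer.heartbeat.interval.ms=120000
  consumer.session.timeout.ms=300000
  consumer.max.poll.records=10000

The connector-level settings are submitted through the Connect REST API, roughly like this (connector name, topic names, and HDFS URL are placeholders):

  curl -X POST -H "Content-Type: application/json" \
    http://localhost:8083/connectors -d '{
      "name": "hdfs-sink",
      "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "8",
        "topics": "topic-a,topic-b,topic-c",
        "hdfs.url": "hdfs://namenode:8020",
        "flush.size": "4096",
        "rotate.interval.ms": "600000"
      }
    }'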
Please let me know if anything stands out as bad, imbalanced, or under-provisioned.

Cheers,
-Kristoffer

On Tue, Jul 26, 2016 at 12:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote:
> We found very high CPU usage, which might be causing the problem. It seems
> to be spending a lot of cycles querying and parsing HDFS paths?
>
> On 24 Jul 2016 02:40, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> That definitely sounds unusual -- rebalancing normally only happens either
> when a) there are new workers or b) there are connectivity issues/failures.
> Is it possible there's something causing large latencies?
>
> -Ewen
>
> On Sat, Jul 16, 2016 at 6:09 AM, Kristoffer Sjögren <sto...@gmail.com> wrote:
>
>> Hi
>>
>> I'm running Kafka Connect in distributed mode with the Confluent HDFS
>> sink connector.
>>
>> But the WorkerSinkTask constantly gets interrupted by rebalance requests
>> from the broker (onPartitionsRevoked) [1] and gets stuck in a recovery
>> state where the brokers constantly log "Preparing to restabilize group
>> ... for generation xx" around every 30 seconds.
>>
>> I have configured the connector with a very high session timeout and a
>> low max.poll.records, but it doesn't help. The topic has 100 partitions
>> and there are 3 brokers. Kafka Connect runs on two single-core machines.
>>
>> request.timeout.ms=310000
>> heartbeat.interval.ms=60000
>> session.timeout.ms=300000
>> max.poll.records=1
>> tasks.max=64
>>
>> I'm not sure what else to tweak to make the problem go away.
>>
>> Cheers,
>> -Kristoffer
>>
>> [1]
>>
>> [2016-07-16 12:52:52,668] ERROR Commit of
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>> unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between
>> subsequent calls to poll() was longer than the configured
>> session.timeout.ms, which typically implies that the poll loop is
>> spending too much time message processing. You can address this either
>> by increasing the session timeout or by reducing the maximum size of
>> batches returned in poll() with max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> [2016-07-16 12:52:52,676] INFO Revoking previously assigned partitions
>> [sting_actions_impression-23, sting_actions_impression-21,
>> sting_actions_impression-22] for group connect-hdfs-sink-sting-impression
>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
>> [2016-07-16 12:52:52,679] INFO
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} Committing offsets
>> (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
>> [2016-07-16 12:52:52,686] ERROR Commit of
>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>> unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>> be completed since the group has already rebalanced and assigned the
>> partitions to another member. This means that the time between
>> subsequent calls to poll() was longer than the configured
>> session.timeout.ms, which typically implies that the poll loop is
>> spending too much time message processing. You can address this either
>> by increasing the session timeout or by reducing the maximum size of
>> batches returned in poll() with max.poll.records.
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>
> --
> Thanks,
> Ewen