We're also seeing lots of failures as the TopicPartitionWriter tries to close WAL files in HDFS [1].

[1] http://pastebin.com/6ipUndZv
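For reference, the timeouts discussed below are consumer settings, so on a distributed worker they get passed through the worker properties with the consumer. prefix, while the sizing knobs live in the connector config. A rough sketch of that split (file names, topic names and hdfs.url here are placeholders, not our real values):

  # connect-distributed.properties (worker): consumer.* entries are
  # handed through to each sink task's consumer
  consumer.request.timeout.ms=610000
  consumer.heartbeat.interval.ms=120000
  consumer.session.timeout.ms=300000
  consumer.max.poll.records=10000

  # hdfs-sink.properties (connector)
  name=hdfs-sink
  connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
  tasks.max=8
  topics=topic-a,topic-b,topic-c
  hdfs.url=hdfs://namenode:8020
  flush.size=4096
  rotate.interval.ms=600000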
On Wed, Jul 27, 2016 at 5:01 PM, Kristoffer Sjögren <sto...@gmail.com> wrote:
> The workers seem happier when we reduce the number of partitions per
> worker, but when we add more topics they eventually get stuck in a
> rebalancing state.
>
> May I ask what a good configuration would be? At the moment we have:
>
> - 2 Docker instances with 4 cores and a 4 GB heap each
> - each instance reads 4000 kB/s and writes 300 kB/s on the network
> - 3 topics with 100 partitions each
> - each topic holds tens of millions of messages, so the connector
>   needs to catch up when first started
> - tasks.max=8
> - rotate.interval.ms=600000
> - flush.size=4096
> - request.timeout.ms=610000
> - heartbeat.interval.ms=120000
> - session.timeout.ms=300000
> - max.poll.records=10000
>
> Please let me know if anything stands out as bad, imbalanced or
> under-provisioned.
>
> Cheers,
> -Kristoffer
>
> On Tue, Jul 26, 2016 at 12:38 PM, Kristoffer Sjögren <sto...@gmail.com> wrote:
>> We found very high CPU usage, which might be the cause. The connector
>> seems to spend a lot of cycles querying and parsing HDFS paths.
>>
>> On 24 Jul 2016 02:40, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>
>> That definitely sounds unusual -- rebalancing normally only happens
>> either when a) there are new workers or b) there are connectivity
>> issues/failures. Is it possible there's something causing large
>> latencies?
>>
>> -Ewen
>>
>> On Sat, Jul 16, 2016 at 6:09 AM, Kristoffer Sjögren <sto...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I'm running Kafka Connect in distributed mode with the Confluent HDFS
>>> sink connector.
>>>
>>> But the WorkerSinkTask is constantly interrupted by rebalance
>>> requests from the broker (onPartitionsRevoked) [1] and gets stuck in
>>> a recovery state where the brokers log "Preparing to restabilize
>>> group ... for generation xx" around every 30 seconds.
>>>
>>> I have configured the connector with a very high session timeout and
>>> low max poll records, but it doesn't help. The topic has 100
>>> partitions and there are 3 brokers. Kafka Connect runs on two
>>> single-core machines.
>>>
>>> request.timeout.ms=310000
>>> heartbeat.interval.ms=60000
>>> session.timeout.ms=300000
>>> max.poll.records=1
>>> tasks.max=64
>>>
>>> I'm not sure what else to tweak to make the problem go away.
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>>
>>> [1]
>>>
>>> [2016-07-16 12:52:52,668] ERROR Commit of
>>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>>> unexpected exception:
>>> (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> be completed since the group has already rebalanced and assigned the
>>> partitions to another member. This means that the time between
>>> subsequent calls to poll() was longer than the configured
>>> session.timeout.ms, which typically implies that the poll loop is
>>> spending too much time message processing. You can address this either
>>> by increasing the session timeout or by reducing the maximum size of
>>> batches returned in poll() with max.poll.records.
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>>>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>>>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> [2016-07-16 12:52:52,676] INFO Revoking previously assigned partitions
>>> [sting_actions_impression-23, sting_actions_impression-21,
>>> sting_actions_impression-22] for group
>>> connect-hdfs-sink-sting-impression
>>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
>>> [2016-07-16 12:52:52,679] INFO
>>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} Committing offsets
>>> (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
>>> [2016-07-16 12:52:52,686] ERROR Commit of
>>> WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an
>>> unexpected exception:
>>> (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> be completed since the group has already rebalanced and assigned the
>>> partitions to another member.
>>> This means that the time between
>>> subsequent calls to poll() was longer than the configured
>>> session.timeout.ms, which typically implies that the poll loop is
>>> spending too much time message processing. You can address this either
>>> by increasing the session timeout or by reducing the maximum size of
>>> batches returned in poll() with max.poll.records.
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>>>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>>>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>>>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> --
>> Thanks,
>> Ewen
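One closing note on the CommitFailedException above: with this consumer version (pre-KIP-62), heartbeats are sent from poll(), so each loop iteration -- poll() plus all record processing -- has to finish within session.timeout.ms, or the coordinator evicts the member and rebalances the group. A minimal sketch of that budget against the plain consumer API (broker address, group id and topic name are illustrative, not from this thread):

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class PollBudgetSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "poll-budget-demo");
          props.put("key.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
              "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("session.timeout.ms", "30000"); // per-iteration processing budget
          props.put("max.poll.records", "100");     // smaller batches, shorter iterations

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(Collections.singletonList("demo-topic"));
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(1000);
                  long start = System.currentTimeMillis();
                  for (ConsumerRecord<String, String> record : records) {
                      process(record); // if this loop exceeds session.timeout.ms, the
                                       // next commit fails exactly as in the logs above
                  }
                  consumer.commitSync();
                  System.out.printf("processed %d records in %d ms%n",
                      records.count(), System.currentTimeMillis() - start);
              }
          }
      }

      private static void process(ConsumerRecord<String, String> record) {
          // stand-in for real per-record work (e.g. buffering writes to HDFS)
      }
  }

The same budget presumably applies inside Connect's WorkerSinkTask poll loop, so with max.poll.records=10000 a single slow iteration (say, an expensive HDFS flush or the path-parsing hot spot mentioned earlier) can blow past the session timeout and trigger exactly this rebalance loop.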