Re: Killing last replica for partition doesn't change ISR/Leadership if replica is running controller
Sure thing - https://issues.apache.org/jira/browse/KAFKA-1452 On May 13, 2014, at 8:33 PM, Jun Rao jun...@gmail.com wrote: Yes, that seems like a real issue. Could you file a jira? Thanks, Jun On Tue, May 13, 2014 at 11:58 AM, Alex Demidko alexan...@metamarkets.comwrote: Hi, Kafka version is 0.8.1.1. We have three machines: A,B,C. Let’s say there is a topic with replication 2 and one of it’s partitions - partition 1 is placed on brokers A and B. If the broker A is already down than for the partition 1 we have: Leader: B, ISR: [B]. If the current controller is node C, than killing broker B will turn partition 1 into state: Leader: -1, ISR: []. But if the current controller is node B, than killing it won’t update leadership/isr for partition 1 even when controller will be restarted on node C, so partition 1 will forever think it’s leader is node B which is dead. It looks that KafkaController.onBrokerFailure handles situation when the broker down is the partition leader - it sets the new leader value to -1. To the contrary, KafkaController.onControllerFailover never removes leader from the partition with all replicas offline - allegedly because partition gets into ReplicaDeletionIneligible state. Is it intended behavior? This behavior affects DefaultEventHandler.getPartition in the null key case - it can’t determine partition 1 as having no leader, and this results into events send failure. What we are trying to achieve - is to be able to write data even if some partitions lost all replicas, which is rare yet still possible scenario. Using null key looked suitable with minor DefaultEventHandler modifications (like getting rid from DefaultEventHandler.sendPartitionPerTopicCache to avoid caching and uneven events distribution) as we neither use logs compaction nor rely on partitioning of the data. We had such behavior with kafka 0.7 - if the node is down, simply produce to a different one. Thanks, Alex
Killing last replica for partition doesn't change ISR/Leadership if replica is running controller
Hi, Kafka version is 0.8.1.1. We have three machines: A,B,C. Let’s say there is a topic with replication 2 and one of it’s partitions - partition 1 is placed on brokers A and B. If the broker A is already down than for the partition 1 we have: Leader: B, ISR: [B]. If the current controller is node C, than killing broker B will turn partition 1 into state: Leader: -1, ISR: []. But if the current controller is node B, than killing it won’t update leadership/isr for partition 1 even when controller will be restarted on node C, so partition 1 will forever think it’s leader is node B which is dead. It looks that KafkaController.onBrokerFailure handles situation when the broker down is the partition leader - it sets the new leader value to -1. To the contrary, KafkaController.onControllerFailover never removes leader from the partition with all replicas offline - allegedly because partition gets into ReplicaDeletionIneligible state. Is it intended behavior? This behavior affects DefaultEventHandler.getPartition in the null key case - it can’t determine partition 1 as having no leader, and this results into events send failure. What we are trying to achieve - is to be able to write data even if some partitions lost all replicas, which is rare yet still possible scenario. Using null key looked suitable with minor DefaultEventHandler modifications (like getting rid from DefaultEventHandler.sendPartitionPerTopicCache to avoid caching and uneven events distribution) as we neither use logs compaction nor rely on partitioning of the data. We had such behavior with kafka 0.7 - if the node is down, simply produce to a different one. Thanks, Alex
KafkaException: This operation cannot be completed on a complete request
Hi, I’m performing a producing load test on two node kafka cluster built from the last 0.8.1 branch sources. I have topic loadtest with replication factor 2 and 256 partitions. Initially both brokers are in ISR and leadership is balanced. When in the middle of the load test one broker was restarted (wasn’t able to go with controlled shutdown in specified time and was killed), I started receiving following errors which as far as I understand coming from replication: On restarted broker 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2] kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId: ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [loadtest2,71] - PartitionFetchInfo(0,104857600),[loadtest,85] - PartitionFetchInfo(113676000,104857600),[loadtest,189] - PartitionFetchInfo(112277000,104857600),[loadtest,21] - PartitionFetchInfo(0,104857600),[loadtest,205] - PartitionFetchInfo(112986000,104857600),[loadtest,141] - PartitionFetchInfo(0,104857600),[loadtest,253] - PartitionFetchInfo(0,104857600),[loadtest,77] - PartitionFetchInfo(0,104857600),[loadtest,61] - PartitionFetchInfo(11249,104857600),[loadtest,229] - PartitionFetchInfo(112805000,104857600),[loadtest,133] - PartitionFetchInfo(0,104857600),[loadtest2,15] - PartitionFetchInfo(0,104857600),[loadtest2,63] - PartitionFetchInfo(0,104857600),[loadtest,181] - PartitionFetchInfo(0,104857600),[loadtest,5] - PartitionFetchInfo(11253,104857600),[loadtest,29] - PartitionFetchInfo(0,104857600),[loadtest,45] - PartitionFetchInfo(113113000,104857600),[loadtest2,39] - PartitionFetchInfo(0,104857600),[loadtest,37] - PartitionFetchInfo(112145000,104857600),[loadtest,13] - PartitionFetchInfo(112915000,104857600),[loadtest,237] - PartitionFetchInfo(112896000,104857600),[loadtest,149] - PartitionFetchInfo(113232000,104857600),[loadtest,117] - PartitionFetchInfo(11310,104857600),[loadtest,157] - PartitionFetchInfo(0,104857600),[loadtest,165] - PartitionFetchInfo(0,104857600),[loadtest,101] - PartitionFetchInfo(0,104857600),[loadtest,93] - PartitionFetchInfo(113025000,104857600),[loadtest,125] - PartitionFetchInfo(112896000,104857600),[loadtest,197] - PartitionFetchInfo(0,104857600),[loadtest,109] - PartitionFetchInfo(0,104857600),[loadtest,245] - PartitionFetchInfo(0,104857600),[loadtest,213] - PartitionFetchInfo(0,104857600),[loadtest,53] - PartitionFetchInfo(0,104857600),[loadtest,173] - PartitionFetchInfo(112757000,104857600),[loadtest,69] - PartitionFetchInfo(112378000,104857600),[loadtest,221] - PartitionFetchInfo(0,104857600) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 2014-04-18 16:15:02,215 WARN [ReplicaFetcherThread-1-2] kafka.consumer.SimpleConsumer - Reconnect due to socket error: null On current leader 2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1] kafka.network.Processor - Closing socket for /10.41.133.59 because of error kafka.common.KafkaException: This operation cannot be completed on a complete request. at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34) at
Re: KafkaException: This operation cannot be completed on a complete request
These on alive node: 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - Controller 2 epoch 8 encountered error while electing leader for partition [loadtest,143] due to: Preferred replica 1 for partition [loadtest,143] is either not alive or not in the isr. Current leader and ISR: [{leader:2,leader_epoch:11,isr:[2]}]. 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - Controller 2 epoch 8 initiated state change for partition [loadtest,143] from OnlinePartition to OnlinePartition failed 2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread] kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], Controller 2's connection to broker id:1,host:RESTARTED_BROKER_IP,port:9092 was unsuccessful 2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread] kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], Controller 2 epoch 8 failed to send UpdateMetadata request with correlation id 3854 to broker id:1,host:RESTARTED_BROKER_IP,port:9092. Reconnecting to broker. On Apr 18, 2014, at 10:41 AM, Jun Rao jun...@gmail.com wrote: Any errors from the controller/state-change log? Thanks, Jun On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko alexan...@metamarkets.comwrote: Hi, I’m performing a producing load test on two node kafka cluster built from the last 0.8.1 branch sources. I have topic loadtest with replication factor 2 and 256 partitions. Initially both brokers are in ISR and leadership is balanced. When in the middle of the load test one broker was restarted (wasn’t able to go with controlled shutdown in specified time and was killed), I started receiving following errors which as far as I understand coming from replication: On restarted broker 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2] kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId: ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [loadtest2,71] - PartitionFetchInfo(0,104857600),[loadtest,85] - PartitionFetchInfo(113676000,104857600),[loadtest,189] - PartitionFetchInfo(112277000,104857600),[loadtest,21] - PartitionFetchInfo(0,104857600),[loadtest,205] - PartitionFetchInfo(112986000,104857600),[loadtest,141] - PartitionFetchInfo(0,104857600),[loadtest,253] - PartitionFetchInfo(0,104857600),[loadtest,77] - PartitionFetchInfo(0,104857600),[loadtest,61] - PartitionFetchInfo(11249,104857600),[loadtest,229] - PartitionFetchInfo(112805000,104857600),[loadtest,133] - PartitionFetchInfo(0,104857600),[loadtest2,15] - PartitionFetchInfo(0,104857600),[loadtest2,63] - PartitionFetchInfo(0,104857600),[loadtest,181] - PartitionFetchInfo(0,104857600),[loadtest,5] - PartitionFetchInfo(11253,104857600),[loadtest,29] - PartitionFetchInfo(0,104857600),[loadtest,45] - PartitionFetchInfo(113113000,104857600),[loadtest2,39] - PartitionFetchInfo(0,104857600),[loadtest,37] - PartitionFetchInfo(112145000,104857600),[loadtest,13] - PartitionFetchInfo(112915000,104857600),[loadtest,237] - PartitionFetchInfo(112896000,104857600),[loadtest,149] - PartitionFetchInfo(113232000,104857600),[loadtest,117] - PartitionFetchInfo(11310,104857600),[loadtest,157] - PartitionFetchInfo(0,104857600),[loadtest,165] - PartitionFetchInfo(0,104857600),[loadtest,101] - PartitionFetchInfo(0,104857600),[loadtest,93] - PartitionFetchInfo(113025000,104857600),[loadtest,125] - PartitionFetchInfo(112896000,104857600),[loadtest,197] - PartitionFetchInfo(0,104857600),[loadtest,109] - PartitionFetchInfo(0,104857600),[loadtest,245] - PartitionFetchInfo(0,104857600),[loadtest,213] - PartitionFetchInfo(0,104857600),[loadtest,53] - PartitionFetchInfo(0,104857600),[loadtest,173] - PartitionFetchInfo(112757000,104857600),[loadtest,69] - PartitionFetchInfo(112378000,104857600),[loadtest,221] - PartitionFetchInfo(0,104857600) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109
Re: KafkaException: This operation cannot be completed on a complete request
] kafka.server.KafkaHealthcheck - Subscribing to /brokers/topics path to watch for new topics 2014-04-18 20:45:07,998 WARN [kafka-request-handler-20] state.change.logger - Broker 2 received invalid LeaderAndIsr request with correlation id 11 from controller 1 epoch 13 with an older leader epoch 22 for partition [loadtest,66], current leader epoch is 22 (...) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:376) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) Even if I restart RestartedNode it doesn't help, only the LeaderNode restart helps which I'd really like to avoid because leader is the only in-sync replica (and also there might be multiple leaders for different partitions). Another question why there is an OOME on a RestartedNode. On Fri, Apr 18, 2014 at 1:30 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Alex, I think this is a bug on the FetchResponseSend class. Just to confirm, before the kafka.common.KafkaException: This operation cannot be completed on a complete request. do you see other warn/error logs on the current leader? Guozhang On Fri, Apr 18, 2014 at 11:57 AM, Alexander Demidko alexan...@metamarkets.com wrote: Have tried to reproduce this error, and it occurs pretty consistently when node being forcefully shutdown w/o graceful termination. When graceful shutdown was successful no errors occur in a log when the instance was rebooted starts. On Fri, Apr 18, 2014 at 11:17 AM, Alex Demidko alexan...@metamarkets.com wrote: These on alive node: 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - Controller 2 epoch 8 encountered error while electing leader for partition [loadtest,143] due to: Preferred replica 1 for partition [loadtest,143] is either not alive or not in the isr. Current leader and ISR: [{leader:2,leader_epoch:11,isr:[2]}]. 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - Controller 2 epoch 8 initiated state change for partition [loadtest,143] from OnlinePartition to OnlinePartition failed 2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread] kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], Controller 2's connection to broker id:1,host:RESTARTED_BROKER_IP,port:9092 was unsuccessful 2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread] kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], Controller 2 epoch 8 failed to send UpdateMetadata request with correlation id 3854 to broker id:1,host:RESTARTED_BROKER_IP,port:9092. Reconnecting to broker. On Apr 18, 2014, at 10:41 AM, Jun Rao jun...@gmail.com wrote: Any errors from the controller/state-change log? Thanks, Jun On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko alexan...@metamarkets.com wrote: Hi, I'm performing a producing load test on two node kafka cluster built from the last 0.8.1 branch sources. I have topic loadtest with replication factor 2 and 256 partitions. Initially both brokers are in ISR and leadership is balanced. When in the middle of the load test one broker was restarted (wasn't able to go with controlled shutdown in specified time and was killed), I started
Re: KafkaException: This operation cannot be completed on a complete request
Tried to reproduce this one more time. I was using kill -9 shutdown to test reiliability, with graceful termination I haven't seen this problem to arise. Leader node started complaining that ReplicaFetcherThread can't connect to other node and that Producer can't send request to terminated node, but I guess this is pretty much expectable. In overall, the pattern to more or less stable reproduce is the following: Node 1(RestartedNode) and 2(LeaderNode) both in ISR and with balanced leadership Kill -9 node 1 Start node 1 Node 1 recovers segments and starts broker with a little suspicious message: kafka.utils.ZkUtils$ - conflict in /controller data: {version:1,brokerid:1,timestamp:1397864759089} stored data: {version:1,brokerid:2,timestamp:1397863473709} Node 2 expands ISR for all partitions from 2 to 2,1 Node 2 starts emit Closing socket for /Node1 because of error KafkaException: This operation cannot be completed on a complete request, and Node 1 - EOFException: Received -1 when reading from channel, socket has likely been closed There is no other exceptions in the log. On Fri, Apr 18, 2014 at 3:21 PM, Guozhang Wang wangg...@gmail.com wrote: When you are shutting down the restart node, did you see any warn/error on the leader logs? Guozhang On Fri, Apr 18, 2014 at 1:58 PM, Alex Demidko alexan...@metamarkets.com wrote: Last time saw this exception when tried to use rebalance leadership with kafka-preferred-replica-election.sh. That's what got in logs: LeaderNode: just kafka.common.KafkaException: This operation cannot be completed on a complete request without any other exceptions. RestartedNode: 2014-04-18 20:43:39,281 INFO [ReplicaFetcherThread-2-1] kafka.log.Log - Rolled new log segment for 'loadtest-170' in 1 ms. java.lang.OutOfMemoryError: Java heap space Dumping heap to java_pid28305.hprof ... Heap dump file created [13126900997 bytes in 11.728 secs] 2014-04-18 20:44:02,299 INFO [main-SendThread] org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 28101ms for sessionid 0x7455b 0b68302016, closing socket connection and attempting reconnect 2014-04-18 20:44:02,328 ERROR [ReplicaFetcherThread-2-1] kafka.network.BoundedByteBufferReceive - OOME with size 2022780218 java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) (...) 2014-04-18 20:44:41,212 ERROR [ZkClient-EventThread] org.I0Itec.zkclient.ZkEventThread - Error handling event ZkEvent[New session event sent to kafka.controller.KafkaController$SessionExpirationListener@7ea9f7b8] java.lang.IllegalStateException: Kafka scheduler has not been started at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:116) at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:349) at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339