Re: Killing last replica for partition doesn't change ISR/Leadership if replica is running controller

2014-05-14 Thread Alex Demidko
Sure thing - https://issues.apache.org/jira/browse/KAFKA-1452


On May 13, 2014, at 8:33 PM, Jun Rao jun...@gmail.com wrote:

 Yes, that seems like a real issue. Could you file a jira?
 
 Thanks,
 
 Jun
 
 
 On Tue, May 13, 2014 at 11:58 AM, Alex Demidko 
 alexan...@metamarkets.com wrote:
 
 Hi,
 
 Kafka version is 0.8.1.1. We have three machines: A, B, C. Let's say there
 is a topic with replication 2, and one of its partitions - partition 1 - is
 placed on brokers A and B. If broker A is already down, then for
 partition 1 we have: Leader: B, ISR: [B]. If the current controller is node
 C, then killing broker B will turn partition 1 into the state: Leader: -1,
 ISR: []. But if the current controller is node B, then killing it won't
 update leadership/ISR for partition 1 even when the controller is
 restarted on node C, so partition 1 will forever think its leader is node
 B, which is dead.
 
 It looks like KafkaController.onBrokerFailure handles the situation when the
 broker that went down is the partition leader - it sets the new leader value to -1.
 By contrast, KafkaController.onControllerFailover never removes the leader
 from a partition with all replicas offline - allegedly because the partition
 gets into the ReplicaDeletionIneligible state. Is this intended behavior?
 
 This behavior affects DefaultEventHandler.getPartition in the null key
 case - it can't detect that partition 1 has no leader, and this results
 in event send failures.
 
 
 What we are trying to achieve is to be able to write data even if some
 partitions have lost all replicas, which is a rare yet still possible scenario.
 Using a null key looked suitable with minor DefaultEventHandler modifications
 (like getting rid of DefaultEventHandler.sendPartitionPerTopicCache to
 avoid caching and uneven event distribution), as we neither use log
 compaction nor rely on partitioning of the data. We had such behavior with
 Kafka 0.7 - if a node is down, simply produce to a different one.
 
 
 Thanks,
 Alex
 
 



Killing last replica for partition doesn't change ISR/Leadership if replica is running controller

2014-05-13 Thread Alex Demidko
Hi,

Kafka version is 0.8.1.1. We have three machines: A, B, C. Let's say there is a
topic with replication 2, and one of its partitions - partition 1 - is placed on
brokers A and B. If broker A is already down, then for partition 1 we
have: Leader: B, ISR: [B]. If the current controller is node C, then killing
broker B will turn partition 1 into the state: Leader: -1, ISR: []. But if the
current controller is node B, then killing it won't update leadership/ISR for
partition 1 even when the controller is restarted on node C, so partition 1
will forever think its leader is node B, which is dead.
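For reference, the stale leader/ISR can be observed directly in the partition state
znode. A minimal sketch, assuming the 0.8.x ZooKeeper layout and the ZKStringSerializer
helper from kafka.utils; zkhost and mytopic are placeholders for our actual setup:

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

// Read the persisted leader/ISR for partition 1 straight from ZooKeeper
// (0.8.x layout: /brokers/topics/<topic>/partitions/<id>/state).
val zk = new ZkClient("zkhost:2181", 30000, 30000, ZKStringSerializer)
val state: String = zk.readData("/brokers/topics/mytopic/partitions/1/state")
println(state) // after the scenario above, "leader" still points at dead node B
zk.close()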

It looks like KafkaController.onBrokerFailure handles the situation when the broker
that went down is the partition leader - it sets the new leader value to -1. By
contrast, KafkaController.onControllerFailover never removes the leader from a
partition with all replicas offline - allegedly because the partition gets into
the ReplicaDeletionIneligible state. Is this intended behavior?

This behavior affects DefaultEventHandler.getPartition in the null key case -
it can't detect that partition 1 has no leader, and this results in
event send failures.
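To make the null key path concrete, this is roughly how we produce. A minimal sketch
against the 0.8 Scala producer API; the broker list and topic name are placeholders:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
// placeholder broker list; in our setup this would be brokers A, B and C
props.put("metadata.broker.list", "brokerA:9092,brokerB:9092,brokerC:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))
// null key: DefaultEventHandler.getPartition should pick some available partition,
// but the send fails once a partition is stuck with a dead leader as described above
producer.send(new KeyedMessage[String, String]("mytopic", null, "payload"))
producer.close()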


What we are trying to achieve is to be able to write data even if some
partitions have lost all replicas, which is a rare yet still possible scenario.
Using a null key looked suitable with minor DefaultEventHandler modifications
(like getting rid of DefaultEventHandler.sendPartitionPerTopicCache to avoid
caching and uneven event distribution), as we neither use log compaction nor
rely on partitioning of the data. We had such behavior with Kafka 0.7 - if a
node is down, simply produce to a different one.
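The kind of modification we have in mind is roughly the following. A simplified,
self-contained sketch - PartitionInfo is a stand-in type here, not the actual
Kafka class, and this is not the real DefaultEventHandler code:

import scala.util.Random

// Stand-in for the per-partition metadata the producer already fetches
case class PartitionInfo(partitionId: Int, leaderBrokerId: Option[Int])

// For a null key, consider only partitions that currently have a live leader,
// so losing every replica of one partition doesn't fail the whole send.
def choosePartition(partitions: Seq[PartitionInfo]): Option[Int] = {
  val available = partitions.filter(_.leaderBrokerId.isDefined)
  if (available.isEmpty) None
  else Some(available(Random.nextInt(available.size)).partitionId)
}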


Thanks, 
Alex



KafkaException: This operation cannot be completed on a complete request

2014-04-18 Thread Alex Demidko
Hi,

I’m performing a producing load test on a two-node Kafka cluster built from the
latest 0.8.1 branch sources. I have a topic loadtest with replication factor 2 and
256 partitions. Initially both brokers are in ISR and leadership is balanced.
When, in the middle of the load test, one broker was restarted (it wasn't able to
complete a controlled shutdown in the specified time and was killed), I started
receiving the following errors, which as far as I understand come from replication:


On restarted broker

2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2] 
kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId: 
ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1 bytes; 
RequestInfo: [loadtest2,71] - PartitionFetchInfo(0,104857600),[loadtest,85] - 
PartitionFetchInfo(113676000,104857600),[loadtest,189] - 
PartitionFetchInfo(112277000,104857600),[loadtest,21] - 
PartitionFetchInfo(0,104857600),[loadtest,205] - 
PartitionFetchInfo(112986000,104857600),[loadtest,141] - 
PartitionFetchInfo(0,104857600),[loadtest,253] - 
PartitionFetchInfo(0,104857600),[loadtest,77] - 
PartitionFetchInfo(0,104857600),[loadtest,61] - 
PartitionFetchInfo(11249,104857600),[loadtest,229] - 
PartitionFetchInfo(112805000,104857600),[loadtest,133] - 
PartitionFetchInfo(0,104857600),[loadtest2,15] - 
PartitionFetchInfo(0,104857600),[loadtest2,63] - 
PartitionFetchInfo(0,104857600),[loadtest,181] - 
PartitionFetchInfo(0,104857600),[loadtest,5] - 
PartitionFetchInfo(11253,104857600),[loadtest,29] - 
PartitionFetchInfo(0,104857600),[loadtest,45] - 
PartitionFetchInfo(113113000,104857600),[loadtest2,39] - 
PartitionFetchInfo(0,104857600),[loadtest,37] - 
PartitionFetchInfo(112145000,104857600),[loadtest,13] - 
PartitionFetchInfo(112915000,104857600),[loadtest,237] - 
PartitionFetchInfo(112896000,104857600),[loadtest,149] - 
PartitionFetchInfo(113232000,104857600),[loadtest,117] - 
PartitionFetchInfo(11310,104857600),[loadtest,157] - 
PartitionFetchInfo(0,104857600),[loadtest,165] - 
PartitionFetchInfo(0,104857600),[loadtest,101] - 
PartitionFetchInfo(0,104857600),[loadtest,93] - 
PartitionFetchInfo(113025000,104857600),[loadtest,125] - 
PartitionFetchInfo(112896000,104857600),[loadtest,197] - 
PartitionFetchInfo(0,104857600),[loadtest,109] - 
PartitionFetchInfo(0,104857600),[loadtest,245] - 
PartitionFetchInfo(0,104857600),[loadtest,213] - 
PartitionFetchInfo(0,104857600),[loadtest,53] - 
PartitionFetchInfo(0,104857600),[loadtest,173] - 
PartitionFetchInfo(112757000,104857600),[loadtest,69] - 
PartitionFetchInfo(112378000,104857600),[loadtest,221] - 
PartitionFetchInfo(0,104857600)
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at 
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
2014-04-18 16:15:02,215 WARN  [ReplicaFetcherThread-1-2] 
kafka.consumer.SimpleConsumer - Reconnect due to socket error: null 


On current leader

2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1] kafka.network.Processor 
- Closing socket for /10.41.133.59 because of error
kafka.common.KafkaException: This operation cannot be completed on a complete 
request.
at 
kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
at 

Re: KafkaException: This operation cannot be completed on a complete request

2014-04-18 Thread Alex Demidko
These on alive node:

2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - 
Controller 2 epoch 8 encountered error while electing leader for partition 
[loadtest,143] due to: Preferred replica 1 for partition [loadtest,143] is 
either not alive or not in the isr. Current leader and ISR: 
[{leader:2,leader_epoch:11,isr:[2]}].
2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15] state.change.logger - 
Controller 2 epoch 8 initiated state change for partition [loadtest,143] from 
OnlinePartition to OnlinePartition failed

2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread] 
kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], 
Controller 2's connection to broker id:1,host:RESTARTED_BROKER_IP,port:9092 
was unsuccessful
2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread] 
kafka.controller.RequestSendThread - [Controller-2-to-broker-1-send-thread], 
Controller 2 epoch 8 failed to send UpdateMetadata request with correlation id 
3854 to broker id:1,host:RESTARTED_BROKER_IP,port:9092. Reconnecting to 
broker.


On Apr 18, 2014, at 10:41 AM, Jun Rao jun...@gmail.com wrote:

 Any errors from the controller/state-change log?
 
 Thanks,
 
 Jun
 
 
 On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko 
 alexan...@metamarkets.com wrote:
 
 Hi,
 
 I’m performing a producing load test on a two-node Kafka cluster built from
 the latest 0.8.1 branch sources. I have a topic loadtest with replication
 factor 2 and 256 partitions. Initially both brokers are in ISR and
 leadership is balanced. When, in the middle of the load test, one broker was
 restarted (it wasn't able to complete a controlled shutdown in the specified
 time and was killed), I started receiving the following errors, which as far
 as I understand come from replication:
 
 
 On restarted broker
 
 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2]
 kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in
 fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId:
 ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1
 bytes; RequestInfo: [loadtest2,71] -
 PartitionFetchInfo(0,104857600),[loadtest,85] -
 PartitionFetchInfo(113676000,104857600),[loadtest,189] -
 PartitionFetchInfo(112277000,104857600),[loadtest,21] -
 PartitionFetchInfo(0,104857600),[loadtest,205] -
 PartitionFetchInfo(112986000,104857600),[loadtest,141] -
 PartitionFetchInfo(0,104857600),[loadtest,253] -
 PartitionFetchInfo(0,104857600),[loadtest,77] -
 PartitionFetchInfo(0,104857600),[loadtest,61] -
 PartitionFetchInfo(11249,104857600),[loadtest,229] -
 PartitionFetchInfo(112805000,104857600),[loadtest,133] -
 PartitionFetchInfo(0,104857600),[loadtest2,15] -
 PartitionFetchInfo(0,104857600),[loadtest2,63] -
 PartitionFetchInfo(0,104857600),[loadtest,181] -
 PartitionFetchInfo(0,104857600),[loadtest,5] -
 PartitionFetchInfo(11253,104857600),[loadtest,29] -
 PartitionFetchInfo(0,104857600),[loadtest,45] -
 PartitionFetchInfo(113113000,104857600),[loadtest2,39] -
 PartitionFetchInfo(0,104857600),[loadtest,37] -
 PartitionFetchInfo(112145000,104857600),[loadtest,13] -
 PartitionFetchInfo(112915000,104857600),[loadtest,237] -
 PartitionFetchInfo(112896000,104857600),[loadtest,149] -
 PartitionFetchInfo(113232000,104857600),[loadtest,117] -
 PartitionFetchInfo(11310,104857600),[loadtest,157] -
 PartitionFetchInfo(0,104857600),[loadtest,165] -
 PartitionFetchInfo(0,104857600),[loadtest,101] -
 PartitionFetchInfo(0,104857600),[loadtest,93] -
 PartitionFetchInfo(113025000,104857600),[loadtest,125] -
 PartitionFetchInfo(112896000,104857600),[loadtest,197] -
 PartitionFetchInfo(0,104857600),[loadtest,109] -
 PartitionFetchInfo(0,104857600),[loadtest,245] -
 PartitionFetchInfo(0,104857600),[loadtest,213] -
 PartitionFetchInfo(0,104857600),[loadtest,53] -
 PartitionFetchInfo(0,104857600),[loadtest,173] -
 PartitionFetchInfo(112757000,104857600),[loadtest,69] -
 PartitionFetchInfo(112378000,104857600),[loadtest,221] -
 PartitionFetchInfo(0,104857600)
 java.io.EOFException: Received -1 when reading from channel, socket has
 likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109

Re: KafkaException: This operation cannot be completed on a complete request

2014-04-18 Thread Alex Demidko
]
kafka.server.KafkaHealthcheck - Subscribing to /brokers/topics path to
watch for new topics
2014-04-18 20:45:07,998 WARN  [kafka-request-handler-20]
state.change.logger - Broker 2 received invalid LeaderAndIsr request with
correlation id 11 from controller 1 epoch 13 with an older leader epoch 22
for partition [loadtest,66], current leader epoch is 22
(...)
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)


Even if I restart the RestartedNode it doesn't help; only restarting the LeaderNode
helps, which I'd really like to avoid because the leader is the only
in-sync replica (and there might also be multiple leaders for different
partitions).

Another question is why there is an OOME on the RestartedNode.





On Fri, Apr 18, 2014 at 1:30 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Alex,

 I think this is a bug in the FetchResponseSend class. Just to confirm,
 before the

 kafka.common.KafkaException: This operation cannot be completed on a
 complete request.

 do you see other warn/error logs on the current leader?

 Guozhang


 On Fri, Apr 18, 2014 at 11:57 AM, Alexander Demidko 
 alexan...@metamarkets.com wrote:

   Have tried to reproduce this error, and it occurs pretty consistently
   when the node is forcefully shut down without graceful termination. When a
   graceful shutdown is successful, no errors occur in the log when the
   restarted instance starts.
 
 
  On Fri, Apr 18, 2014 at 11:17 AM, Alex Demidko 
 alexan...@metamarkets.com
  wrote:
 
   These on alive node:
  
   2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
   state.change.logger - Controller 2 epoch 8 encountered error while
  electing
   leader for partition [loadtest,143] due to: Preferred replica 1 for
   partition [loadtest,143] is either not alive or not in the isr. Current
   leader and ISR: [{leader:2,leader_epoch:11,isr:[2]}].
   2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
   state.change.logger - Controller 2 epoch 8 initiated state change for
   partition [loadtest,143] from OnlinePartition to OnlinePartition failed
  
   2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread]
   kafka.controller.RequestSendThread -
   [Controller-2-to-broker-1-send-thread], Controller 2's connection to
  broker
   id:1,host:RESTARTED_BROKER_IP,port:9092 was unsuccessful
   2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread]
   kafka.controller.RequestSendThread -
   [Controller-2-to-broker-1-send-thread], Controller 2 epoch 8 failed to
  send
   UpdateMetadata request with correlation id 3854 to broker
   id:1,host:RESTARTED_BROKER_IP,port:9092. Reconnecting to broker.
  
  
   On Apr 18, 2014, at 10:41 AM, Jun Rao jun...@gmail.com wrote:
  
Any errors from the controller/state-change log?
   
Thanks,
   
Jun
   
   
On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko 
  alexan...@metamarkets.com
   wrote:
   
Hi,
   
I'm performing a producing load test on a two-node Kafka cluster built from
the latest 0.8.1 branch sources. I have a topic loadtest with replication
factor 2 and 256 partitions. Initially both brokers are in ISR and
leadership is balanced. When, in the middle of the load test, one broker was
restarted (it wasn't able to complete a controlled shutdown in the specified
time and was killed), I started

Re: KafkaException: This operation cannot be completed on a complete request

2014-04-18 Thread Alex Demidko
Tried to reproduce this one more time. I was using kill -9 to test
reliability; with graceful termination I haven't seen this problem arise.
The leader node started complaining that the ReplicaFetcherThread can't
connect to the other node and that the Producer can't send requests to the
terminated node, but I guess this is pretty much expected.

Overall, the pattern to reproduce it more or less reliably is the following:

1. Node 1 (RestartedNode) and node 2 (LeaderNode) are both in ISR and
   leadership is balanced
2. Kill -9 node 1
3. Start node 1
4. Node 1 recovers segments and starts the broker with a slightly suspicious
   message: kafka.utils.ZkUtils$ - conflict in /controller data:
   {version:1,brokerid:1,timestamp:1397864759089} stored data:
   {version:1,brokerid:2,timestamp:1397863473709}
5. Node 2 expands ISR for all partitions from 2 to 2,1
6. Node 2 starts emitting "Closing socket for /Node1 because of error
   KafkaException: This operation cannot be completed on a complete request",
   and Node 1 - "EOFException: Received -1 when reading from channel, socket
   has likely been closed"

There are no other exceptions in the log.
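For what it's worth, the /controller registration mentioned in step 4 can be checked
directly while reproducing this. A minimal sketch, assuming kafka.utils.ZKStringSerializer
is usable from client code; zkhost is a placeholder:

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

// Print the current controller registration - the same /controller znode that
// the "conflict in /controller data" message in step 4 refers to.
val zk = new ZkClient("zkhost:2181", 30000, 30000, ZKStringSerializer)
val controller: String = zk.readData("/controller")
println(controller)
zk.close()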



On Fri, Apr 18, 2014 at 3:21 PM, Guozhang Wang wangg...@gmail.com wrote:

 When you are shutting down the restarted node, did you see any warn/error on
 the leader logs?

 Guozhang


 On Fri, Apr 18, 2014 at 1:58 PM, Alex Demidko alexan...@metamarkets.com
 wrote:

  Last time I saw this exception when I tried to rebalance leadership with
  kafka-preferred-replica-election.sh. That's what I got in the logs:
 
  LeaderNode: just kafka.common.KafkaException: This operation cannot be
  completed on a complete request, without any other exceptions.
 
 
  RestartedNode:
 
  2014-04-18 20:43:39,281 INFO  [ReplicaFetcherThread-2-1] kafka.log.Log -
  Rolled new log segment for 'loadtest-170' in 1 ms.
  java.lang.OutOfMemoryError: Java heap space
  Dumping heap to java_pid28305.hprof ...
  Heap dump file created [13126900997 bytes in 11.728 secs]
  2014-04-18 20:44:02,299 INFO  [main-SendThread]
  org.apache.zookeeper.ClientCnxn - Client session timed out, have not
 heard
  from server in 28101ms for sessionid 0x7455b
  0b68302016, closing socket connection and attempting reconnect
  2014-04-18 20:44:02,328 ERROR [ReplicaFetcherThread-2-1]
  kafka.network.BoundedByteBufferReceive - OOME with size 2022780218
  java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at
 
 
 kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
  at
 
 
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
  at
  kafka.network.Receive$class.readCompletely(Transmission.scala:56)
  at
 
 
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
  at
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
  at
  kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
  at
 
 
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at
 
 
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
  at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
  at
 
 
 kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
  at
  kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
  at
 kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
  (...)
  2014-04-18 20:44:41,212 ERROR [ZkClient-EventThread]
  org.I0Itec.zkclient.ZkEventThread - Error handling event ZkEvent[New
  session event sent to
  kafka.controller.KafkaController$SessionExpirationListener@7ea9f7b8]
  java.lang.IllegalStateException: Kafka scheduler has not been started
  at
  kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:116)
  at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
  at
 
 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:349)
  at
 
 
 kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339