Mazhar,

There is probably a misunderstanding. acks=-1 (or all) doesn't mean waiting
for all replicas. It means waiting for all replicas that are in sync. So,
if a replica is down, it will be removed from the in-sync replicas, which
allows the producer to continue with fewer replicas.
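
To make the distinction concrete, here is a toy model of the behavior described above. It is only a sketch, not Kafka's actual code: the Partition class and its method names are made up for illustration, and the min_insync_replicas field is an assumption standing in for Kafka's min.insync.replicas setting, which is not mentioned in this thread.

```python
from dataclasses import dataclass


@dataclass
class Partition:
    """Toy model of one partition's replica state (hypothetical, not Kafka code)."""
    replicas: set                 # all assigned replica broker ids
    isr: set                      # replicas currently in sync, leader included
    min_insync_replicas: int = 1  # assumed stand-in for min.insync.replicas

    def replica_down(self, broker_id):
        # A failed follower leaves the in-sync set but stays an assigned replica.
        self.isr.discard(broker_id)

    def acks_needed(self):
        # acks=-1 waits for the current in-sync set, not for every assigned replica.
        if len(self.isr) < self.min_insync_replicas:
            raise RuntimeError("not enough in-sync replicas")
        return len(self.isr)


# Replica assignment taken from the quoted log: replicas=[5,3], isr=[5,3].
p = Partition(replicas={5, 3}, isr={5, 3})
print(p.acks_needed())  # 2
p.replica_down(3)
print(p.acks_needed())  # 1
```

In this model the producer keeps going with a single in-sync replica after broker 3 drops out. Raising min_insync_replicas to 2 would make the second call fail instead, which is the trade-off between availability and durability.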

For the connection issue that you saw in the log, this can happen when a
connection has been idle for some time. It won't break the replication logic,
since a new connection will be created automatically. You can increase the
socket idle time on the broker if you want to avoid these idle disconnects.
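
As a rough illustration of the idle-connection behavior, the sketch below models a sender that reconnects when its connection has been idle past a limit. It assumes the "socket idle time" refers to the broker setting connections.max.idle.ms (documented default: 10 minutes). The function names and the dict-based connection are hypothetical and only serve the example.

```python
# Assumption: this mirrors the documented default of connections.max.idle.ms.
DEFAULT_MAX_IDLE_MS = 10 * 60 * 1000


def should_close(last_used_ms, now_ms, max_idle_ms=DEFAULT_MAX_IDLE_MS):
    """Return True if the connection has been idle longer than the limit."""
    return now_ms - last_used_ms > max_idle_ms


def send(conn, now_ms, max_idle_ms=DEFAULT_MAX_IDLE_MS):
    """Toy sender: reconnect transparently if the idle limit was exceeded."""
    if should_close(conn["last_used_ms"], now_ms, max_idle_ms):
        # Corresponds to the "Reconnecting to broker." lines in the quoted log.
        conn["reconnects"] += 1
    conn["last_used_ms"] = now_ms
    return conn


conn = {"last_used_ms": 0, "reconnects": 0}
send(conn, now_ms=1_000)                        # recent use, no reconnect
send(conn, now_ms=1_000 + 11 * 60 * 1000)       # idle for 11 minutes, reconnect
print(conn["reconnects"])  # 1
```

Passing a larger max_idle_ms makes the reconnect rarer, which is the effect of raising the idle time on the broker. Either way the send still goes through, so no data is lost because of the disconnect itself.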

Thanks,

Jun

On Thu, Aug 18, 2016 at 12:07 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com>
wrote:

> Hi Jun,
>
> Setting acks to -1 may solve this issue.
> But it will cause the producer buffer to fill up in the load test, resulting
> in failures and dropped messages on the client (producer) side.
> Hence, this will not actually solve the problem.
>
> I need to fix this from kafka broker side, so that there is no impact on
> producer or consumer.
>
> From the logs, it looks like there is a connection problem between brokers,
> and the Kafka broker is losing records during this process.
>
> But why is the Kafka broker losing records?
>
> I feel this is a BUG in Kafka.
>
> [2016-08-17 12:54:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:54:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:55:32,783] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:32,894] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,36],
> [topic1,30], [topic1,31], [topic1,86], [topic1,78], [topic1,74],
> [topic1,82], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:32,896] WARN [Controller-2-to-broker-2-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> points=[{port=9092,host=b5.kafka,security_protocol_type=
> 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> protocol_type=0}]},{id=2,end_points=[{port=9092,host=b2.
> kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> points=[{port=9092,host=b4.kafka,security_protocol_type=
> 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> protocol_type=0}]}]}
> to broker Node(2, b2.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 2 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,897] WARN [Controller-2-to-broker-5-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=0,end_
> points=[{port=9092,host=b0.kafka,security_protocol_type=
> 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> 9092,host=b1.kafka,security_protocol_type=0}]},{id=2,end_
> points=[{port=9092,host=b2.kafka,security_protocol_type=
> 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> protocol_type=0}]}]}
> to broker Node(5, b5.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 5 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,898] WARN [Controller-2-to-broker-4-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> points=[{port=9092,host=b3.kafka,security_protocol_type=
> 0}]},{id=1,end_points=[{port=9092,host=b1.kafka,security_
> protocol_type=0}]},{id=4,end_points=[{port=9092,host=b4.
> kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> 9092,host=b2.kafka,security_protocol_type=0}]},{id=0,end_
> points=[{port=9092,host=b0.kafka,security_protocol_type=
> 0}]},{id=5,end_points=[{port=9092,host=b5.kafka,security_
> protocol_type=0}]}]}
> to broker Node(4, b4.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 4 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,900] WARN [Controller-2-to-broker-1-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=5,end_
> points=[{port=9092,host=b5.kafka,security_protocol_type=
> 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> protocol_type=0}]},{id=3,end_points=[{port=9092,host=b3.
> kafka,security_protocol_type=0}]},{id=1,end_points=[{port=
> 9092,host=b1.kafka,security_protocol_type=0}]},{id=4,end_
> points=[{port=9092,host=b4.kafka,security_protocol_type=
> 0}]},{id=2,end_points=[{port=9092,host=b2.kafka,security_
> protocol_type=0}]}]}
> to broker Node(1, b1.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 1 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,902] WARN [Controller-2-to-broker-3-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=3,end_
> points=[{port=9092,host=b3.kafka,security_protocol_type=
> 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> protocol_type=0}]},{id=5,end_points=[{port=9092,host=b5.
> kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> 9092,host=b2.kafka,security_protocol_type=0}]},{id=1,end_
> points=[{port=9092,host=b1.kafka,security_protocol_type=
> 0}]},{id=4,end_points=[{port=9092,host=b4.kafka,security_
> protocol_type=0}]}]}
> to broker Node(3, b3.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 3 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,903] WARN [Controller-2-to-broker-0-send-thread],
> Controller 2 epoch 2 fails to send request {controller_id=2,controller_
> epoch=2,partition_states=[{topic=topic1,partition=82,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=30,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=78,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=12,replicas=[5,3]},{topic=topic1,partition=86,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=31,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=20,replicas=[3,5]},{topic=topic1,partition=36,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]},{topic=topic1,partition=74,
> controller_epoch=2,leader=5,leader_epoch=4,isr=[5,3],zk_
> version=14,replicas=[5,3]},{topic=topic1,partition=33,
> controller_epoch=2,leader=3,leader_epoch=2,isr=[3,5],zk_
> version=18,replicas=[3,5]}],live_brokers=[{id=4,end_
> points=[{port=9092,host=b4.kafka,security_protocol_type=
> 0}]},{id=3,end_points=[{port=9092,host=b3.kafka,security_
> protocol_type=0}]},{id=1,end_points=[{port=9092,host=b1.
> kafka,security_protocol_type=0}]},{id=2,end_points=[{port=
> 9092,host=b2.kafka,security_protocol_type=0}]},{id=5,end_
> points=[{port=9092,host=b5.kafka,security_protocol_type=
> 0}]},{id=0,end_points=[{port=9092,host=b0.kafka,security_
> protocol_type=0}]}]}
> to broker Node(0, b0.kafka, 9092). Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.io.IOException: Connection to 0 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recurse$1(
> NetworkClientBlockingOps.scala:129)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntilFound$extension(
> NetworkClientBlockingOps.
> scala:139)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.controller.RequestSendThread.liftedTree1$
> 1(ControllerChannelManager.scala:180)
>         at kafka.controller.RequestSendThread.doWork(
> ControllerChannelManager.scala:171)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-08-17 12:55:32,927] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,162] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,169] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,50])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,194] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:33,198] INFO [Controller-2-to-broker-2-send-thread],
> Controller 2 connected to Node(2, b2.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,199] INFO [Controller-2-to-broker-5-send-thread],
> Controller 2 connected to Node(5, b5.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,200] INFO [Controller-2-to-broker-4-send-thread],
> Controller 2 connected to Node(4, b4.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,202] INFO [Controller-2-to-broker-1-send-thread],
> Controller 2 connected to Node(1, b1.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,204] INFO [Controller-2-to-broker-0-send-thread],
> Controller 2 connected to Node(0, b0.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:33,207] INFO [Controller-2-to-broker-3-send-thread],
> Controller 2 connected to Node(3, b3.kafka, 9092) for sending state change
> requests (kafka.controller.RequestSendThread)
> [2016-08-17 12:55:39,981] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,018] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,34],
> [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> [topic1,35], [topic1,33]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,377] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,388] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,86],
> [topic1,78], [topic1,82]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:55:40,409] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 12:59:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,294] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 12:59:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:00:39,546] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,604] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,5])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,649] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:39,888] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,071] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,103] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,261] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,283] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,72],
> [topic1,80]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,296] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,656] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,662] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,55])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:40,934] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,335] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,393] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,36], [topic1,30], [topic1,31], [topic1,25], [topic1,29],
> [topic1,38], [topic1,26], [topic1,35], [topic1,33], [topic1,28])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,423] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,897] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:00:47,944] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,5],
> [topic1,3], [topic1,7], [topic1,11], [topic1,2], [topic1,6], [topic1,1],
> [topic1,10], [topic1,14], [topic1,9], [topic1,15]) (kafka.controller.
> IsrChangeNotificationListener)
> [2016-08-17 13:00:48,020] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:04:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:04:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:05:34,317] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:34,365] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,80],
> [topic1,40], [topic1,21], [topic1,31], [topic1,84], [topic1,33])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:34,388] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,426] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,437] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,92])
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:36,699] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,225] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,239] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,80],
> [topic1,84]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,246] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:40,958] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:41,006] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,22],
> [topic1,16], [topic1,20], [topic1,19], [topic1,40], [topic1,21],
> [topic1,18], [topic1,47], [topic1,44], [topic1,45], [topic1,42],
> [topic1,46], [topic1,43], [topic1,23]) (kafka.controller.
> IsrChangeNotificationListener)
> [2016-08-17 13:05:41,067] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:42,517] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:05:42,622] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,37],
> [topic1,27], [topic1,34], [topic1,32], [topic1,24], [topic1,39],
> [topic1,30], [topic1,31], [topic1,25], [topic1,29], [topic1,38],
> [topic1,26], [topic1,35], [topic1,33], [topic1,28]) (kafka.controller.
> IsrChangeNotificationListener)
> [2016-08-17 13:05:42,690] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:09:50,293] TRACE [Controller 2]: checking need to trigger
> partition rebalance (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: preferred replicas by
> broker Map(0 -> Map([topic1,45] -> List(0, 1), [topic1,17] -> List(0, 1),
> [topic1,19] -> List(0, 1), [topic1,42] -> List(0, 1), [topic1,43] ->
> List(0, 1), [topic1,44] -> List(0, 1), [topic1,16] -> List(0, 1),
> [topic1,46] -> List(0, 1), [topic1,20] -> List(0, 1), [topic1,41] ->
> List(0, 1), [topic1,18] -> List(0, 1), [topic1,22] -> List(0, 1),
> [topic1,40] -> List(0, 1), [topic1,47] -> List(0, 1), [topic1,23] ->
> List(0, 1), [topic1,21] -> List(0, 1)), 5 -> Map([topic1,78] -> List(5, 3),
> [topic1,84] -> List(5, 3), [topic1,87] -> List(5, 3), [topic1,74] ->
> List(5, 3), [topic1,81] -> List(5, 3), [topic1,73] -> List(5, 3),
> [topic1,80] -> List(5, 3), [topic1,77] -> List(5, 3), [topic1,75] ->
> List(5, 3), [topic1,85] -> List(5, 3), [topic1,76] -> List(5, 3),
> [topic1,83] -> List(5, 3), [topic1,86] -> List(5, 3), [topic1,72] ->
> List(5, 3), [topic1,79] -> List(5, 3), [topic1,82] -> List(5, 3)), 1 ->
> Map([topic1,92] -> List(1, 0), [topic1,95] -> List(1, 0), [topic1,69] ->
> List(1, 0), [topic1,93] -> List(1, 0), [topic1,70] -> List(1, 0),
> [topic1,67] -> List(1, 0), [topic1,65] -> List(1, 0), [topic1,88] ->
> List(1, 0), [topic1,90] -> List(1, 0), [topic1,66] -> List(1, 0),
> [topic1,94] -> List(1, 0), [topic1,64] -> List(1, 0), [topic1,89] ->
> List(1, 0), [topic1,68] -> List(1, 0), [topic1,71] -> List(1, 0),
> [topic1,91] -> List(1, 0)), 2 -> Map([topic1,8] -> List(2, 4), [topic1,3]
> -> List(2, 4), [topic1,15] -> List(2, 4), [topic1,2] -> List(2, 4),
> [topic1,1] -> List(2, 4), [topic1,6] -> List(2, 4), [topic1,9] -> List(2,
> 4), [topic1,12] -> List(2, 4), [topic1,14] -> List(2, 4), [topic1,11] ->
> List(2, 4), [topic1,13] -> List(2, 4), [topic1,0] -> List(2, 4), [topic1,4]
> -> List(2, 4), [topic1,5] -> List(2, 4), [topic1,10] -> List(2, 4),
> [topic1,7] -> List(2, 4)), 3 -> Map([topic1,33] -> List(3, 5), [topic1,30]
> -> List(3, 5), [topic1,24] -> List(3, 5), [topic1,36] -> List(3, 5),
> [topic1,38] -> List(3, 5), [topic1,26] -> List(3, 5), [topic1,27] ->
> List(3, 5), [topic1,39] -> List(3, 5), [topic1,29] -> List(3, 5),
> [topic1,34] -> List(3, 5), [topic1,28] -> List(3, 5), [topic1,32] ->
> List(3, 5), [topic1,35] -> List(3, 5), [topic1,25] -> List(3, 5),
> [topic1,31] -> List(3, 5), [topic1,37] -> List(3, 5)), 4 -> Map([topic1,53]
> -> List(4, 2), [topic1,56] -> List(4, 2), [topic1,49] -> List(4, 2),
> [topic1,50] -> List(4, 2), [topic1,51] -> List(4, 2), [topic1,58] ->
> List(4, 2), [topic1,63] -> List(4, 2), [topic1,54] -> List(4, 2),
> [topic1,48] -> List(4, 2), [topic1,61] -> List(4, 2), [topic1,62] ->
> List(4, 2), [topic1,57] -> List(4, 2), [topic1,60] -> List(4, 2),
> [topic1,52] -> List(4, 2), [topic1,55] -> List(4, 2), [topic1,59] ->
> List(4, 2))) (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] TRACE [Controller 2]: leader imbalance ratio for
> broker 0 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,295] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 5 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 1 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 2 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] TRACE [Controller 2]: leader imbalance ratio for
> broker 3 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,296] DEBUG [Controller 2]: topics not in preferred
> replica Map() (kafka.controller.KafkaController)
> [2016-08-17 13:09:50,297] TRACE [Controller 2]: leader imbalance ratio for
> broker 4 is 0.000000 (kafka.controller.KafkaController)
> [2016-08-17 13:10:37,278] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:37,292] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,67],
> [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:37,304] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,375] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,383] DEBUG Sending MetadataRequest to
> Brokers:ArrayBuffer(0, 5, 1, 2, 3, 4) for TopicAndPartitions:Set([
> topic1,67],
> [topic1,95]) (kafka.controller.IsrChangeNotificationListener)
> [2016-08-17 13:10:43,394] DEBUG [IsrChangeNotificationListener] Fired!!!
> (kafka.controller.IsrChangeNotificationListener)
>
> Thanks
>
> Regards,
> Mazhar Shaikh.
>
>
>
> On Wed, Aug 17, 2016 at 9:50 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Yes, you can try setting it to -1 in 0.8.1, which is the equivalent of
> > "all" in 0.9 and above.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Aug 17, 2016 at 8:32 AM, Mazhar Shaikh <mazhar.shaikh...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > I'm using the default configuration (ack=1).
> > > Changing it to all or 2 will not help, as the producer queue will be
> > > exhausted if any kafka broker goes down for a long time.
> > >
> > >
> > > Thanks.
> > >
> > > Regards,
> > > Mazhar Shaikh.
> > >
> > >
> > > On Wed, Aug 17, 2016 at 8:11 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Are you using acks=1 or acks=all in the producer? Only the latter
> > > > guarantees acked messages won't be lost after leader failure.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Aug 10, 2016 at 11:41 PM, Mazhar Shaikh <
> > > > mazhar.shaikh...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Kafka Team,
> > > > >
> > > > > > I'm using kafka (kafka_2.11-0.9.0.1) with the librdkafka (0.8.1) API
> > > > > > for the producer.
> > > > > > During a run of 2hrs, I notice the total number of messages ack'd by
> > > > > > the librdkafka delivery report is greater than the max offset of a
> > > > > > partition in the kafka broker.
> > > > > I'm running kafka broker with replication factor of 2.
> > > > >
> > > > > > Here, messages have been lost between librdkafka and the kafka broker,
> > > > > > as librdkafka is providing a success delivery report for all the
> > > > > > messages.
> > > > > >
> > > > > > Looks like the kafka broker is dropping the messages after
> > > > > > acknowledging them to librdkafka.
> > > > > >
> > > > > > Requesting your help in solving this issue.
> > > > >
> > > > > Thank you.
> > > > >
> > > > >
> > > > > Regards
> > > > > Mazhar Shaikh
> > > > >
> > > >
> > >
> >
>
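
For readers following the acks discussion quoted above, a toy model may
help show why acks=1 can leave the delivery-report count higher than the
partition's max offset after a leader failure, while acks=all (-1) does
not. This is only a sketch under stated assumptions: simulate(),
follower_lag and the two plain lists standing in for replicas are
hypothetical names made up for illustration, not Kafka or librdkafka
APIs, and ISR shrinking is not modelled at all.

```python
def simulate(acks: str, n_messages: int = 5, follower_lag: int = 2) -> dict:
    """Toy two-replica partition: produce, then fail the leader.

    acks="1":   the leader acks as soon as it has appended locally; the
                follower is assumed to trail by `follower_lag` messages.
    acks="all": the leader acks only after the (in-sync) follower has
                the message too.
    """
    leader_log, follower_log, acked = [], [], []

    for msg in range(n_messages):
        leader_log.append(msg)
        if acks == "all":
            # The ack waits for the in-sync follower to have the message.
            follower_log.append(msg)
        else:
            # Asynchronous replication: the follower lags behind the leader.
            caught_up = max(0, len(leader_log) - follower_lag)
            follower_log = leader_log[:caught_up]
        acked.append(msg)  # delivery report sent to the producer

    # Leader fails; the follower takes over with whatever it has replicated.
    new_leader_log = follower_log
    lost = [m for m in acked if m not in new_leader_log]
    return {"acked": len(acked), "max_offset": len(new_leader_log), "lost": lost}


if __name__ == "__main__":
    print("acks=1  :", simulate("1"))
    print("acks=all:", simulate("all"))
```

In this model the acks=1 run reports more acked messages than the new
leader holds, which is the same shape as the symptom in the original
report; the acks=all run loses nothing because each ack waited for the
follower.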
