Re: Log compaction not working as expected
Some further information, and is this a bug? I'm using 0.8.2.1. Log compaction will only occur on the non-active segments. Intentional or not, it seems that the last segment is always the active segment. In other words, an expired segment will not be cleaned until a new segment has been created. As a result, a log won't be compacted until new data comes in (per partition).

Does this mean I need to send the equivalent of a pig (https://en.wikipedia.org/wiki/Pigging) through each partition in order to force compaction? Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC --property parse.key=true --property key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction has occurred.

4. Once satisfied, send a message with a new key:

echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC --property parse.key=true --property key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequently written logs not supported? Is this intentional behavior? It's unnecessarily challenging to target each partition with a dummy message to trigger compaction.

Also, I believe there is another issue with logs originally configured without a segment timeout, which led to my original issue. I still cannot get those logs to compact.

Thanks!
Shayne
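As a side note for anyone reproducing this: one way to see whether cleaning has actually happened is to list the segment files and dump them. This is only a sketch, assuming the default log.dirs of /tmp/kafka-logs and partition 0 of the test topic:

# The segment with the highest base offset is the active one and will not be cleaned
ls -l /tmp/kafka-logs/TEST_TOPIC-0

# Dump a segment; if older values for a key are still present, that segment has not been compacted yet
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/TEST_TOPIC-0/00000000000000000000.log --print-data-log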
Re: Broken auto leader rebalance after using reassign partitions tool
Hi Gwen,

sure, the following commands were executed:

./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file ~/partition_redist.json --execute
./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file ~/partition_redist.json --verify

The contents of partition_redist.json are:

{ "partitions": [
  { "topic": "T1", "partition": 0, "replicas": [1,3,5] }, { "topic": "T1", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T1", "partition": 2, "replicas": [1,3,5] }, { "topic": "T1", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T1", "partition": 4, "replicas": [1,3,5] }, { "topic": "T1", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T1", "partition": 6, "replicas": [1,3,5] }, { "topic": "T1", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T1", "partition": 8, "replicas": [1,3,5] }, { "topic": "T1", "partition": 9, "replicas": [2,4,6] },
  { "topic": "T2", "partition": 0, "replicas": [1,3,5] }, { "topic": "T2", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T2", "partition": 2, "replicas": [1,3,5] }, { "topic": "T2", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T2", "partition": 4, "replicas": [1,3,5] }, { "topic": "T2", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T2", "partition": 6, "replicas": [1,3,5] }, { "topic": "T2", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T2", "partition": 8, "replicas": [1,3,5] }, { "topic": "T2", "partition": 9, "replicas": [2,4,6] },
  { "topic": "T3", "partition": 0, "replicas": [1,3,5] }, { "topic": "T3", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T3", "partition": 2, "replicas": [1,3,5] }, { "topic": "T3", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T3", "partition": 4, "replicas": [1,3,5] }, { "topic": "T3", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T3", "partition": 6, "replicas": [1,3,5] }, { "topic": "T3", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T3", "partition": 8, "replicas": [1,3,5] }, { "topic": "T3", "partition": 9, "replicas": [2,4,6] },
  { "topic": "T4", "partition": 0, "replicas": [1,3,5] }, { "topic": "T4", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T4", "partition": 2, "replicas": [1,3,5] }, { "topic": "T4", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T4", "partition": 4, "replicas": [1,3,5] }, { "topic": "T4", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T4", "partition": 6, "replicas": [1,3,5] }, { "topic": "T4", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T4", "partition": 8, "replicas": [1,3,5] }, { "topic": "T4", "partition": 9, "replicas": [2,4,6] },
  { "topic": "T5", "partition": 0, "replicas": [1,3,5] }, { "topic": "T5", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T5", "partition": 2, "replicas": [1,3,5] }, { "topic": "T5", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T5", "partition": 4, "replicas": [1,3,5] }, { "topic": "T5", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T5", "partition": 6, "replicas": [1,3,5] }, { "topic": "T5", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T5", "partition": 8, "replicas": [1,3,5] }, { "topic": "T5", "partition": 9, "replicas": [2,4,6] },
  { "topic": "T6", "partition": 0, "replicas": [1,3,5] }, { "topic": "T6", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T6", "partition": 2, "replicas": [1,3,5] }, { "topic": "T6", "partition": 3, "replicas": [2,4,6] },
  { "topic": "T6", "partition": 4, "replicas": [1,3,5] }, { "topic": "T6", "partition": 5, "replicas": [2,4,6] },
  { "topic": "T6", "partition": 6, "replicas": [1,3,5] }, { "topic": "T6", "partition": 7, "replicas": [2,4,6] },
  { "topic": "T6", "partition": 8, "replicas": [1,3,5] }, { "topic": "T6", "partition": 9, "replicas": [2,4,6] },
  { "topic": "Search", "partition": 0, "replicas": [1,3,5] }, { "topic": "Search", "partition": 1, "replicas": [2,4,6] },
  { "topic": "Search", "partition": 2, "replicas": [1,3,5] }, { "topic": "Search", "partition": 3, "replicas": [2,4,6] },
  { "topic": "Search", "partition": 4, "replicas": [1,3,5] }, { "topic": "Search", "partition": 5, "replicas": [2,4,6] },
  { "topic": "Search", "partition": 6, "replicas": [1,3,5] }, { "topic": "Search", "partition": 7, "replicas": [2,4,6] },
  { "topic": "Search", "partition": 8, "replicas": [1,3,5] }, { "topic": "Search", "partition": 9, "replicas": [2,4,6] }
], "version": 1 }

Greetings
Valentin

-----Original Message-----
From: Gwen Shapira gshap...@cloudera.com
Reply-To: users@kafka.apache.org
Date: Monday, June 15, 2015 18:31
To: users@kafka.apache.org
Subject: Re: Broken auto leader rebalance after using reassign partitions tool

Can you share the command you ran for partition reassignment? (and the JSON)

On Mon, Jun 15, 2015 at 8:41 AM, Valentin kafka-...@sblk.de wrote:

Hi guys,

today I have observed a very strange behavior of the auto leader rebalance feature after I used the reassign partitions tool. For some reason only the first two of my six brokers are now used as leaders. Example:

# ./kafka-topics.sh --zookeeper xxx --describe --topic Search
Topic:Search    PartitionCount:10    ReplicationFactor:3    Configs:
    Topic: Search    Partition: 0    Leader: 1    Replicas: 1,3,5    Isr: 5,3,1
    Topic: Search    Partition: 1    Leader: 2
Re: Log compaction not working as expected
Hi,

Your observation is correct: we never compact the active segment. Some improvements are proposed here: https://issues.apache.org/jira/browse/KAFKA-1981

Manikumar

On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:
> Some further information, and is this a bug? I'm using 0.8.2.1. Log compaction will only occur on the non-active segments. [...]
Re: Log compaction not working as expected
Ok, I got your point. Currently we check the log segment constraints (segment.bytes, segment.ms) only before appending new messages, so we will not create a new log segment until new data comes in. In your case, your approach (sending a periodic dummy/ping message) should be fine.

On Tue, Jun 16, 2015 at 7:19 PM, Shayne S shaynest...@gmail.com wrote:
> Thank you for the response! Unfortunately, those improvements would not help. It is the lack of activity resulting in a new segment that prevents compaction. [...]
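For topics that were originally created without a segment timeout, a possible workaround along the same lines is to add a segment.ms to the existing topic and then send a keyed ping message. This is only a sketch, not a tested recipe; MY_TOPIC is a placeholder, and it assumes a 0.8.2.x broker where topic configs can still be altered via kafka-topics.sh:

# Add a 5 minute segment.ms to an existing topic so old segments can expire
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MY_TOPIC --config segment.ms=300000

# Send one keyed dummy message; the broker checks the segment constraints on this append and rolls a new segment
echo "PING,{\"ping\": true}" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic MY_TOPIC --property parse.key=true --property key.separator=, --new-producer

With more than one partition, the keyed dummy message only reaches (and rolls) the partition its key hashes to, so each partition needs its own ping, which is exactly the "pig" problem described above.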
Re: Log compaction not working as expected
Thank you for the response! Unfortunately, those improvements would not help. It is the lack of activity resulting in a new segment that prevents compaction.

I was confused by what qualifies as the active segment. The active segment is simply the last segment, as opposed to the segment that would be written to if something were received right now.

On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy ku...@nmsworks.co.in wrote:
> Hi, Your observation is correct: we never compact the active segment. [...]
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
Should not matter. We're running 12.04.

Wes

On Jun 16, 2015 12:18 PM, Henry Cai h...@pinterest.com.invalid wrote:
> Does it still matter whether we are using Ubuntu 14 or 12? [...]
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
Does it still matter whether we are using Ubuntu 14 or 12?

On Tue, Jun 16, 2015 at 8:44 AM, Wesley Chow w...@chartbeat.com wrote:
> A call with Amazon confirmed instability for d2 and c4 instances triggered by lots of network activity. [...]
Re: HDD or SSD or EBS for kafka brokers in Amazon EC2
A call with Amazon confirmed instability for d2 and c4 instances triggered by lots of network activity. They fixed the problem and have since rolled it out. We've been running Kafka with d2's for a little while now and so far so good.

Wes

On Tue, Jun 2, 2015 at 1:39 PM, Wes Chow w...@chartbeat.com wrote:

We have run d2 instances with Kafka. They're currently unstable: Amazon confirmed yesterday a host issue with d2 instances that gets tickled by a Kafka workload. Otherwise, the d2 instance type seems ideal, as it gets an enormous amount of disk throughput and you'll likely be network bottlenecked.

Wes

Steven Wu stevenz...@gmail.com wrote (June 2, 2015 at 1:07 PM):

EBS (network attached storage) has gotten a lot better over the last few years, but we don't quite trust it for a Kafka workload. At Netflix, we are going with the new d2 instance type (HDD); our perf/load testing shows it satisfies our workload. SSD has a better latency curve but is pretty comparable in terms of throughput, and we can use the extra space from HDD for a longer retention period.

Henry Cai h...@pinterest.com.invalid wrote (June 2, 2015 at 12:37 PM):

We have been hosting Kafka brokers in Amazon EC2 and we are using EBS disks, but periodically we were hit by long I/O wait times on EBS in some Availability Zones. We are thinking of changing the instance types to local HDD or local SSD. HDD is cheaper and bigger and seems a good fit for the Kafka use case, which is mostly sequential read/write, but some early experiments show the HDD cannot keep up with the message producing speed, since the many topics/partitions on the broker make the disk I/O more randomly accessed. What has people's experience been with choosing disk types on Amazon?
Re: QuickStart OK locally, but getting WARN Property topic is not valid and LeaderNotAvailableException remotely
The topic warning is a bug (i.e., the fact that you get a warning on a perfectly valid parameter). We fixed it for the next release. It is also unrelated to the real issue with the LeaderNotAvailableException.

On Tue, Jun 16, 2015 at 2:08 PM, Mike Bridge m...@bridgecanada.com wrote:
> I am able to get a simple one-node Kafka (kafka_2.11-0.8.2.1) setup working locally on one linux machine, but when I try to run a producer remotely I get some confusing errors. [...]
QuickStart OK locally, but getting WARN Property topic is not valid and LeaderNotAvailableException remotely
I am able to get a simple one-node Kafka (kafka_2.11-0.8.2.1) setup working locally on one linux machine, but when I try to run a producer remotely I get some confusing errors. I'm following the quickstart guide at http://kafka.apache.org/documentation.html#quickstart. I stopped the kafka processes and deleted all the zookeeper and kafka files in /tmp. I am on a local 10.0.0.0/24 network NAT-ed to an external IP address, so I modified `server.properties` to tell Kafka to advertise my external address:

advertised.host.name=MY.EXTERNAL.IP

Then I'm running this:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
-- ...
$ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # small test server!
$ bin/kafka-server-start.sh config/server.properties
-- ...

I opened up the firewall for my producer on the remote machine, and created a new topic and verified it:

$ bin/kafka-topics.sh --create --zookeeper MY.EXTERNAL.IP:2181 --replication-factor 1 --partitions 1 --topic test123
-- Created topic "test123".
$ bin/kafka-topics.sh --list --zookeeper MY.EXTERNAL.IP:2181
-- test123

However, the producer I'm running remotely gives me errors:

$ bin/kafka-console-producer.sh --broker-list MY.EXTERNAL.IP:9092 --topic test123
-- [2015-06-16 14:41:19,757] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
My Test Message
-- [2015-06-16 14:42:43,347] WARN Error while fetching metadata [{TopicMetadata for topic test123 -> No partition metadata for topic test123 due to kafka.common.LeaderNotAvailableException}] for topic [test123]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
-- (repeated several times)

(I disabled the whole firewall to make sure that wasn't the problem.)

The stdout errors at kafka startup are repeated:

[2015-06-16 20:42:42,768] INFO Closing socket connection to /MY.EXTERNAL.IP. (kafka.network.Processor)

And the controller.log gives me this, several times:

java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-06-16 20:44:08,128] INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to id:0,host:MY.EXTERNAL.IP,port:9092 for sending state change requests (kafka.controller.RequestSendThread)
[2015-06-16 20:44:08,428] WARN [Controller-0-to-broker-0-send-thread], Controller 0 epoch 1 fails to send request Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:7;ClientId:id_0-host_null-port_9092;Leaders:id:0,host:MY.EXTERNAL.IP,port:9092;PartitionState:(test123,0) -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0) to broker id:0,host:MY.EXTERNAL.IP,port:9092. Reconnecting to broker. (kafka.controller.RequestSendThread)

Any idea what might be wrong?

-Mike
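For reference, the broker-side settings involved look something like the lines below; this is only a sketch with illustrative values, not the actual config. One thing worth noting when reading the controller.log excerpt: the controller also connects to the broker through the advertised address it finds in ZooKeeper, so MY.EXTERNAL.IP has to be reachable from the broker machine itself (hairpin NAT), not just from the remote producer.

# config/server.properties (sketch)
broker.id=0
port=9092
# Address registered in ZooKeeper; used by remote clients and by the controller
advertised.host.name=MY.EXTERNAL.IP
advertised.port=9092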
Re: Broken auto leader rebalance after using reassign partitions tool
Ah :) See how the first replica in your replicas list is always either 1 or 2? This means that after reassignment, broker 1 or 2 will be the leader (and the preferred leader) for every partition, which means that Kafka will keep trying to rebalance leaders onto those replicas (since they are preferred). You just need to permute the order of replicas in each set to be more balanced.

Gwen

On Tue, Jun 16, 2015 at 5:58 AM, Valentin kafka-...@sblk.de wrote:
> Hi Gwen, sure, the following commands were executed: ./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file ~/partition_redist.json --execute [...]
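To illustrate the permutation Gwen describes, here is a sketch of what the first entries of partition_redist.json could look like instead; the replica sets are unchanged, only the order (and therefore the preferred leader) rotates, and the values are purely illustrative:

{ "partitions": [
  { "topic": "T1", "partition": 0, "replicas": [1,3,5] },
  { "topic": "T1", "partition": 1, "replicas": [2,4,6] },
  { "topic": "T1", "partition": 2, "replicas": [3,5,1] },
  { "topic": "T1", "partition": 3, "replicas": [4,6,2] },
  { "topic": "T1", "partition": 4, "replicas": [5,1,3] },
  { "topic": "T1", "partition": 5, "replicas": [6,2,4] },
  ...
], "version": 1 }

Once the corrected assignment has completed, leadership can be moved back to the preferred (first) replicas with the preferred replica election tool that ships with the broker:

bin/kafka-preferred-replica-election.sh --zookeeper XXX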
duplicate messages at consumer
Hi,

While testing message delivery using Kafka, I noticed that a few duplicate messages were delivered to consumers in the same consumer group (two consumers got the same message within a few milliseconds of each other). However, I do not see any redundancy at the producer or broker. One more observation: this does not happen when I use only one consumer thread.

I am running 3 brokers (0.8.2.1) with 3 ZooKeeper nodes. There are 3 partitions in the topic and the replication factor is 3. For producing, I am using the new producer with compression.type=none. On the consumer end, I have 3 high-level consumers in the same consumer group, running with one consumer thread each on three different hosts. Auto commit is set to true for the consumers. The size of each message ranges between 0.7 KB and 2 MB, and the maximum volume for this test is 100 messages/hr.

I looked at the controller log for any sign of a consumer rebalance during this time, but did not find any. In the server log of all the brokers, the error "java.io.IOException: Connection reset by peer" is being written almost continuously.

So, is it possible to achieve exactly-once delivery with the current high-level consumer, without needing an extra layer to remove redundancy? Could you please point me to any settings or logs that would help me tune the configuration?

PS: I tried searching for similar discussions, but could not find any. If this has already been answered, please provide the link.

Thanks,
Kris
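For context, a sketch of the old high-level (ZooKeeper-based) consumer properties that govern offset commits in 0.8.2.x; the hostnames and group name are placeholders, and the values are illustrative rather than a recommendation:

# consumer.properties (sketch)
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
group.id=my-group
auto.commit.enable=true
# Offsets are committed at most this often; messages consumed after the last commit
# can be redelivered to another consumer after a rebalance or consumer failure
auto.commit.interval.ms=60000

With auto commit, the high-level consumer gives at-least-once delivery, so occasional duplicates are expected behavior; exactly-once processing would need deduplication (for example by message key) on the application side.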
Re: Offset management: client vs broker side responsibility
I found out that there is a standard API for retrieving and committing offsets (see https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka ).

The problem is that the server/broker side is not extensible (see https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L142 ), i.e. there is no API one can implement and deploy/configure together with the Kafka binary to handle unsupported offsetCommitRequest.versionId/offsetFetchRequest.versionId values, or to override the handling of already supported ones.

This does not prevent one from implementing custom offset management on the client side (instead of using the standard API to commit and retrieve offsets, one can talk directly to a custom offset store), but then the problem arises that no commercial or FOSS Kafka monitoring solution supports it out of the box.

I know I would, but the question to the Apache Kafka community is: would you like to have Kafka broker commit/fetch handling extensible? And what do the committers think about this?

Kind regards,
Stevo Slavic.

On Tue, Jun 2, 2015 at 7:11 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote:

Hi,

I haven't followed the changes to offset tracking closely, other than that storing them in ZK is not the only option any more. I think what Stevo is asking about/suggesting is that there be a single API from which offset information can be retrieved (e.g. by monitoring tools), so that monitoring tools work regardless of where one chose to store offsets. I know we'd love to have this for SPM's Kafka monitoring, and I can tell you that adding support for N different APIs for N different offset storage systems would be hard/time-consuming/expensive. But maybe this single API already exists?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/

On Mon, Jun 1, 2015 at 4:41 PM, Jason Rosenberg j...@squareup.com wrote:

Stevo,

Both of the main solutions used by the high-level consumer are standardized and supported directly by the kafka client libraries (e.g. maintaining offsets in zookeeper or in kafka itself). And for the zk case, there is the consumer offset checker (which is good for monitoring). The consumer offset checker still needs to be extended for offsets stored in Kafka's __consumer_offsets topic though.

Anyway, I'm not sure I understand your question. You want something for better monitoring of all possible clients (some of which might choose to manage offsets in their own way)? It's just not part of the kafka design to directly track individual consumers.

Jason

On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:

I guess adding a new component will increase the complexity of the system structure. And if the new component consists of one or a few nodes, it may become the bottleneck of the whole system; if it consists of many nodes, it will make the system even more complex. Although every solution has its downsides, I think the current one is decent.

2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:

It could be a separate server component; it does not have to be monolithic/coupled with the broker. Such a solution would have benefits: a single API, pluggable implementations.

On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:

Storing and managing offsets on the broker will put high pressure on the brokers, which will affect the performance of the cluster. You can use the high-level consumer APIs; then you can get the offsets either from zookeeper or the __consumer_offsets topic. On the other hand, if you use the simple consumer APIs, you mean to manage offsets yourself, so you should monitor them yourself, simple and plain, right?

2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:

Hello Apache Kafka community,

Please correct me if wrong, but AFAIK currently (Kafka 0.8.2.x) offset management is mainly a client/consumer side responsibility. Wouldn't it be better if it was a broker side only responsibility? E.g. now if one wants to use custom offset management, none of the Kafka monitoring tools can see the offsets; they would need to use the same custom client implementation, which is practically not possible.

Kind regards,
Stevo Slavic.
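As a concrete reference for the monitoring discussion, the tool that ships with 0.8.2.x for ZooKeeper-committed offsets is the consumer offset checker mentioned above; a usage sketch with a placeholder group and topic name:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group my-consumer-group --topic my-topic

It reports the committed offset, log end offset, and lag per partition, but as noted in the thread it does not cover offsets committed to Kafka itself or to a custom store, which is exactly the gap being discussed.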