Re: Log compaction not working as expected

2015-06-16 Thread Shayne S
Some further information, and is this a bug?  I'm using 0.8.2.1.

Log compaction will only occur on the non-active segments.  Intentional or
not, it seems that the last segment is always the active segment.  In other
words, an expired segment will not be cleaned until a new segment has been
created.

As a result, a log won't be compacted until new data comes in (per
partition). Does this mean I need to send the equivalent of a pig (
https://en.wikipedia.org/wiki/Pigging) through each partition in order to
force compaction?  Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
--replication-factor 1 --partitions 1 --config cleanup.policy=compact
--config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

2. Repeatedly add messages with identical keys (3x):

echo ABC123,{\"test\": 1} | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.
4. Once satisfied, send a new message:

echo DEF456,{\"test\": 1} | kafka-console-producer.sh --broker-list
localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequent logs not supported? Is this intentional
behavior? It's unnecessarily challenging to target each partition with a
dummy message to trigger compaction.
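
For what it's worth, the workaround I have in mind looks roughly like the
sketch below (untested, written against the 0.8.2 java producer; the topic
name and broker address are just the ones from my steps above, and the
class/key names are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompactionPig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "TEST_TOPIC";
        int partitions = producer.partitionsFor(topic).size();

        // One "pig" record per partition: the append makes the broker re-check
        // segment.ms, roll a new segment, and leave the old one cleanable.
        // Note the pig record itself will remain in the compacted log
        // (one entry per partition).
        for (int partition = 0; partition < partitions; partition++) {
            producer.send(new ProducerRecord<>(topic, partition, "__pig__", "{}")).get();
        }
        producer.close();
    }
}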

Also, I believe there is another issue with logs originally configured
without a segment timeout that led to my original issue.  I still cannot
get those logs to compact.
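
If it helps, the change I plan to try on those topics is simply to add a
segment limit after the fact (untested sketch; the topic name is a
placeholder):

kafka-topics.sh --zookeeper localhost:2181 --alter --topic MY_EXISTING_TOPIC
--config segment.ms=300000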

Thanks!
Shayne


Re: Broken auto leader rebalance after using reassign partitions tool

2015-06-16 Thread Valentin

Hi Gwen,

sure, the following commands were executed:
./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file
~/partition_redist.json --execute
./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file
~/partition_redist.json --verify

The contents of partition_redist.json are:
{
  "partitions":
  [
{ "topic": "T1", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T1", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T1", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T1", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T1", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 9, "replicas": [2,4,6] },

{ "topic": "T2", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T2", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T2", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T2", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T2", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T2", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T2", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T2", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T2", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T2", "partition": 9, "replicas": [2,4,6] },

{ "topic": "T3", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T3", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T3", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T3", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T3", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T3", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T3", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T3", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T3", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T3", "partition": 9, "replicas": [2,4,6] },

{ "topic": "T4", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T4", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T4", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T4", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T4", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T4", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T4", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T4", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T4", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T4", "partition": 9, "replicas": [2,4,6] },

{ "topic": "T5", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T5", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T5", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T5", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T5", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T5", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T5", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T5", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T5", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T5", "partition": 9, "replicas": [2,4,6] },

{ "topic": "T6", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T6", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T6", "partition": 2, "replicas": [1,3,5] },
{ "topic": "T6", "partition": 3, "replicas": [2,4,6] },
{ "topic": "T6", "partition": 4, "replicas": [1,3,5] },
{ "topic": "T6", "partition": 5, "replicas": [2,4,6] },
{ "topic": "T6", "partition": 6, "replicas": [1,3,5] },
{ "topic": "T6", "partition": 7, "replicas": [2,4,6] },
{ "topic": "T6", "partition": 8, "replicas": [1,3,5] },
{ "topic": "T6", "partition": 9, "replicas": [2,4,6] },

{ "topic": "Search", "partition": 0, "replicas": [1,3,5] },
{ "topic": "Search", "partition": 1, "replicas": [2,4,6] },
{ "topic": "Search", "partition": 2, "replicas": [1,3,5] },
{ "topic": "Search", "partition": 3, "replicas": [2,4,6] },
{ "topic": "Search", "partition": 4, "replicas": [1,3,5] },
{ "topic": "Search", "partition": 5, "replicas": [2,4,6] },
{ "topic": "Search", "partition": 6, "replicas": [1,3,5] },
{ "topic": "Search", "partition": 7, "replicas": [2,4,6] },
{ "topic": "Search", "partition": 8, "replicas": [1,3,5] },
{ "topic": "Search", "partition": 9, "replicas": [2,4,6] }
  ],
  "version": 1
}

Greetings
Valentin

-Original Message-
From: Gwen Shapira gshap...@cloudera.com
Reply-To: users@kafka.apache.org users@kafka.apache.org
Date: Montag, 15. Juni 2015 18:31
To: users@kafka.apache.org users@kafka.apache.org
Subject: Re: Broken auto leader rebalance after using reassign partitions
tool

Can you share the command you ran for partition reassignment? (and the
JSON)


On Mon, Jun 15, 2015 at 8:41 AM, Valentin kafka-...@sblk.de wrote:

Hi guys,

today I have observed a very strange behavior of the auto leader rebalance
feature after I used the reassign partitions tool.
For some reason only the first two of my six brokers are now used as
leaders.

Example:
# ./kafka-topics.sh --zookeeper xxx --describe --topic Search
Topic: Search   PartitionCount: 10   ReplicationFactor: 3   Configs:
 Topic: Search   Partition: 0   Leader: 1   Replicas: 1,3,5   Isr: 5,3,1
 Topic: Search   Partition: 1   Leader: 2

Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
Hi,

  Your observation is correct.  We never compact the active segment.
  Some improvements are proposed here,
  https://issues.apache.org/jira/browse/KAFKA-1981


Manikumar

On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:

 Some further information, and is this a bug?  I'm using 0.8.2.1.

 Log compaction will only occur on the non active segments.  Intentional or
 not, it seems that the last segment is always the active segment.  In other
 words, an expired segment will not be cleaned until a new segment has been
 created.

 As a result, a log won't be compacted until new data comes in (per
 partition). Does this mean I need to send the equivalent of a pig (
 https://en.wikipedia.org/wiki/Pigging) through each partition in order to
 force compaction?  Or can I force the cleaning somehow?

 Here are the steps to recreate:

 1. Create a new topic with a 5 minute segment.ms:

 kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
 --replication-factor 1 --partitions 1 --config cleanup.policy=compact
 --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000

 2. Repeatedly add messages with identical keys (3x):

 echo ABC123,{\"test\": 1} | kafka-console-producer.sh --broker-list
 localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
 key.separator=, --new-producer

 3. Wait 5+ minutes and confirm no log compaction.
 4. Once satisfied, send a new message:

 echo DEF456,{\"test\": 1} | kafka-console-producer.sh --broker-list
 localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
 key.separator=, --new-producer

 5. Log compaction will occur quickly soon after.

 Is my use case of infrequent logs not supported? Is this intentional
 behavior? It's unnecessarily challenging to target each partition with a
 dummy message to trigger compaction.

 Also, I believe there is another issue with logs originally configured
 without a segment timeout that lead to my original issue.  I still cannot
 get those logs to compact.

 Thanks!
 Shayne



Re: Log compaction not working as expected

2015-06-16 Thread Manikumar Reddy
OK, I got your point. Currently we check the log segment constraints
(segment.bytes, segment.ms) only before appending new messages. So we will
not create a new log segment until new data comes in.

In your case, your approach (sending a periodic dummy/ping message) should
be fine.
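
For example, something like the following cron entry (install path, broker
address and topic are assumed) would replay one ping record per hour through
the console producer, which is enough to let the old segment roll once
segment.ms has passed:

# sketch: hourly ping so the active segment can roll and older ones compact
0 * * * * echo 'PING,{"ping":1}' | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC --property parse.key=true --property key.separator=, --new-producer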



On Tue, Jun 16, 2015 at 7:19 PM, Shayne S shaynest...@gmail.com wrote:

 Thank you for the response!

 Unfortunately, those improvements would not help.  It is the lack of
 activity resulting in a new segment that prevents compaction.

 I was confused by what qualifies as the active segment. The active segment
 is the last segment as opposed to the segment that would be written to if
 something were received right now.

 On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  Hi,
 
Your observation is correct.  we never compact the active segment.
Some improvements are proposed here,
https://issues.apache.org/jira/browse/KAFKA-1981
 
 
  Manikumar
 
  On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:
 
   Some further information, and is this a bug?  I'm using 0.8.2.1.
  
   Log compaction will only occur on the non active segments.  Intentional
  or
   not, it seems that the last segment is always the active segment.  In
  other
   words, an expired segment will not be cleaned until a new segment has
  been
   created.
  
   As a result, a log won't be compacted until new data comes in (per
   partition). Does this mean I need to send the equivalent of a pig (
   https://en.wikipedia.org/wiki/Pigging) through each partition in order
  to
   force compaction?  Or can I force the cleaning somehow?
  
   Here are the steps to recreate:
  
   1. Create a new topic with a 5 minute segment.ms:
  
   kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
   --replication-factor 1 --partitions 1 --config cleanup.policy=compact
   --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000
  
   2. Repeatedly add messages with identical keys (3x):
  
   echo ABC123,{\"test\": 1} | kafka-console-producer.sh --broker-list
   localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
   key.separator=, --new-producer
  
   3. Wait 5+ minutes and confirm no log compaction.
   4. Once satisfied, send a new message:
  
   echo DEF456,{\"test\": 1} | kafka-console-producer.sh --broker-list
   localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
   key.separator=, --new-producer
  
   5. Log compaction will occur quickly soon after.
  
   Is my use case of infrequent logs not supported? Is this intentional
   behavior? It's unnecessarily challenging to target each partition with
 a
   dummy message to trigger compaction.
  
   Also, I believe there is another issue with logs originally configured
   without a segment timeout that lead to my original issue.  I still
 cannot
   get those logs to compact.
  
   Thanks!
   Shayne
  
 



Re: Log compaction not working as expected

2015-06-16 Thread Shayne S
Thank you for the response!

Unfortunately, those improvements would not help.  It is the lack of
activity, and therefore of a new segment, that prevents compaction.

I was confused by what qualifies as the active segment. The active segment
is the last segment as opposed to the segment that would be written to if
something were received right now.

On Tue, Jun 16, 2015 at 8:38 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Hi,

   Your observation is correct.  we never compact the active segment.
   Some improvements are proposed here,
   https://issues.apache.org/jira/browse/KAFKA-1981


 Manikumar

 On Tue, Jun 16, 2015 at 5:35 PM, Shayne S shaynest...@gmail.com wrote:

  Some further information, and is this a bug?  I'm using 0.8.2.1.
 
  Log compaction will only occur on the non active segments.  Intentional
 or
  not, it seems that the last segment is always the active segment.  In
 other
  words, an expired segment will not be cleaned until a new segment has
 been
  created.
 
  As a result, a log won't be compacted until new data comes in (per
  partition). Does this mean I need to send the equivalent of a pig (
  https://en.wikipedia.org/wiki/Pigging) through each partition in order
 to
  force compaction?  Or can I force the cleaning somehow?
 
  Here are the steps to recreate:
 
  1. Create a new topic with a 5 minute segment.ms:
 
  kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC
  --replication-factor 1 --partitions 1 --config cleanup.policy=compact
   --config min.cleanable.dirty.ratio=0.01 --config segment.ms=300000
 
  2. Repeatedly add messages with identical keys (3x):
 
   echo ABC123,{\"test\": 1} | kafka-console-producer.sh --broker-list
  localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
  key.separator=, --new-producer
 
  3. Wait 5+ minutes and confirm no log compaction.
  4. Once satisfied, send a new message:
 
   echo DEF456,{\"test\": 1} | kafka-console-producer.sh --broker-list
  localhost:9092 --topic TEST_TOPIC --property parse.key=true --property
  key.separator=, --new-producer
 
  5. Log compaction will occur quickly soon after.
 
  Is my use case of infrequent logs not supported? Is this intentional
  behavior? It's unnecessarily challenging to target each partition with a
  dummy message to trigger compaction.
 
  Also, I believe there is another issue with logs originally configured
  without a segment timeout that lead to my original issue.  I still cannot
  get those logs to compact.
 
  Thanks!
  Shayne
 



Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-16 Thread Wesley Chow
Should not matter. We're running 12.04.

Wes
 On Jun 16, 2015 12:18 PM, Henry Cai h...@pinterest.com.invalid wrote:

 Does it still matter whether we are using Ubuntu 14 or 12?

 On Tue, Jun 16, 2015 at 8:44 AM, Wesley Chow w...@chartbeat.com wrote:

 
  A call with Amazon confirmed instability for d2 and c4 instances
 triggered
  by lots of network activity. They fixed the problem and have since rolled
  it out. We've been running Kafka with d2's for a little while now and so
  far so good.
 
  Wes
 
 
  On Tue, Jun 2, 2015 at 1:39 PM, Wes Chow w...@chartbeat.com wrote:
 
 
  We have run d2 instances with Kafka. They're currently unstable --
 Amazon
  confirmed a host issue with d2 instances that gets tickled by a Kafka
  workload yesterday. Otherwise, it seems the d2 instance type is ideal
 as it
  gets an enormous amount of disk throughput and you'll likely be network
  bottlenecked.
 
  Wes
 
 
Steven Wu stevenz...@gmail.com
   June 2, 2015 at 1:07 PM
  EBS (network attached storage) has got a lot better over the last few
  years. We don't quite trust it for a kafka workload.
 
  At Netflix, we were going with the new d2 instance type (HDD). Our
  perf/load testing shows it satisfies our workload. SSD is better in the
  latency curve but pretty comparable in terms of throughput. We can use
  the extra space from HDD for a longer retention period.
 
  On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid
 
Henry Cai h...@pinterest.com.INVALID
   June 2, 2015 at 12:37 PM
  We have been hosting kafka brokers in Amazon EC2 and we are using EBS
  disk. But periodically we were hit by long I/O wait time on EBS in some
  Availability Zones.
 
  We are thinking to change the instance types to a local HDD or local
 SSD.
  HDD is cheaper and bigger and seems quite fit for the Kafka use case
 which
  is mostly sequential read/write, but some early experiments show the HDD
  cannot catch up with the message producing speed since there are many
  topic/partitions on the broker which actually makes the disk I/O more
  randomly accessed.
 
  What are people's experiences with choosing disk types on Amazon?
 
 
 



Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-16 Thread Henry Cai
Does it still matter whether we are using Ubuntu 14 or 12?

On Tue, Jun 16, 2015 at 8:44 AM, Wesley Chow w...@chartbeat.com wrote:


 A call with Amazon confirmed instability for d2 and c4 instances triggered
 by lots of network activity. They fixed the problem and have since rolled
 it out. We've been running Kafka with d2's for a little while now and so
 far so good.

 Wes


 On Tue, Jun 2, 2015 at 1:39 PM, Wes Chow w...@chartbeat.com wrote:


 We have run d2 instances with Kafka. They're currently unstable -- Amazon
 confirmed a host issue with d2 instances that gets tickled by a Kafka
 workload yesterday. Otherwise, it seems the d2 instance type is ideal as it
 gets an enormous amount of disk throughput and you'll likely be network
 bottlenecked.

 Wes


   Steven Wu stevenz...@gmail.com
  June 2, 2015 at 1:07 PM
 EBS (network attached storage) has got a lot better over the last few
 years. We don't quite trust it for a kafka workload.

 At Netflix, we were going with the new d2 instance type (HDD). Our
 perf/load testing shows it satisfies our workload. SSD is better in the
 latency curve but pretty comparable in terms of throughput. We can use the
 extra space from HDD for a longer retention period.

 On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid

   Henry Cai h...@pinterest.com.INVALID
  June 2, 2015 at 12:37 PM
 We have been hosting kafka brokers in Amazon EC2 and we are using EBS
 disk. But periodically we were hit by long I/O wait time on EBS in some
 Availability Zones.

 We are thinking to change the instance types to a local HDD or local SSD.
 HDD is cheaper and bigger and seems quite fit for the Kafka use case which
 is mostly sequential read/write, but some early experiments show the HDD
 cannot catch up with the message producing speed since there are many
 topic/partitions on the broker which actually makes the disk I/O more
 randomly accessed.

 What are people's experiences with choosing disk types on Amazon?





Re: HDD or SSD or EBS for kafka brokers in Amazon EC2

2015-06-16 Thread Wesley Chow
A call with Amazon confirmed instability for d2 and c4 instances triggered
by lots of network activity. They fixed the problem and have since rolled
it out. We've been running Kafka with d2's for a little while now and so
far so good.

Wes


On Tue, Jun 2, 2015 at 1:39 PM, Wes Chow w...@chartbeat.com wrote:


 We have run d2 instances with Kafka. They're currently unstable -- Amazon
 confirmed a host issue with d2 instances that gets tickled by a Kafka
 workload yesterday. Otherwise, it seems the d2 instance type is ideal as it
 gets an enormous amount of disk throughput and you'll likely be network
 bottlenecked.

 Wes


   Steven Wu stevenz...@gmail.com
  June 2, 2015 at 1:07 PM
 EBS (network attached storage) has got a lot better over the last few
 years. We don't quite trust it for a kafka workload.

 At Netflix, we were going with the new d2 instance type (HDD). Our
 perf/load testing shows it satisfies our workload. SSD is better in the
 latency curve but pretty comparable in terms of throughput. We can use the
 extra space from HDD for a longer retention period.

 On Tue, Jun 2, 2015 at 9:37 AM, Henry Cai h...@pinterest.com.invalid

   Henry Cai h...@pinterest.com.INVALID
  June 2, 2015 at 12:37 PM
 We have been hosting kafka brokers in Amazon EC2 and we are using EBS
 disk. But periodically we were hit by long I/O wait time on EBS in some
 Availability Zones.

 We are thinking to change the instance types to a local HDD or local SSD.
 HDD is cheaper and bigger and seems quite fit for the Kafka use case which
 is mostly sequential read/write, but some early experiments show the HDD
 cannot catch up with the message producing speed since there are many
 topic/partitions on the broker which actually makes the disk I/O more
 randomly accessed.

 What are people's experiences with choosing disk types on Amazon?




Re: QuickStart OK locally, but getting WARN Property topic is not valid and LeaderNotAvailableException remotely

2015-06-16 Thread Gwen Shapira
The topic warning is a bug (i.e. the fact that you get a warning on a
perfectly valid parameter). We fixed it for the next release.

It is also unrelated to the real issue with the LeaderNotAvailable.

On Tue, Jun 16, 2015 at 2:08 PM, Mike Bridge m...@bridgecanada.com wrote:
 I am able to get a simple one-node Kafka (kafka_2.11-0.8.2.1) working
 locally on one linux machine, but when I try to run a producer remotely I'm
 getting some confusing errors.

 I'm following the quickstart guide at
 http://kafka.apache.org/documentation.html#quickstart.  I stopped the kafka
 processes and deleted all the zookeeper & kafka files in /tmp.  I am on a
 local 10.0.0.0/24 network NAT-ed with an external IP address, so I modified
 `server.properties` to tell zookeeper how to broadcast my external address:

 advertised.host.name=MY.EXTERNAL.IP

 Then I'm running this:

 $ bin/zookeeper-server-start.sh config/zookeeper.properties
 -- ...
 $ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # small test server!
 $ bin/kafka-server-start.sh config/server.properties
 -- ...

 I opened up the firewall for my producer on the remote machine, and created
 a new topic and verified it:

 $ bin/kafka-topics.sh --create --zookeeper MY.EXTERNAL.IP:2181
 --replication-factor 1 --partitions 1 --topic test123
 -- Created topic test123.
 $ bin/kafka-topics.sh --list --zookeeper MY.EXTERNAL.IP:2181
 -- test123

 However, the producer I'm running remotely gives me errors:

 $ bin/kafka-console-producer.sh --broker-list MY.EXTERNAL.IP:9092
 --topic test123
 -- [2015-06-16 14:41:19,757] WARN Property topic is not valid
 (kafka.utils.VerifiableProperties)
 My Test Message
 -- [2015-06-16 14:42:43,347] WARN Error while fetching metadata
 [{TopicMetadata for topic test123 ->
 No partition metadata for topic test123 due to
 kafka.common.LeaderNotAvailableException}] for topic [test123]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 -- (repeated several times)

 (I disabled the whole firewall to make sure that wasn't the problem.)

 The stdout errors in the kafka startup are repeated: `[2015-06-16
 20:42:42,768] INFO Closing socket connection to /MY.EXTERNAL.IP.
 (kafka.network.Processor)`

 And the controller.log gives me this, several times:

 java.nio.channels.ClosedChannelException
 at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
 at
 kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
 at
 kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 [2015-06-16 20:44:08,128] INFO [Controller-0-to-broker-0-send-thread],
 Controller 0 connected to id:0,host:MY.EXTERNAL.IP,port:9092 for sending
 state change requests (kafka.controller.RequestSendThread)
 [2015-06-16 20:44:08,428] WARN [Controller-0-to-broker-0-send-thread],
 Controller 0 epoch 1 fails to send request
 Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:7;ClientId:id_0-host_null-port_9092;Leaders:id:0,host:MY.EXTERNAL.IP,port:9092;PartitionState:(test123,0)
 -
 (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
 to broker id:0,host:MY.EXTERNAL.IP,port:9092. Reconnecting to broker.
 (kafka.controller.RequestSendThread)


 Any idea what might be wrong?

 -Mike


QuickStart OK locally, but getting WARN Property topic is not valid and LeaderNotAvailableException remotely

2015-06-16 Thread Mike Bridge
I am able to get a simple one-node Kafka (kafka_2.11-0.8.2.1) working
locally on one linux machine, but when I try to run a producer remotely I'm
getting some confusing errors.

I'm following the quickstart guide at
http://kafka.apache.org/documentation.html#quickstart.  I stopped the kafka
processes and deleted all the zookeeper & kafka files in /tmp.  I am on a
local 10.0.0.0/24 network NAT-ed with an external IP address, so I modified
`server.properties` to tell zookeeper how to broadcast my external address:

advertised.host.name=MY.EXTERNAL.IP
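
(For reference, these are the related 0.8.2 broker settings; advertised.host.name
is the only one I actually changed, the others are shown here just as a sketch:)

# server.properties (sketch)
# host.name=10.0.0.X                 # interface the broker binds to (left at default here)
advertised.host.name=MY.EXTERNAL.IP  # hostname registered in ZK and handed to clients in metadata
# advertised.port=9092               # advertised port; defaults to the broker's own port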

Then I'm running this:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
-- ...
$ export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # small test server!
$ bin/kafka-server-start.sh config/server.properties
-- ...

I opened up the firewall for my producer on the remote machine, and created
a new topic and verified it:

$ bin/kafka-topics.sh --create --zookeeper MY.EXTERNAL.IP:2181
--replication-factor 1 --partitions 1 --topic test123
-- Created topic test123.
$ bin/kafka-topics.sh --list --zookeeper MY.EXTERNAL.IP:2181
-- test123

However, the producer I'm running remotely gives me errors:

$ bin/kafka-console-producer.sh --broker-list MY.EXTERNAL.IP:9092
--topic test123
-- [2015-06-16 14:41:19,757] WARN Property topic is not valid
(kafka.utils.VerifiableProperties)
My Test Message
-- [2015-06-16 14:42:43,347] WARN Error while fetching metadata
[{TopicMetadata for topic test123 ->
No partition metadata for topic test123 due to
kafka.common.LeaderNotAvailableException}] for topic [test123]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
-- (repeated several times)

(I disabled the whole firewall to make sure that wasn't the problem.)

The stdout errors in the kafka startup are repeated: `[2015-06-16
20:42:42,768] INFO Closing socket connection to /MY.EXTERNAL.IP.
(kafka.network.Processor)`

And the controller.log gives me this, several times:

java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
[2015-06-16 20:44:08,128] INFO [Controller-0-to-broker-0-send-thread],
Controller 0 connected to id:0,host:MY.EXTERNAL.IP,port:9092 for sending
state change requests (kafka.controller.RequestSendThread)
[2015-06-16 20:44:08,428] WARN [Controller-0-to-broker-0-send-thread],
Controller 0 epoch 1 fails to send request
Name:LeaderAndIsrRequest;Version:0;Controller:0;ControllerEpoch:1;CorrelationId:7;ClientId:id_0-host_null-port_9092;Leaders:id:0,host:MY.EXTERNAL.IP,port:9092;PartitionState:(test123,0)
-
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
to broker id:0,host:MY.EXTERNAL.IP,port:9092. Reconnecting to broker.
(kafka.controller.RequestSendThread)


Any idea what might be wrong?

-Mike


Re: Broken auto leader rebalance after using reassign partitions tool

2015-06-16 Thread Gwen Shapira
Ah :)

See how the first replica in your replicas list is always either 1 or 2?
This means that after re-assignment, this will be the leader (and the
preferred leader) for these partitions.
Which means that Kafka will keep trying to rebalance leaders to those
replicas (since they are preferred). You just need to permute the
order of replicas in each set to be more balanced.
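
For example (illustration only, showing just the first topic), rotating the
order keeps the replica sets the same, so no data moves, but spreads the
preferred leaders across all six brokers:

{ "topic": "T1", "partition": 0, "replicas": [1,3,5] },
{ "topic": "T1", "partition": 1, "replicas": [2,4,6] },
{ "topic": "T1", "partition": 2, "replicas": [3,5,1] },
{ "topic": "T1", "partition": 3, "replicas": [4,6,2] },
{ "topic": "T1", "partition": 4, "replicas": [5,1,3] },
{ "topic": "T1", "partition": 5, "replicas": [6,2,4] },
...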

Gwen

On Tue, Jun 16, 2015 at 5:58 AM, Valentin kafka-...@sblk.de wrote:

 Hi Gwen,

 sure, the following commands were executed:
 ./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file
 ~/partition_redist.json --execute
 ./kafka-reassign-partitions.sh --zookeeper XXX --reassignment-json-file
 ~/partition_redist.json --verify

 The contents of partition_redist.json are:
 {
   "partitions":
   [
 { "topic": "T1", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T1", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T1", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T1", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T1", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T1", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T1", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T1", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T1", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T1", "partition": 9, "replicas": [2,4,6] },

 { "topic": "T2", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T2", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T2", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T2", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T2", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T2", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T2", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T2", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T2", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T2", "partition": 9, "replicas": [2,4,6] },

 { "topic": "T3", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T3", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T3", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T3", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T3", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T3", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T3", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T3", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T3", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T3", "partition": 9, "replicas": [2,4,6] },

 { "topic": "T4", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T4", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T4", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T4", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T4", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T4", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T4", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T4", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T4", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T4", "partition": 9, "replicas": [2,4,6] },

 { "topic": "T5", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T5", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T5", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T5", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T5", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T5", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T5", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T5", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T5", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T5", "partition": 9, "replicas": [2,4,6] },

 { "topic": "T6", "partition": 0, "replicas": [1,3,5] },
 { "topic": "T6", "partition": 1, "replicas": [2,4,6] },
 { "topic": "T6", "partition": 2, "replicas": [1,3,5] },
 { "topic": "T6", "partition": 3, "replicas": [2,4,6] },
 { "topic": "T6", "partition": 4, "replicas": [1,3,5] },
 { "topic": "T6", "partition": 5, "replicas": [2,4,6] },
 { "topic": "T6", "partition": 6, "replicas": [1,3,5] },
 { "topic": "T6", "partition": 7, "replicas": [2,4,6] },
 { "topic": "T6", "partition": 8, "replicas": [1,3,5] },
 { "topic": "T6", "partition": 9, "replicas": [2,4,6] },

 { "topic": "Search", "partition": 0, "replicas": [1,3,5] },
 { "topic": "Search", "partition": 1, "replicas": [2,4,6] },
 { "topic": "Search", "partition": 2, "replicas": [1,3,5] },
 { "topic": "Search", "partition": 3, "replicas": [2,4,6] },
 { "topic": "Search", "partition": 4, "replicas": [1,3,5] },
 { "topic": "Search", "partition": 5, "replicas": [2,4,6] },
 { "topic": "Search", "partition": 6, "replicas": [1,3,5] },
 { "topic": "Search", "partition": 7, "replicas": [2,4,6] },
 { "topic": "Search", "partition": 8, "replicas": [1,3,5] },
 { "topic": "Search", "partition": 9, "replicas": [2,4,6] }
   ],
   "version": 1
 }

 Greetings
 Valentin

 -Original Message-
 From: Gwen Shapira gshap...@cloudera.com
 Reply-To: users@kafka.apache.org users@kafka.apache.org
 Date: Montag, 15. Juni 2015 18:31
 To: users@kafka.apache.org users@kafka.apache.org
 Subject: Re: Broken auto leader rebalance after using reassign partitions
 tool

 Can you share the command you ran for partition reassignment? (and the
 JSON)


 On Mon, Jun 15, 2015 at 

subscribe kafka

2015-06-16 Thread Jin Wang



duplicate messages at consumer

2015-06-16 Thread Kris K
Hi,

While testing message delivery using kafka, I realized that a few duplicate
messages got delivered by the consumers in the same consumer group (two
consumers got the same message a few milliseconds apart). However, I do not
see any duplication at the producer or broker. One more observation: this
does not happen when I use only one consumer thread.

I am running 3 brokers (0.8.2.1) with 3 Zookeeper nodes. There are 3
partitions in the topic and the replication factor is 3. For producing, I am
using the new producer with compression.type=none.

On the consumer end, I have 3 High level consumers in the same consumer
group running with one consumer thread each, on three different hosts. Auto
commit is set to true for the consumers.

Size of each message would range anywhere between 0.7 KB and  2 MB. The max
volume for this test is 100 messages/hr.

I looked at controller log for any possibility of consumer rebalance during
this time, but did not find any. In the server logs of all the brokers, the
error java.io.IOException: Connection reset by peer is being written almost
continuously.

So, is it possible to achieve exactly-once delivery with the current high
level consumer without needing an extra layer to remove redundancy?

Could you please point me to any settings or logs that would help me tune
the configuration ?

PS: I tried searching for similar discussions, but could not find any. If
it's already been answered, please provide the link.

Thanks,
Kris


Re: Offset management: client vs broker side responsibility

2015-06-16 Thread Stevo Slavić
Found out that there is a standard API for retrieving and committing offsets
(see
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
)
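
(For context, with the stock 0.8.2 high-level consumer that standard API is
what you opt into via configuration; a sketch, defaults may differ per
version:)

# consumer.properties (sketch)
offsets.storage=kafka      # commit/fetch offsets via the Kafka offset API instead of direct ZK writes
dual.commit.enabled=true   # also write offsets to ZooKeeper while migrating; set false once fully on Kafka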

The problem is that the server/broker side is not extensible (see
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L142
) - i.e. there is no API one can implement and deploy/configure alongside
the Kafka binary to handle unsupported
offsetCommitRequest.versionId/offsetFetchRequest.versionId values, or to
override the handling of already supported ones.

It does not prevent one from implementing custom offset management on the
client side (instead of using the standard API to commit and retrieve
offsets, one can talk directly to a custom offset store), but then the
problem arises that no commercial or FOSS Kafka monitoring solution
supports it out of the box.

I know I would, but the question to the Apache Kafka community is: would
you like the Kafka broker's offset commit/fetch handling to be extensible?
And what do the committers think about this?

Kind regards,
Stevo Slavic.


On Tue, Jun 2, 2015 at 7:11 PM, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

 Hi,

 I haven't followed the changes to offset tracking closely, other than that
 storing them in ZK is not the only option any more.
 I think what Stevo is asking about/suggesting is that there be a single
 API from which offset information can be retrieved (e.g. by monitoring
 tools), so that monitoring tools work regardless of where one chooses to
 store offsets.
 I know we'd love to have this for SPM's Kafka monitoring and can tell you
 that adding support for N different APIs for N different offset storage
 systems would be hard/time-consuming/expensive.
 But maybe this single API already exists?

 Thanks,
 Otis
 --
 Monitoring * Alerting * Anomaly Detection * Centralized Log Management
 Solr & Elasticsearch Support * http://sematext.com/


 On Mon, Jun 1, 2015 at 4:41 PM, Jason Rosenberg j...@squareup.com wrote:

  Stevo,
 
  Both of the main solutions used by the high-level consumer are
 standardized
  and supported directly by the kafka client libraries (e.g. maintaining
  offsets in zookeeper or in kafka itself).  And for the zk case, there is
  the consumer offset checker (which is good for monitoring).  Consumer
  offset checker still needs to be extended for offsets stored in the kafka
  __consumer_offsets topic though.
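 
  (For reference, that checker is the kafka.tools.ConsumerOffsetChecker
  class; a typical invocation, with the group name and ZooKeeper address
  assumed:)
 
  bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group my-group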
 
  Anyway, I'm not sure I understand your question: you want something for
  better monitoring of all possible clients (some of which might choose to
  manage offsets in their own way)?
 
  It's just not part of the kafka design to directly track individual
  consumers.
 
  Jason
 
  On Wed, May 27, 2015 at 7:42 AM, Shady Xu shad...@gmail.com wrote:
 
    I guess adding a new component will increase the complexity of the
    system structure. And if the new component consists of one or a few
    nodes, it may become the bottleneck of the whole system; if it
    consists of many nodes, it will make the system even more complex.
  
   Although every solution has its downsides, I think the current one is
   decent.
  
   2015-05-27 17:10 GMT+08:00 Stevo Slavić ssla...@gmail.com:
  
     It could be a separate server component; it does not have to be
     monolithic/coupled with the broker.
     Such a solution would have benefits: a single API, pluggable
     implementations.
   
On Wed, May 27, 2015 at 8:57 AM, Shady Xu shad...@gmail.com wrote:
   
 Storing and managing offsets by the broker will put high pressure on the
 brokers, which will affect the performance of the cluster.

 You can use the advanced consumer APIs; then you can get the offsets
 either from zookeeper or the __consumer_offsets topic. On the other hand,
 if you use the simple consumer APIs, you mean to manage offsets yourself,
 so you should monitor them yourself, simple and plain, right?

 2015-04-22 14:36 GMT+08:00 Stevo Slavić ssla...@gmail.com:

   Hello Apache Kafka community,
  
   Please correct me if wrong, but AFAIK currently (Kafka 0.8.2.x) offset
   management responsibility is mainly a client/consumer side
   responsibility.
  
   Wouldn't it be better if it was a broker side only responsibility?
  
   E.g. now if one wants to use custom offset management, none of the
   Kafka monitoring tools can see the offsets - they would need to use the
   same custom client implementation, which is practically not possible.
  
   Kind regards,
   Stevo Slavic.