Re: JMX prot

2015-11-11 Thread Shaun Senecal
Take a look here:

Basically, you need to set the JMX_PORT environment variable before starting Kafka.
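
As a rough illustration of that advice, the sketch below builds an environment with JMX_PORT set and launches the broker start script with it. The port 9999 and the script and config paths are assumptions based on a stock Kafka tarball layout, not values from this thread; adjust them for your install.

```python
import os
import subprocess


def kafka_env(jmx_port, base_env=None):
    """Return a copy of the environment with JMX_PORT set for the broker."""
    env = dict(os.environ if base_env is None else base_env)
    env["JMX_PORT"] = str(jmx_port)  # environment values must be strings
    return env


def start_broker(jmx_port=9999,
                 script="bin/kafka-server-start.sh",     # assumed path
                 config="config/server.properties"):     # assumed path
    """Start Kafka with JMX_PORT exported; returns the Popen handle."""
    return subprocess.Popen([script, config], env=kafka_env(jmx_port))
```

Nothing needs to change in the broker properties file for this; the port is picked up from the environment when the start script runs.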

From: , Roy 
Sent: November 11, 2015 3:19 PM
Subject: JMX prot


We are using Kafka 0.8.2.1.

Do we need to configure the JMX port in the Kafka config?

If not, how do I tell Kafka to expose JMX metrics on a specific port?

Any idea how to do this?


Re: It's 5.41am, we're after 20+ hours of debugging our prod cluster. See NotAssignedReplicaException and UnknownException errors. Help?

2015-10-20 Thread Shaun Senecal
I can't say this is the same issue, but it sounds similar to a situation we 
experienced with Kafka 0.8.2.[1-2].  After restarting a broker, the cluster 
would never really recover (ISRs constantly changing, replication failing, 
etc).  We found the only way to fully recover the cluster was to stop all 
producers and consumers, restart the Kafka cluster, then, once the cluster was 
back up, restart the producers/consumers.  Obviously that's not acceptable for a 
production cluster, but that was the only thing we could find that would get us 
going again.


From: Szymon Sobczak 
Sent: October 19, 2015 9:52 PM
Cc: Big Data
Subject: It's 5.41am, we're after 20+ hours of debugging our prod cluster. See 
NotAssignedReplicaException and UnknownException errors. Help?


We're running a 5-machine production Kafka cluster on version
Yesterday we had some disk problems on one of the replicas and decided to
replace that node with a clean one. That's when we started experiencing
many different problems:

- partition replicas are still assigned to the old node and we can't remove
it from the replica list
- replicas are lagging behind, most of the topics have only one ISR
- most of the leaders are on a single node
- CPU load on the machines is constantly high

We've tried to rebalance the cluster by moving the leaders, decreasing
number of replicas and some others, but it doesn't seem to help. In the
meantime I've noticed very weird errors in the kafka.log

For partition 0 of topic product_templates with the following description:

Topic:product_templates PartitionCount:2 ReplicationFactor:3 Configs:
Topic: product_templates Partition: 0 Leader: 135 Replicas: 135,163,68 Isr:
Topic: product_templates Partition: 1 Leader: 155 Replicas: 163,68,164 Isr:

On machine 135 (which is a leader of product_templates,0) in kafka.log I see:

kafka.common.NotAssignedReplicaException: Leader 135 failed to record
follower 155's position 0 for partition [product_templates,0] since the
replica 155 is not recognized to be one of the assigned replicas 68,163,135
for partition [product_templates,0]

And the complementary error, on 155 (NOT a replica of product_templates,0):

ERROR [ReplicaFetcherThread-0-135] 2015-10-20 04:41:47,011 Logging.scala
kafka.server.ReplicaFetcherThread [ReplicaFetcherThread-0-135], Error for
partition [product_templates,0] to broker 135:class

Both of those happen for multiple topics, on multiple machines. Every
single one happens multiple times per second...

How to approach this? Any help is appreciated!


Shrinking ISR with no load on brokers or incoming messages

2015-10-01 Thread Shaun Senecal

I have noticed that when our brokers have no incoming connections (just 
connections to other brokers and to the ZK cluster) we get messages about 
shrinking the ISR for some partitions

[2015-10-02 00:58:31,239] INFO Partition [lia.stage.raw_events,9] on broker 1: 
Shrinking ISR for partition [lia.stage.raw_events,9] from 1,0,2 to 1 


[2015-10-02 00:58:31,335] INFO Partition [lia.stage.raw_events,9] on broker 1: 
Expanding ISR for partition [lia.stage.raw_events,9] from 1 to 1,0 

[2015-10-02 00:58:31,430] INFO Partition [lia.stage.raw_events,9] on broker 1: 
Expanding ISR for partition [lia.stage.raw_events,9] from 1,0 to 1,0,2 

It seems weird to me that the ISR would ever change when there is no load on 
the brokers and no incoming messages at all.  Does this indicate a problem with 
the cluster, or is this normal?
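
To see how often this happens per partition, a minimal sketch like the one below can tally the shrink and expand events from the broker log. It assumes each event sits on a single line in the format shown above (the lines are wrapped in this email), and the function names are made up for illustration.

```python
import re
from collections import Counter

# Matches the "Shrinking ISR" / "Expanding ISR" INFO lines quoted above.
ISR_EVENT = re.compile(
    r"\[(?P<ts>[^\]]+)\] INFO Partition \[(?P<topic>[^,\]]+),(?P<partition>\d+)\] "
    r"on broker (?P<broker>\d+): (?P<kind>Shrinking|Expanding) ISR for partition "
    r"\[[^\]]+\] from (?P<old>[\d,]+) to (?P<new>[\d,]+)"
)


def isr_events(lines):
    """Yield (timestamp, topic, partition, kind, old_isr, new_isr) per matching line."""
    for line in lines:
        m = ISR_EVENT.search(line)
        if m:
            yield (m["ts"], m["topic"], int(m["partition"]), m["kind"],
                   m["old"].split(","), m["new"].split(","))


def shrink_counts(lines):
    """Count ISR shrink events per (topic, partition)."""
    return Counter((topic, part)
                   for _, topic, part, kind, _, _ in isr_events(lines)
                   if kind == "Shrinking")
```

Passing an open log file to `shrink_counts` gives a per-partition count, which makes it easier to tell whether the flapping is limited to a few partitions or spread across the broker.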



Re: number of topics given many consumers and groups within the data

2015-09-30 Thread Shaun Senecal
Thanks Ben, Todd

We'll go with the 400 topics and see how it goes.  Currently we have lots of 
problems bringing the brokers back up after a crash/restart and there was 
concern that it was being caused by having too many topics.  From what you have 
said, it seems that 400 topics should not be an issue for a broker, so that 
means our recovery issues are caused by something else and we need to look into 
it further.


From: Ben Stopford <>
Sent: September 30, 2015 11:26 AM
Subject: Re: number of topics given many consumers and groups within the data

I agree. The only reason I can think of for the custom partitioning route would 
be if your group concept were to grow to a point where a topic-per-category 
strategy becomes prohibitive. This seems unlikely based on what you’ve said. I 
should also add that Todd is spot on regarding the SimpleConsumer not being 
something you’d want to pursue at this time. There is however a new consumer on 
trunk which makes these things a little easier.

> On 30 Sep 2015, at 19:05, Pradeep Gollakota <> wrote:
> To add a little more context to Shaun's question, we have around 400
> customers. Each customer has a stream of events. Some customers generate a
> lot of data while others don't. We need to ensure that each customer's data
> is sorted globally by timestamp.
> We have two use cases around consumption:
> 1. A user may consume an individual customer's data
> 2. A user may consume data for all customers
> Given these two use cases, I think the better strategy is to have a
> separate topic per customer as Todd suggested.
> On Wed, Sep 30, 2015 at 9:26 AM, Todd Palino <> wrote:
>> So I disagree with the idea to use custom partitioning, depending on your
>> requirements. Having a consumer consume from a single partition is not
>> (currently) that easy. If you don't care which consumer gets which
>> partition (group), then it's not that bad. You have 20 partitions, you have
>> 20 consumers, and you use custom partitioning as noted. The consumers use
>> the high level consumer with a single group, each one will get one
>> partition each, and it's pretty straightforward. If a consumer crashes, you
>> will end up with two partitions on one of the remaining consumers. If this
>> is OK, this is a decent solution.
>> If, however, you require that each consumer always have the same group of
>> data, and you need to know what that group is beforehand, it's more
>> difficult. You need to use the simple consumer to do it, which means you
>> need to implement a lot of logic for error and status code handling
>> yourself, and do it right. In this case, I think your idea of using 400
>> separate topics is sound. This way you can still use the high level
>> consumer, which takes care of the error handling for you, and your data is
>> separated out by topic.
>> Provided it is not an issue to implement it in your producer, I would go
>> with the separate topics. Alternately, if you're not sure you always want
>> separate topics, you could go with something similar to your second idea,
>> but have a consumer read the single topic and split the data out into 400
>> separate topics in Kafka (no need for Cassandra or Redis or anything else).
>> Then your real consumers can all consume their separate topics. Reading and
>> writing the data one extra time is much better than rereading all of it 400
>> times and throwing most of it away.
>> -Todd
>> On Wed, Sep 30, 2015 at 9:06 AM, Ben Stopford <> wrote:
>>> Hi Shaun
>>> You might consider using a custom partition assignment strategy to push
>>> your different "groups" to different partitions. This would allow you to
>>> walk the middle ground between "all consumers consume everything" and
>>> "one topic per consumer" as you vary the number of partitions in the
>>> topic, albeit at the cost of a little extra complexity.
>>> Also, not sure if you've seen it but there is quite a good section in the
>>> FAQ on topic and partition sizing.
>>> B
>>>> On 29 Sep 2015, at 18:48, Shaun Senecal <> wrote:
>>>> Hi
>>>> I have read Jay Kreps' post regarding the number of topics tha

Re: number of topics given many consumers and groups within the data

2015-09-30 Thread Shaun Senecal
Thanks for the link.  I have come across that at some point in the past, but I 
don't think it quite addresses the issue I'm looking at.

I think the custom partitioner strategy doesn't work either though.  The number 
of groups we have changes over time, so we can't have a fixed strategy.  We can 
use hashing and just create a large number of partitions so that "most of the 
time" there is only 1 group per partition, however, as far as I can tell, this 
is exactly the same as having 1 topic per group (but with more complexity).  Am 
I wrong?  I am under the impression that having 1000 topics with 1 partition 
incurs the same load/costs on the kafka brokers that 1 topic with 1000 
partitions has.
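
To put a rough number on "most of the time", here is a back-of-the-envelope sketch. It assumes each group is hashed uniformly and independently to a partition, which is an idealisation of whatever hash a real partitioner would use.

```python
def shared_fraction(groups, partitions):
    """Probability that a given group lands on a partition that also holds
    at least one other group, under uniform independent hashing."""
    return 1.0 - (1.0 - 1.0 / partitions) ** (groups - 1)


if __name__ == "__main__":
    # 400 groups, as in this thread; the partition counts are arbitrary examples.
    for partitions in (400, 1000, 10000, 100000):
        print(partitions, round(shared_fraction(400, partitions), 3))
```

With 400 groups and 1000 partitions, about a third of the groups would still share a partition with another group, so the partition count has to be far larger than the group count before collisions become rare.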


From: Ben Stopford <>
Sent: September 30, 2015 9:06 AM
Subject: Re: number of topics given many consumers and groups within the data

Hi Shaun

You might consider using a custom partition assignment strategy to push your 
different "groups" to different partitions. This would allow you to walk the 
middle ground between "all consumers consume everything" and "one topic per 
consumer" as you vary the number of partitions in the topic, albeit at the cost 
of a little extra complexity.

Also, not sure if you’ve seen it but there is quite a good section in the FAQ 
on topic and partition sizing.


> On 29 Sep 2015, at 18:48, Shaun Senecal <> wrote:
> Hi
> I have read Jay Kreps' post regarding the number of topics that can be 
> handled by a broker, and it has left me with more questions that I don't 
> see answered anywhere else.
> We have a data stream which will be consumed by many consumers (~400).  We 
> also have many "groups" within our data.  A group in the data corresponds 1:1 
> with what the consumers would consume, so consumer A only ever sees group A 
> messages, consumer B only consumes group B messages, etc.
> The downstream consumers will be consuming via a websocket API, so the API 
> server will be the thing consuming from kafka.
> If I use a single topic with, say, 20 partitions, the consumers in the API 
> server would need to re-read the same messages over and over for each 
> consumer, which seems like a waste of network and a potential bottleneck.
> Alternatively, I could use a single topic with 20 partitions and have a 
> single consumer in the API put the messages into cassandra/redis (as 
> suggested by Jay), and serve out the downstream consumer streams that way.  
> However, that requires using a secondary sorted storage, which seems like a 
> waste (and added complexity) given that Kafka already has the data exactly as 
> I need it.  Especially if cassandra/redis are required to maintain a long TTL 
> on the stream.
> Finally, I could use 1 topic per group, each with a single partition.  This 
> would result in 400 topics on the broker, but would allow the API server to 
> simply serve the stream for each consumer directly from kafka and won't 
> require additional machinery to serve out the requests.
> The 400 topic solution makes the most sense to me (doesn't require extra 
> services, doesn't waste resources), but seems to conflict with best practices, 
> so I wanted to ask the community for input.  Has anyone done this before?  
> What makes the most sense here?
> Thanks
> Shaun

number of topics given many consumers and groups within the data

2015-09-29 Thread Shaun Senecal

I have read Jay Kreps' post regarding the number of topics that can be handled 
by a broker, and it has left me with more questions that I don't see answered 
anywhere else.

We have a data stream which will be consumed by many consumers (~400).  We also 
have many "groups" within our data.  A group in the data corresponds 1:1 with 
what the consumers would consume, so consumer A only ever sees group A messages, 
consumer B only consumes group B messages, etc.

The downstream consumers will be consuming via a websocket API, so the API 
server will be the thing consuming from kafka.

If I use a single topic with, say, 20 partitions, the consumers in the API 
server would need to re-read the same messages over and over for each consumer, 
which seems like a waste of network and a potential bottleneck.
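
The re-reading cost can be sketched with simple arithmetic. This assumes every consumer scans the full shared topic and filters client-side, and the message volume is a made-up figure for illustration only.

```python
def reads_single_topic(total_messages, consumers):
    """Shared topic: every consumer reads every message and discards
    the ones that belong to other groups."""
    return total_messages * consumers


def reads_topic_per_group(total_messages):
    """One topic per group: each message is read once, by the single
    consumer whose group it belongs to."""
    return total_messages


if __name__ == "__main__":
    total = 1_000_000  # assumed volume, not a figure from this thread
    print(reads_single_topic(total, 400))
    print(reads_topic_per_group(total))
```

Under these assumptions the shared-topic design reads each message once per consumer, so the read volume grows linearly with the number of consumers.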

Alternatively, I could use a single topic with 20 partitions and have a single 
consumer in the API put the messages into cassandra/redis (as suggested by 
Jay), and serve out the downstream consumer streams that way.  However, that 
requires using a secondary sorted storage, which seems like a waste (and added 
complexity) given that Kafka already has the data exactly as I need it.  
Especially if cassandra/redis are required to maintain a long TTL on the stream.

Finally, I could use 1 topic per group, each with a single partition.  This 
would result in 400 topics on the broker, but would allow the API server to 
simply serve the stream for each consumer directly from kafka and won't require 
additional machinery to serve out the requests.

The 400 topic solution makes the most sense to me (doesn't require extra 
services, doesn't waste resources), but seems to conflict with best practices, so 
I wanted to ask the community for input.  Has anyone done this before?  What 
makes the most sense here?

