[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330608#comment-15330608
 ] 

Guozhang Wang commented on KAFKA-3775:
--

@Yuto Kawamura I have written up my thoughts on memory management in Kafka Streams on 
this wiki page: 
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams

Feel free to comment on it.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine that runs a 
> single KafkaStreams instance, that instance gets all partitions of the 
> target topic assigned.
> As we're using it to process topics that have a huge number of partitions and 
> heavy message traffic, it is a problem that we have no way of throttling the 
> maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app that consumes a topic with more 
> than 10MB/sec of traffic per partition, we saw all partitions get assigned 
> to the first instance, and the app soon died with an OOM.
> I know there are some possible workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Might work, but since Kafka Streams is a library rather than an execution 
> framework, there's no predefined procedure for starting Kafka Streams apps, so 
> some users might want the option to start a single instance first and 
> check that it works as expected with a smaller number of partitions (I do :p).
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce heap 
> pressure.
>   => Might work, but there are still two problems IMO:
>   - It still leads to a traffic explosion under high-throughput processing, since 
> it accepts all incoming messages from hundreds of partitions.
>   - More fundamentally, by distributed-systems principles, it's weird that 
> users have no way to control the maximum number of partitions assigned to a single 
> shard (an instance of KafkaStreams here). Users should be allowed to specify 
> the maximum number of partitions that a single instance (or host) can be 
> expected to process.
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (a notion of 
> partition) assigned to a processId (the notion of a single 
> KafkaStreams instance).
> At the same time, we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate an incomplete assignment. That is, Kafka Streams should continue 
> working on the assigned subset of partitions even when some partitions are left 
> unassigned, in order to satisfy this: "users may want the option to 
> start a single instance first and check that it works as expected with 
> a smaller number of partitions (I do :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.
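
The proposal above (a {{max.tasks.assigned}} cap plus an assignor that tolerates an incomplete assignment) can be sketched as a small plain-Java model. This is a hypothetical illustration only: it is not the POC from the pull request and not Kafka's StreamPartitionAssignor/TaskAssignor; tasks and instances are plain strings (labels like "0_1" are just strings here), and the round-robin placement policy is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a capped task assignment; not Kafka's real assignor. */
public class CappedAssignment {

    /** Per-instance task lists plus the tasks that no instance received. */
    public record Result(Map<String, List<String>> assigned, List<String> unassigned) {}

    /**
     * Round-robins tasks over instances, giving each instance at most maxTasksPerInstance
     * tasks (the role the proposed max.tasks.assigned would play). Tasks that do not fit
     * are left unassigned instead of failing the whole assignment.
     */
    public static Result assign(List<String> tasks, List<String> instances, int maxTasksPerInstance) {
        Map<String, List<String>> assigned = new LinkedHashMap<>();
        for (String instance : instances) {
            assigned.put(instance, new ArrayList<>());
        }
        List<String> unassigned = new ArrayList<>();
        int next = 0; // index of the instance to try first for the next task
        for (String task : tasks) {
            boolean placed = false;
            // Try each instance at most once, starting from the round-robin position.
            for (int i = 0; i < instances.size() && !placed; i++) {
                String candidate = instances.get((next + i) % instances.size());
                List<String> owned = assigned.get(candidate);
                if (owned.size() < maxTasksPerInstance) {
                    owned.add(task);
                    next = (next + i + 1) % instances.size();
                    placed = true;
                }
            }
            if (!placed) {
                unassigned.add(task); // tolerated: this task simply is not processed yet
            }
        }
        return new Result(assigned, unassigned);
    }

    public static void main(String[] args) {
        // One instance, six tasks, cap of four: two tasks stay unassigned.
        Result r = assign(List.of("0_0", "0_1", "0_2", "0_3", "0_4", "0_5"), List.of("instance-1"), 4);
        System.out.println(r.assigned());   // {instance-1=[0_0, 0_1, 0_2, 0_3]}
        System.out.println(r.unassigned()); // [0_4, 0_5]
    }
}
```

The unassigned list makes the trade-off raised later in this thread visible: any task beyond the cap is not processed by anyone until more instances join.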



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-13 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327326#comment-15327326
 ] 

Matthias J. Sax commented on KAFKA-3775:


[~tenggyut] [~wushujames] Kafka Streams allows you to specify consumer and producer 
configurations the same way as for the regular Java consumer/producer, via 
{{StreamsConfig}} (some restrictions apply; for example, Kafka Streams does 
not allow enabling "auto.commit"). But using the quota feature should work.
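
A minimal sketch of what this looks like, using only java.util.Properties and the literal config keys mentioned in this thread (application.id, bootstrap.servers, client.id, num.stream.threads, max.poll.records): consumer settings go into the same Properties object that is handed to KafkaStreams. The values are placeholders, and requireNoAutoCommit is a hypothetical helper illustrating the auto-commit restriction, not Kafka's own validation. Whether a broker quota keyed on client.id matches the client ids Streams actually uses for its internal consumers and producers is an assumption to verify for your version.

```java
import java.util.Properties;

public class StreamsProps {

    /** Builds a Streams configuration; consumer keys ride along in the same Properties. */
    public static Properties build(String appId, String bootstrap, String clientId, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("application.id", appId);
        props.setProperty("bootstrap.servers", bootstrap);
        // client.id is the identifier a client-id based broker quota would be keyed on.
        props.setProperty("client.id", clientId);
        props.setProperty("num.stream.threads", "1");
        // A plain consumer setting passed through the Streams configuration.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        requireNoAutoCommit(props);
        return props;
    }

    /** Hypothetical guard: Kafka Streams manages commits itself, so auto commit must stay off. */
    static void requireNoAutoCommit(Properties props) {
        if (Boolean.parseBoolean(props.getProperty("enable.auto.commit", "false"))) {
            throw new IllegalArgumentException("enable.auto.commit must not be true for Kafka Streams");
        }
    }

    public static void main(String[] args) {
        // Placeholder values; in a real app these props would be passed to the KafkaStreams constructor.
        Properties props = build("my-streams-app", "localhost:9092", "my-streams-app-client", 10);
        System.out.println(props.getProperty("max.poll.records")); // 10
    }
}
```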



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-13 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326937#comment-15326937
 ] 

James Cheng commented on KAFKA-3775:


[~kawamuray], for the network traffic, could you use Kafka's quota feature? 
http://kafka.apache.org/documentation.html#design_quotas Kafka lets you specify 
how much network bandwidth a given client-id is allowed to use. If Kafka 
Streams lets you specify a client id, then this would let you limit how much 
network traffic your application uses.



Re: [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-12 Thread Andrew Coates
Good to hear!

On the network bandwidth issue... if you're running other processes on the
box and want to stop your streaming app from monopolizing the available
bandwidth on the box, then I'd suggest looking at some QoS / packet-shaping
tools to control this outside of Kafka.

On Sun, 12 Jun 2016, 16:21 ASF GitHub Bot (JIRA) wrote:

>
> [
> https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326485#comment-15326485
> ]
>
> ASF GitHub Bot commented on KAFKA-3775:
> ---
>
> Github user kawamuray closed the pull request at:
>
> https://github.com/apache/kafka/pull/1460


[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326485#comment-15326485
 ] 

ASF GitHub Bot commented on KAFKA-3775:
---

Github user kawamuray closed the pull request at:

https://github.com/apache/kafka/pull/1460




[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-12 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326484#comment-15326484
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

[~guozhang] Yes, both your short-term and long-term measures sound reasonable 
to me.

Personally, I'm satisfied with the overall discussion as far as avoiding memory 
pressure with very few Kafka Streams instances is concerned.
As I explained, for now {{max.poll.records}} has worked well, at least to stop my 
app from dying of OOM. I also think it'd be better to have a more explicit way of 
configuring memory management in Kafka Streams, as you said, but that's not 
urgent for me right now.

I still don't know what to do about concentrated network traffic once it reaches a 
serious level, but I will file or reopen an issue once it becomes an actual 
problem, for example killing colocated services by exhausting NIC capacity.

So I'm going to close my PR and this issue, since this doesn't seem to be a good way 
of introducing throttling: most people disagreed with introducing the risk of 
unassigned partitions. I expect we'll continue discussing the approach you 
suggested in a proper KIP or issue.

Again, thanks for all your help :)



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325258#comment-15325258
 ] 

Guozhang Wang commented on KAFKA-3775:
--

Hi [~kawamuray] Thanks for sharing your usage scenarios, it is very helpful for 
us to make user experience improvements.

In the long run, we definitely would like to make memory management in Kafka 
Streams more convenient, since 1) many users may start their applications in a 
container with a strict memory limit, and 2) we want to handle the case where 
task migration caused by, say, failures can cause cascading OOMs on other 
instances because of a sudden increase in memory for the new tasks; this is 
similar to your case, just in reverse order: going from multiple instances to 
fewer instances. And I agree that the static {{partition.grouper}} config is 
not best suited here. There has already been some discussion in the KIP-63 
thread, which I will try to summarize in a centralized wiki.

In the near term, once KIP-62 is adopted, we can remove the continuous {{poll(0)}} 
calls made just for rebalancing: KIP-62 will handle the consumer's heartbeat 
mechanism, so Streams will not need to poll frequently just for that. After this 
change, the memory pressure from {{ConsumerRecord}} should be reduced.

Does that sound good to you?



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-10 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324187#comment-15324187
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

Sorry for leaving this discussion for a while, and thanks for all your comments. 
I was busy with other work for the first half of this week, and just yesterday I 
finished deploying my Kafka Streams app to production. I gained some experience 
through that, so let me summarize it here.

The initial configuration looked like this:
{code}
# All values below were left at their defaults
poll.ms=100
buffered.records.per.partition=1000
# Integer.MAX_VALUE
max.poll.records=2147483647
# 1 MiB
max.partition.fetch.bytes=1048576
num.stream.threads=1
{code}

First, I took a heap dump of the Kafka Streams process just before it died. I 
found 918772 instances of {{ConsumerRecord}} (most of them were probably already 
garbage, as I took the heap dump with the -F switch), which together with their 
referenced byte arrays consumed more than 500MB of heap at that moment. No other 
objects (unrelated to ConsumerRecord) used a significant amount of heap, so I'm 
confident this is what caused my app's OOM.
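
As a rough cross-check of those numbers (taking 500MB as 500 × 10^6 bytes; since the dump showed more than 500MB, this is a lower bound), the average footprint per {{ConsumerRecord}}, including its referenced byte arrays, works out to at least roughly 544 bytes:

```latex
\bar{s} \;\gtrsim\; \frac{500 \times 10^{6}\,\text{B}}{918772} \approx 544\,\text{B per record}
```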

So I tried several configuration adjustments to avoid the OOM. Here's what I 
tried:

- Decrease {{buffered.records.per.partition}} from 1000 (default) to 10 => No 
luck. Still OOM.
- Decrease {{max.partition.fetch.bytes}} => Not possible, as we allow messages 
of up to 1MiB.
- Decrease {{max.poll.records}} from Integer.MAX_VALUE (default) to 10 => 
Worked. No more OOM.

So decreasing {{max.poll.records}} stopped my application from dying of OOM. 
Before that, each poll() invocation could return all records fetched for every 
partition, so memory could be exhausted very easily. (I had misunderstood this 
point: I thought poll() is never called as long as every task buffers more than 
{{buffered.records.per.partition}} records, but in fact it was called continually 
regardless, because of {{poll.ms}} expiration.)
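
A simplified model of why {{max.poll.records}} helped, under stated assumptions: a single poll() hands back at most {{max.poll.records}} records, and the data available to it is bounded by roughly one {{max.partition.fetch.bytes}} chunk per assigned partition. This only bounds what one poll() returns to Streams, not total heap (fetched-but-not-yet-returned data still lives in the consumer), and it ignores object overhead. The partition count of 200 is a hypothetical stand-in for "hundreds of partitions".

```java
public class PollMemoryBound {

    /**
     * Rough upper bound, in bytes, of record payload returned by a single poll() under the
     * simplified model: at most maxPollRecords records of at most maxRecordBytes each, and
     * at most one maxPartitionFetchBytes chunk per assigned partition.
     */
    public static long worstCaseBytesPerPoll(int partitions, long maxPartitionFetchBytes,
                                             long maxPollRecords, long maxRecordBytes) {
        long fetchBound = Math.multiplyExact((long) partitions, maxPartitionFetchBytes);
        long recordBound = Math.multiplyExact(maxPollRecords, maxRecordBytes);
        return Math.min(fetchBound, recordBound);
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        // Settings from this comment: max.poll.records = Integer.MAX_VALUE, 1 MiB fetch/message size.
        long before = worstCaseBytesPerPoll(200, mib, Integer.MAX_VALUE, mib);
        // After lowering max.poll.records to 10.
        long after = worstCaseBytesPerPoll(200, mib, 10, mib);
        System.out.println(before / mib + " MiB vs " + after / mib + " MiB"); // 200 MiB vs 10 MiB
    }
}
```

With the default {{max.poll.records}}, the fetch-size term dominates and grows with the number of assigned partitions; with a small {{max.poll.records}}, the per-poll amount no longer depends on the partition count.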
Network traffic on that host increased by about 300Mbps, but that was not yet a 
problem, as throughput was likely limited by the single thread 
({{num.stream.threads=1}}).

After all instances were up, I confirmed that total throughput wasn't sufficient, 
as the consumer lag kept increasing. I increased {{num.stream.threads}} to 3 
and did the same deployment again (I know I could have done a rolling restart, but 
I wanted to see what would happen with more threads).
Again, the first instance survived without OOM, but this time traffic on the NIC 
increased by about 600Mbps, which was close to a critical level for our network. 
As I started the rest of the instances, all partitions were distributed evenly, 
and now they are running pretty well.

So my conclusions are:

- Decreasing {{max.poll.records}} to a proper value solves the OOM. Still, it's 
not intuitive that it controls memory pressure, since limiting heap usage is just 
a side effect of this setting (IIUC its purpose is to control how often 
{{consumer.poll()}} is called, so that the assignment does not expire).
- I still couldn't throttle the network traffic. As I wrote above, when I started 
a single instance with {{num.stream.threads=3}}, the traffic on that host's NIC 
reached its maximum capacity (1Gbps) during catch-up reads. This could cause 
serious packet drops, since we deploy other service daemons on the same node.
- I'm still not certain what the best way to do it is, but I believe it would be 
worthwhile to have an option to throttle the maximum number of incoming messages 
to a single instance (in other words, the maximum capacity of a single 
KafkaStreams instance), with respect to both memory pressure and traffic. So I'm 
basically +1 on the idea [~jkreps] suggested (global memory allocation 
throttling), but I'm still wondering whether you can suggest an option for 
throttling network traffic.
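
For scale, assuming decimal units (1Gbps = 10^9 bit/s) and the per-partition rate of 10MB/sec from the issue description, a 1Gbps NIC carries about 125MB/s, i.e. the steady-state traffic of only about a dozen such partitions:

```latex
1\,\text{Gbps} = \frac{10^{9}\,\text{bit/s}}{8\,\text{bit/B}} = 125\,\text{MB/s},
\qquad
\frac{125\,\text{MB/s}}{10\,\text{MB/s per partition}} = 12.5\ \text{partitions}
```

Catch-up reads are not bounded by the producers' rate, so during catch-up even fewer partitions can fill the link.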

And about PartitionGrouper:
It can be used to reduce the number of target tasks, but that can't be changed 
without rewriting the configuration (to revert {{partition.grouper}}) and 
restarting an instance, right?
If so, such a multi-step deployment is too cumbersome: first I'd have to deploy 
a single instance with a custom {{partition.grouper}}, then deploy the rest of 
the instances, and finally revert the configuration and redeploy the first 
instance? No way :(



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-09 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322254#comment-15322254
 ] 

Matthias J. Sax commented on KAFKA-3775:


I want to push this discussion further. As [~guozhang] suggested, it might be 
better to hand in a custom {{PartitionGrouper}} instead of patching 
{{StreamPartitionAssignor}}. Would this work for your use case [~kawamuray]? If 
yes, we could close this as "not a problem" and keep the list of configurable 
parameters short.



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314721#comment-15314721
 ] 

Jay Kreps commented on KAFKA-3775:
--

I'd have to agree: I think this is a memory management problem, and we should 
solve that directly rather than having you throttle tasks, which is a very 
roundabout way to control memory and would then lead to unprocessed partitions. 
I think the problems are:
1. The Kafka consumer does a poor job of controlling memory usage (known issue, 
needs to be fixed).
2. We may exacerbate it by providing memory-related configs that are per-task 
when they should likely be global.
3. Maybe others?
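
Point 2 can be made concrete. With a per-partition setting such as {{buffered.records.per.partition}}, the worst case grows with the number of partitions an instance owns (for example, 300 partitions × 1,000 records = 300,000 buffered records), whereas a single instance-wide budget bounds the total no matter how many tasks are assigned. A minimal sketch of the global variant, assuming a hypothetical `GlobalRecordBudget` shared by all tasks of one instance (it is not an existing Kafka Streams class):

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical instance-wide budget for buffered records (not a Kafka API).
 * All tasks draw permits from one pool, so the total number of buffered
 * records never exceeds maxBufferedRecords, however many partitions are assigned.
 */
final class GlobalRecordBudget {
    private final Semaphore permits;

    GlobalRecordBudget(int maxBufferedRecords) {
        this.permits = new Semaphore(maxBufferedRecords);
    }

    /** Worst case under a per-partition limit: grows linearly with partition count. */
    static long perPartitionWorstCase(int assignedPartitions, int recordsPerPartition) {
        return (long) assignedPartitions * recordsPerPartition;
    }

    /** Called by a task before buffering n records; false means "pause this partition". */
    boolean tryReserve(int n) {
        return permits.tryAcquire(n);
    }

    /** Called by a task once n buffered records have been processed. */
    void release(int n) {
        permits.release(n);
    }

    int available() {
        return permits.availablePermits();
    }
}
```

The design choice is simply where the limit lives: a per-task limit multiplies with the assignment, while a shared pool makes the instance's memory ceiling independent of how many partitions it receives.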



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314606#comment-15314606
 ] 

Guozhang Wang commented on KAFKA-3775:
--

Hi [~mjsax] [~kawamuray], great discussion regarding 1) above! Personally, I 
think it is more flexible, and better from a user-experience point of view, to 
let users specify which partitions (among all the topics the topology defines 
as sources) should be processed.

Actually, it is not completely true that "users don't have a way to control 
maximum "partitions" assigned to a single shard (an instance of KafkaStreams 
here)." In fact, the user-customizable {{PartitionGrouper}} exists exactly for 
that: it takes the list of all topic-partitions as input and generates tasks, 
assigning some of the topic-partitions to each task. The 
{{DefaultPartitionGrouper}} of course tries to capture all topic-partitions and 
generates multiple tasks for them. But users can also customize it by, for 
example, generating only one task that takes one partition of each input 
topic, and in your case this single task will be assigned to the ONLY 
instance. NOTE that the partition grouper is global: if you have two 
instances, both of them will execute the same {{PartitionGrouper}}, and if only 
one task is generated, one instance will become completely idle. This needs 
to be communicated clearly to users. Does that sound good?
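
The grouping Guozhang describes can be modeled with plain collections. In the Kafka Streams 0.10 API the real hook is the {{PartitionGrouper}} interface, roughly `partitionGroups(topicGroups, metadata)` returning a map from `TaskId` to `Set<TopicPartition>`; the sketch below is a simplified, dependency-free model of that idea, assuming strings like `"clicks-0"` stand in for `TopicPartition` and `"0_0"` (topic group, partition) for `TaskId`. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Dependency-free model of partition grouping (hypothetical; not Kafka code).
 * topicGroups maps a topic-group id to its source topics.
 */
final class SingleTaskGrouper {

    /** Default-style grouping: one task per partition index within each topic group. */
    static Map<String, Set<String>> defaultGroups(Map<Integer, Set<String>> topicGroups,
                                                  Map<String, Integer> partitionCounts) {
        Map<String, Set<String>> tasks = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
            int maxPartitions = 0;
            for (String topic : group.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionCounts.get(topic));
            }
            for (int p = 0; p < maxPartitions; p++) {
                Set<String> topicPartitions = new TreeSet<>();
                for (String topic : group.getValue()) {
                    if (p < partitionCounts.get(topic)) { // topic may have fewer partitions
                        topicPartitions.add(topic + "-" + p);
                    }
                }
                tasks.put(group.getKey() + "_" + p, topicPartitions);
            }
        }
        return tasks;
    }

    /** Custom grouping: a single task per topic group reading only partition 0 of each topic. */
    static Map<String, Set<String>> singleTaskGroups(Map<Integer, Set<String>> topicGroups) {
        Map<String, Set<String>> tasks = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> group : topicGroups.entrySet()) {
            Set<String> topicPartitions = new TreeSet<>();
            for (String topic : group.getValue()) {
                topicPartitions.add(topic + "-0");
            }
            tasks.put(group.getKey() + "_0", topicPartitions);
        }
        return tasks;
    }
}
```

With two source topics of 100 partitions each, `defaultGroups` yields 100 tasks while `singleTaskGroups` yields one; as noted above, because the grouper runs globally, a second instance would then receive no task at all.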



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313828#comment-15313828
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

Thanks for the feedback, [~BigAndy].

> With the proposed design some partitions would remain without a consumer. 
> This seems like a fundamental switch away from Kafka's current model, and a 
> risky one IMHO.

Some partitions would remain without a consumer only *if the number of live 
instances drops below {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with 
{{max.tasks.assigned=5}}. Once all 50 instances are running, each instance 
should get 2 partitions assigned, which is the desired distribution.
Then what happens when an instance fails? The 2 partitions held by the dead 
instance are reassigned to the remaining instances without any problem, since 
the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If 31 or more instances die at once (leaving fewer than 100 / 5 = 20 alive), 
then yes, some partitions will remain unassigned. But that case is out of 
scope: the value of {{max.tasks.assigned}} was chosen based on the available 
system resources (CPU, memory, network bandwidth), which means those unassigned 
partitions could never be processed normally even if they were reassigned to 
the live instances, because the hardware resources are limited.
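
The capacity argument reduces to a ceiling division: with P partitions and a cap of C tasks per instance, every partition has an owner only while at least ⌈P/C⌉ instances are alive. A small sketch checking the example's numbers (100 partitions, `max.tasks.assigned=5`, 50 instances), assuming one task per partition (a single input topic); the class and method names are illustrative only.

```java
/** Illustrative capacity arithmetic for a per-instance task cap (hypothetical helper). */
final class AssignmentCapacity {

    /** Smallest number of live instances that can own every partition under the cap. */
    static int minLiveInstances(int partitions, int maxTasksAssigned) {
        return (partitions + maxTasksAssigned - 1) / maxTasksAssigned; // ceiling division
    }

    /** Partitions left without an owner when only `live` instances remain. */
    static int unassignedPartitions(int partitions, int maxTasksAssigned, int live) {
        return Math.max(0, partitions - live * maxTasksAssigned);
    }
}
```

With 50 instances there is spare room; 20 survivors (30 failures) still cover all 100 partitions, and 19 survivors (31 failures) leave 5 partitions unassigned.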

> This seems like a fundamental switch away from Kafka's current model, and a 
> risky one IMHO.

BTW, may I ask what you mean by "Kafka's current model", and what risk you 
expect, more concretely? (That users won't notice that some partitions are 
unassigned?)

> Could you also elaborate on why settings such as 'max.poll.records' don't 
> help stop your initial instance going pop? Maybe there are other alternative 
> solutions here...

Because even if I set {{max.poll.records}} lower, it only reduces the number 
of records fetched per Fetch request; the number of Fetch requests increases 
instead. That means the total throughput doesn't change, which still leads to 
traffic bursts.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on 
the assumption that a single instance gets all partitions assigned, since in 
practice I can set that value much higher once other instances join the group 
and the partitions are evenly distributed.



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-03 Thread The Data Lorax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773
 ] 

The Data Lorax commented on KAFKA-3775:
---

I wonder how such a design would maintain the level of resiliency currently 
offered. At the moment, in a running multi-process cluster, the other processes 
pick up the slack if one of them fails. With the proposed design, some 
partitions would remain without a consumer. This seems like a fundamental 
switch away from Kafka's current model.

Could you also elaborate on why settings such as 'max.poll.records' don't help 
stop your initial instance going pop? Maybe there are other alternative 
solutions here... 



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313577#comment-15313577
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

Thanks for the feedback, [~mjsax].

> 1) a KStreams application should process the whole topic and not parts of it 
> – limiting the number of partitions is kinda artificial from my point of view

So the question is what a "KStreams application" consists of. I know that 
Kafka Streams is designed to work standalone as well, but IIUC the main purpose 
of making it able to run standalone is easy development and testing. In 
practice, if we try to run it with production traffic spanning hundreds of 
partitions, it is practically impossible to assign all partitions to a single 
instance transparently. Indeed, restricting the maximum number of partitions 
per instance is an artificial control, but it should be provided because, as I 
said, Kafka Streams is not an execution framework. Users have almost full 
control over how they construct their Kafka Streams app cluster; that is, they 
should be allowed to start instances gradually, one by one, instead of starting 
the necessary number of instances at once, but that's impossible with the 
existing implementation for the reason I described.

> 2) even if we limit the number of partitions, it is quite random which would 
> get processed which not – I would assume that users would like to have a more 
> transparent assignment

I think Kafka Streams partition assignment already isn't transparent. Unless 
the sticky partition assignment strategy is enabled, StreamPartitionAssignor 
chooses which task (partition) is assigned to which instance in a round-robin 
fashion with some randomness. That is, by nature we have no control over which 
partition is assigned to which instance.
At least you can ensure that all partitions are assigned if you start at least 
{{partitions / max.tasks.assigned}} instances, and it also remains possible to 
opt out of this option by leaving the configuration at its default value 
(Integer.MAX_VALUE), which guarantees that a single instance still accepts all 
tasks (partitions).
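
To make the proposal concrete: a cap like {{max.tasks.assigned}} changes round-robin distribution so that a client stops receiving tasks once it holds the cap, and any leftover tasks stay unassigned. The sketch below is a deterministic, hypothetical model written for illustration (it is not code from the PR, and it omits the randomness and stickiness of the real StreamPartitionAssignor); passing `Integer.MAX_VALUE` reproduces today's behavior, where a lone instance takes every task.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical capped round-robin task assignment (illustration only).
 * Tasks are dealt to clients in turn, skipping clients that already hold
 * maxTasksAssigned tasks; tasks that fit nowhere are reported as unassigned.
 */
final class CappedRoundRobin {
    static final class Result {
        final Map<String, List<String>> assignment = new LinkedHashMap<>();
        final List<String> unassigned = new ArrayList<>();
    }

    static Result assign(List<String> tasks, List<String> clients, int maxTasksAssigned) {
        Result result = new Result();
        for (String client : clients) {
            result.assignment.put(client, new ArrayList<>());
        }
        int next = 0; // index of the client to try first for the next task
        for (String task : tasks) {
            boolean placed = false;
            for (int tried = 0; tried < clients.size() && !placed; tried++) {
                String client = clients.get((next + tried) % clients.size());
                List<String> owned = result.assignment.get(client);
                if (owned.size() < maxTasksAssigned) {
                    owned.add(task);
                    next = (next + tried + 1) % clients.size(); // continue after this client
                    placed = true;
                }
            }
            if (!placed) {
                result.unassigned.add(task); // every client is already at its cap
            }
        }
        return result;
    }
}
```

For example, ten tasks and a single client with a cap of 4 leave six tasks unassigned; the same tasks across three clients are all placed.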

> 3) last but not least, under the hood we are using the standard Java 
> KafkaConsumer: looking at your patch (just briefly), it seems you changed the 
> task assignment – however, this is independent from the partitions assignment 
> of the used consumer – thus, the consumer would still poll all partitions but 
> would not be able to assign records for some partitions as the corresponding 
> tasks are missing.

Hmm, I'm not sure I understand your explanation correctly, but this sounds 
different from what I know.
First, KafkaStreams provides a custom PartitionAssignor, 
StreamPartitionAssignor, which takes full control of which partition is 
assigned to which consumer thread of which instance.
Second, the consumer polls only the partitions assigned to it by the group 
coordinator, which relies on the PartitionAssignor to decide the actual 
assignment. So an instance will never get a record from a partition that isn't 
assigned to it; therefore, what you're concerned about will never happen, IIUC.
Am I misunderstanding something?



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312250#comment-15312250
 ] 

Matthias J. Sax commented on KAFKA-3775:


I have some concerns about this:
 1) a KStreams application should process the whole topic and not parts of it 
-- limiting the number of partitions is kinda artificial from my point of view
 2) even if we limit the number of partitions, it is quite random which 
partitions would get processed and which would not -- I would assume that users 
would like to have a more transparent assignment
 3) last but not least, under the hood we are using the standard Java 
KafkaConsumer: looking at your patch (just briefly), it seems you changed the 
task assignment -- however, this is independent of the partition assignment 
of the underlying consumer -- thus, the consumer would still poll all 
partitions but would not be able to assign records for some partitions, as the 
corresponding tasks are missing.

Having elaborated on 3), this raises a more complex problem: right now, 
KafkaStreams relies on Kafka's internal partition assignment. If you want to 
assign only some partitions, we cannot use the standard high-level Java 
KafkaConsumer and would need to implement our own assignment strategy to allow 
for partial assignment within a consumer group (i.e., allow a consumer group 
that does not assign all partitions).
 



[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread Yuto Kawamura (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311838#comment-15311838
 ] 

Yuto Kawamura commented on KAFKA-3775:
--

[~guozhang] What do you think?

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which 
> consists of single KafkaStreams instance, that instance gets all partitions 
> of the target topic assigned.
> As we're using it to process topics which has huge number of partitions and 
> message traffic, it is a problem that we don't have a way of throttling the 
> maximum amount of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic which has 
> more than 10MB/sec traffic of each partition we saw that all partitions 
> assigned to the first instance and soon the app dead by OOM.
> I know that there's some workarounds considerable here. for example:
> - Start multiple instances at once so the partitions distributed evenly.
>   => Maybe works. but as Kafka Streams is a library but not an execution 
> framework, there's no predefined procedure of starting Kafka Streams apps so 
> some users might wanna take an option to start the first single instance and 
> check if it works as expected with lesster number of partitions(I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, 
> {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap 
> pressure.
>   => Maybe works. but still have two problems IMO:
>   - Still leads traffic explosion with high throughput processing as it 
> accepts all incoming messages from hundreads of partitions.
>   - In the first place, by the distributed system principle, it's wired that 
> users don't have a away to control maximum "partitions" assigned to a single 
> shard(an instance of KafkaStreams here). Users should be allowed to provide 
> the maximum amount of partitions that is considered as possible to be 
> processed with single instance(or host).
> Here, I'd like to introduce a new configuration parameter 
> {{max.tasks.assigned}}, which limits the number of tasks (each task covers a 
> set of partitions) assigned to a processId (which identifies a single 
> KafkaStreams instance).
> At the same time, we need to change StreamPartitionAssignor (TaskAssignor) to 
> tolerate an incomplete assignment. That is, Kafka Streams should keep working 
> on the partitions it has been assigned even if some partitions are left 
> unassigned, in order to support the use case above of starting a single 
> instance first and checking that it works as expected with a smaller number 
> of partitions.
> I've implemented a rough POC for this. PTAL, and if it makes sense I will 
> continue refining it.
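The first objection to the config-tuning workaround in the description above can be made concrete with a rough estimate: the per-partition knobs limit memory per partition, so the total still grows linearly with the number of assigned partitions. A minimal sketch, assuming a fixed average record size and counting only records buffered by Streams (consumer fetch buffers and state stores are ignored). The config keys are the real ones named in the description; the class and method names are hypothetical and the values are illustrative, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper: illustrates why per-partition limits alone do not bound
// total heap usage when a single instance owns many partitions.
class HeapPressureSketch {

    // Real Streams/consumer config keys; the values are illustrative only.
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("buffered.records.per.partition", "100"); // Streams default: 1000
        props.put("max.partition.fetch.bytes", "262144");   // 256 KiB; consumer default: 1 MiB
        props.put("max.poll.records", "100");
        return props;
    }

    // Rough estimate (not a strict bound) of bytes held in Streams' per-partition
    // record buffers, assuming every record is avgRecordBytes long.
    static long estimatedBufferedBytes(int partitions, int recordsPerPartition, int avgRecordBytes) {
        return (long) partitions * recordsPerPartition * avgRecordBytes;
    }

    public static void main(String[] args) {
        int recordsPerPartition =
            Integer.parseInt(tunedProps().getProperty("buffered.records.per.partition"));
        // Same knobs, different partition counts: the estimate grows linearly.
        System.out.println(estimatedBufferedBytes(10, recordsPerPartition, 1024));  // 1024000
        System.out.println(estimatedBufferedBytes(500, recordsPerPartition, 1024)); // 51200000
    }
}
```

Even with the buffer reduced tenfold from its default, 500 partitions of 1 KiB records still come to roughly 51 MB in this estimate, and the figure keeps scaling with whatever partition count the instance is handed.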

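The proposal amounts to a capped assignment that is allowed to be incomplete. A minimal sketch, assuming a simple round-robin strategy over string task ids such as `0_1` (modeled on the `topicGroupId_partition` form of Streams task ids); `CappedAssignmentSketch`, `assign`, and `unassigned` are hypothetical names, and this is neither the POC's code nor Kafka's actual `TaskAssignor` logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a capped assignor; not Kafka's StreamPartitionAssignor/TaskAssignor.
class CappedAssignmentSketch {

    // Assigns tasks round-robin, giving no client more than maxTasksPerClient.
    // Tasks that do not fit stay unassigned instead of failing the whole assignment.
    static Map<String, List<String>> assign(List<String> clients, List<String> tasks, int maxTasksPerClient) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String client : clients) {
            assignment.put(client, new ArrayList<>());
        }
        int next = 0; // index of the next client to try
        for (String task : tasks) {
            boolean placed = false;
            // Try each client at most once for this task.
            for (int i = 0; i < clients.size() && !placed; i++) {
                String client = clients.get((next + i) % clients.size());
                List<String> owned = assignment.get(client);
                if (owned.size() < maxTasksPerClient) {
                    owned.add(task);
                    next = (next + i + 1) % clients.size();
                    placed = true;
                }
            }
            if (!placed) {
                break; // every client is at its cap, so the rest stay unassigned
            }
        }
        return assignment;
    }

    // Tasks that no client received.
    static List<String> unassigned(List<String> tasks, Map<String, List<String>> assignment) {
        List<String> left = new ArrayList<>(tasks);
        for (List<String> owned : assignment.values()) {
            left.removeAll(owned);
        }
        return left;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("0_0", "0_1", "0_2", "0_3", "0_4");
        Map<String, List<String>> a = assign(List.of("instance-1"), tasks, 2);
        System.out.println(a);                    // {instance-1=[0_0, 0_1]}
        System.out.println(unassigned(tasks, a)); // [0_2, 0_3, 0_4]
    }
}
```

Returning the leftover tasks rather than throwing lets a single instance start with only `maxTasksPerClient` tasks; when more instances join, a later rebalance could hand the leftovers to them.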




[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams

2016-06-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311836#comment-15311836
 ] 

ASF GitHub Bot commented on KAFKA-3775:
---

GitHub user kawamuray opened a pull request:

https://github.com/apache/kafka/pull/1460

KAFKA-3775: Throttle maximum number of tasks assigned to a single 
KafkaStreams

Issue: https://issues.apache.org/jira/browse/KAFKA-3775

POC. Discussion in progress.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1460.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1460


commit fefe259b2c97bb1bbf14b572533ca74348651c0d
Author: Yuto Kawamura 
Date:   2016-06-02T03:46:51Z

MINOR: Add toString() to ClientState for debugging

commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5
Author: Yuto Kawamura 
Date:   2016-06-02T03:51:34Z

MINOR: Remove meaninglessly repeated assertions in unit test

commit 3c173fa5d029277e5d1974c104d7e66939b5cd17
Author: Yuto Kawamura 
Date:   2016-06-02T03:55:10Z

KAFKA-3775: Introduce new streams configuration max.tasks.assigned

This configuration limits the maximum number of tasks assigned to a single 
KafkaStreams instance.
As a task consists of a single partition from each of one or more topics, 
setting this value lower is useful to prevent a huge number of partitions from 
being assigned to the instance that starts first.
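The point that a task spans one partition of each of several topics can be illustrated with a simplified model: for a group of co-partitioned source topics, task `g_p` holds partition `p` of every topic in group `g`, so capping tasks also caps partitions. This is a hypothetical sketch that assumes every topic in the group has the same partition count; it only loosely mirrors Kafka's `DefaultPartitionGrouper`, and the topic names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of how source partitions are grouped into tasks.
class TaskGroupingSketch {

    // Task "g_p" holds partition p of every topic in topic group g.
    // Assumes all topics in the group have numPartitions partitions.
    static Map<String, List<String>> groupIntoTasks(int topicGroupId, List<String> topics, int numPartitions) {
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            List<String> partitions = new ArrayList<>();
            for (String topic : topics) {
                partitions.add(topic + "-" + p);
            }
            tasks.put(topicGroupId + "_" + p, partitions);
        }
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(groupIntoTasks(0, List.of("orders", "payments"), 3));
        // {0_0=[orders-0, payments-0], 0_1=[orders-1, payments-1], 0_2=[orders-2, payments-2]}
    }
}
```

Under this model, a cap of N tasks on an instance bounds it to N times (number of topics in the group) partitions.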



