[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330608#comment-15330608 ]

Guozhang Wang commented on KAFKA-3775:
--

@Yuto Kawamura I have written up my thoughts about memory management in Kafka Streams in this wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Memory+Management+in+Kafka+Streams
Feel free to comment on it.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> --
>
> Key: KAFKA-3775
> URL: https://issues.apache.org/jira/browse/KAFKA-3775
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.10.0.0
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
> Fix For: 0.10.1.0
>
> As of today, if I start a Kafka Streams app on a single machine that runs a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app consuming a topic with more than 10MB/sec of traffic per partition, we saw all partitions assigned to the first instance, and soon the app died of OOM.
> I know there are some possible workarounds here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single instance first and check that it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but there are still two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as the instance accepts all incoming messages from hundreds of partitions.
>   - In the first place, by distributed-system principles, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) can be expected to process.
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to a processId (which is the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should continue working on part of the partitions even if some partitions are left unassigned, in order to satisfy this: "users may want the option to start a single instance first and check that it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
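To make the proposal in the issue description concrete, here is a minimal sketch of a capped assignment that tolerates leftover tasks. It is not the POC from the pull request and not Kafka code: `assign_with_cap`, the task names, and the round-robin policy are all assumptions chosen for illustration; only the idea of a per-instance limit (the proposed {{max.tasks.assigned}}) comes from the issue.

```python
from typing import Dict, List, Tuple


def assign_with_cap(
    tasks: List[str], instances: List[str], max_tasks: int
) -> Tuple[Dict[str, List[str]], List[str]]:
    """Round-robin tasks over instances, never exceeding max_tasks per instance.

    Returns (assignment, unassigned). Hypothetical helper, not Kafka code.
    """
    assignment: Dict[str, List[str]] = {name: [] for name in instances}
    unassigned: List[str] = []
    cursor = 0
    for task in tasks:
        placed = False
        # Try each instance once, starting from the round-robin cursor.
        for offset in range(len(instances)):
            name = instances[(cursor + offset) % len(instances)]
            if len(assignment[name]) < max_tasks:
                assignment[name].append(task)
                cursor = (cursor + offset + 1) % len(instances)
                placed = True
                break
        if not placed:
            # Every instance is at its cap: the task stays unassigned.
            unassigned.append(task)
    return assignment, unassigned
```

With a single instance and a cap of 4, six tasks would leave two unassigned until a second instance joins. Those unprocessed partitions are exactly the risk that later comments in this thread object to.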
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15327326#comment-15327326 ]

Matthias J. Sax commented on KAFKA-3775:

[~tenggyut] [~wushujames] Kafka Streams allows you to specify consumer and producer configurations the same way as for the regular Java consumer/producer, via {{StreamsConfig}} (some restrictions apply; for example, Kafka Streams does not allow enabling "auto.commit"). So using the quota feature should work.
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326937#comment-15326937 ]

James Cheng commented on KAFKA-3775:

[~kawamuray], for the network traffic, could you use Kafka's quota feature? http://kafka.apache.org/documentation.html#design_quotas

Kafka lets you specify how much network bandwidth a given client-id is allowed to use. If Kafka Streams lets you specify a client id, then this would let you limit how much network traffic your application uses.
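One consequence of capping a client's bandwidth is that a lagging application takes longer to catch up. The following is a back-of-the-envelope sketch of that trade-off, not a description of how the broker enforces quotas; the function name and the example figures are assumptions for illustration.

```python
def catch_up_seconds(lag_bytes: float, produce_rate: float, quota_rate: float) -> float:
    """Seconds needed to drain lag_bytes when reads are capped at quota_rate.

    produce_rate and quota_rate are in bytes/sec. Returns float('inf') when the
    quota does not exceed the produce rate, because the lag can never shrink.
    """
    drain_rate = quota_rate - produce_rate
    if drain_rate <= 0:
        return float("inf")
    return lag_bytes / drain_rate


# Hypothetical numbers: 6 GB of lag, 50 MB/s of new data, a 100 MB/s quota.
seconds = catch_up_seconds(6e9, 50e6, 100e6)
```

The point of the arithmetic is that a quota has to sit above the steady-state produce rate of the consumed partitions, or the application will never leave catch-up mode.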
Re: [jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
Good to hear!

On the network bandwidth issue: if you're running other processes on the box and want to stop your streaming app from monopolizing the available bandwidth, then I'd suggest looking at some QoS / packet-shaping tools to control this outside of Kafka.

On Sun, 12 Jun 2016, 16:21 ASF GitHub Bot (JIRA) wrote:

> [ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326485#comment-15326485 ]
>
> ASF GitHub Bot commented on KAFKA-3775:
> ---
>
> Github user kawamuray closed the pull request at:
>
> https://github.com/apache/kafka/pull/1460
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326485#comment-15326485 ]

ASF GitHub Bot commented on KAFKA-3775:
---

Github user kawamuray closed the pull request at:

https://github.com/apache/kafka/pull/1460
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15326484#comment-15326484 ]

Yuto Kawamura commented on KAFKA-3775:
--

[~guozhang] Yes, both your short-term and long-term measures sound reasonable to me. Personally, I'm satisfied with this overall discussion in terms of avoiding memory pressure when there are very few instances of Kafka Streams. As I explained, for now {{max.poll.records}} worked well, at least to stop my app from dying of OOM. I also think it would be better to have a more explicit way of configuring memory management in Kafka Streams, as you said, but that's not urgent for me right now.

I still don't know what to do about concentrated network traffic once it reaches a serious level, but I will file a new issue or reopen this one once it becomes an actual problem, for example killing colocated services by exhausting NIC capacity.

So I'm going to close my PR and this issue, as this is not likely a good way of introducing throttling: most people disagreed with introducing the risk of unassigned partitions. I expect we'll continue discussing the approach you suggested in a proper KIP or issue.

Again, thanks for all your help :)
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15325258#comment-15325258 ]

Guozhang Wang commented on KAFKA-3775:
--

Hi [~kawamuray]

Thanks for sharing your usage scenarios; they are very helpful for us in improving the user experience.

In the long run, we definitely would like to make memory management in Kafka Streams convenient, since 1) many users may start their applications in a container with a strict memory limit, and 2) we want to control the case where task migration caused by, say, failures leads to cascading OOMs on other instances because of the sudden increase in memory needed for new tasks; this is similar to your case but in the reverse order: going from multiple instances to fewer instances. And I agree that the static {{partition.grouper}} config is not best suited here. There is already some discussion in the KIP-63 thread, which I will try to summarize in a centralized wiki.

In the near term, we can remove the continuous {{poll(0)}} that exists just for rebalance once KIP-62 is adopted, which will handle the heartbeat mechanism of the consumer, so Streams will not need to worry about frequent polling just for that. After this change, the memory pressure from {{ConsumerRecord}} should be reduced.

Does that sound good to you?
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15324187#comment-15324187 ]

Yuto Kawamura commented on KAFKA-3775:
--

Sorry for leaving this discussion for a while, and thanks for all your comments. I was busy with other work for the first half of this week, and just yesterday I finished deploying my Kafka Streams app to production. I gained some experience through that, so let me summarize it here.

The initial configuration looked like this:
{code}
poll.ms=100(default)
buffered.records.per.partition=1000(default)
max.poll.records=Integer.MAX_VALUE(default)
max.partition.fetch.bytes=1MiB(default)
num.stream.threads=1
{code}

First I took a heap dump of the Kafka Streams process just before it died. I found 918772 instances of {{ConsumerRecord}} (most of them were probably already garbage, as I took the heap dump with the -F switch), which together with their referenced byte arrays consumed more than 500MB of heap at that moment. There was no other significant heap usage by objects unrelated to ConsumerRecord, so I'm sure this was causing the OOM of my app.

So I tried several configuration adjustments to avoid the OOM. Here's the list of what I tried:
- Decrease {{buffered.records.per.partition}} from 1000(default) to 10 => No luck. Still OOM.
- Decrease {{max.partition.fetch.bytes}} => Couldn't, as we allow messages of up to 1MiB.
- Decrease {{max.poll.records}} from Integer.MAX_VALUE(default) to 10 => Worked. No more OOM.

Therefore, by decreasing {{max.poll.records}}, my application stopped dying of OOM.
Before that, each poll() invocation might return all records fetched for each partition, so memory could be exhausted very easily (I was misunderstanding this point; I thought that poll() is never called as long as all tasks hold more records than {{buffered.records.per.partition}}, but in fact it was called continually regardless, because of {{poll.ms}} expiration).

Network traffic increased by about 300Mbps on that host, but that was still not problematic ATM, as the throughput was likely throttled by the single thread ({{num.stream.threads=1}}).

After all instances were up, I confirmed that the total throughput wasn't enough, as I saw the consumption lag keep increasing. I increased {{num.stream.threads}} to 3 and did the same deployment again (I know I could have performed a rolling restart, but I just wanted to see what would happen with an increased number of threads).

Again, the first instance survived without OOM, but this time the traffic on the NIC increased by about 600Mbps, which was almost a critical level for our network spec. As I started the rest of the instances, all partitions were distributed equally, and now they are running pretty well.

So my conclusion is:
- Decreasing {{max.poll.records}} to a proper value works in terms of OOM. Still, it's not intuitive that it controls memory pressure, as the heap usage throttling is just a side effect of this adjustment (it's not meant for this purpose but for adjusting the interval between {{consumer.poll()}} calls so as to avoid assignment expiration, IIUC).
- I still couldn't throttle the network traffic. As I wrote above, when I started a single instance with {{num.stream.threads=3}}, the traffic on that host's NIC reached its maximum capacity (1Gbps) while it was doing catch-up reads. This could be serious in terms of packet drops, as we're deploying other service daemons on the same node.
- I'm still not certain what the best way of doing it is, but I believe it would be worthwhile to have an option to throttle the maximum number of incoming messages to a single instance (in other words, the maximum capacity of a single KafkaStreams instance) with regard to both memory pressure and traffic. So I'm basically +1 on the idea that [~jkreps] suggested (global memory allocation throttling), but I'm still wondering what you can suggest as an option for throttling the network traffic.

And about PartitionGrouper:
So it can be used to reduce the number of target tasks, but that can't be changed without rewriting the configuration (to revert {{partition.grouper}}) and restarting an instance, right? If so, it's too cumbersome to perform such a two-step deployment. First I have to deploy a single instance with a custom {{partition.grouper}}, then deploy the rest of the instances, and finally revert the configuration and deploy the first instance again? No way :(
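The heap-dump figures in this comment can be turned into a rough estimate. The sketch below is plain arithmetic under stated assumptions: "500MB" is read as 500,000,000 bytes, the partition count of 200 is hypothetical (the issue only says "hundreds"), and the worst-case bound assumes a single consumer may hold up to one full {{max.partition.fetch.bytes}} per assigned partition when {{max.poll.records}} is unbounded. The helper names are made up for illustration.

```python
MIB = 1024 * 1024


def avg_record_bytes(heap_bytes: int, record_count: int) -> float:
    """Average heap cost per ConsumerRecord, including its byte arrays."""
    return heap_bytes / record_count


def worst_case_fetch_bytes(partitions: int, max_partition_fetch_bytes: int) -> int:
    """Upper bound on fetched data held at once: one full fetch per partition."""
    return partitions * max_partition_fetch_bytes


# Figures from the heap dump: 918772 records in a bit more than 500MB.
per_record = avg_record_bytes(500_000_000, 918_772)

# Hypothetical partition count with the default 1MiB fetch size.
upper_bound = worst_case_fetch_bytes(200, 1 * MIB)
```

Under these assumptions each record costs a little over half a kilobyte, and an uncapped poll across 200 partitions could hold on the order of 200MiB of fetched data at once, which is consistent with capping {{max.poll.records}} being the adjustment that stopped the OOM.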
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15322254#comment-15322254 ]

Matthias J. Sax commented on KAFKA-3775:
----------------------------------------

I want to push this discussion further. As [~guozhang] suggested, it might be better to hand in a custom {{PartitionGrouper}} instead of patching {{StreamPartitionAssignor}}. Would this work for your use case, [~kawamuray]? If yes, we could close this as "not a problem" and keep the list of configurable parameters short.

> Throttle maximum number of tasks assigned to a single KafkaStreams
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3775
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>             Fix For: 0.10.1.0
>
>
> As of today, if I start a Kafka Streams app on a single machine which consists of a single KafkaStreams instance, that instance gets all partitions of the target topic assigned.
> As we're using it to process topics which have a huge number of partitions and heavy message traffic, it is a problem that we don't have a way of throttling the maximum number of partitions assigned to a single instance.
> In fact, when we started a Kafka Streams app which consumes a topic with more than 10MB/sec of traffic for each partition, we saw all partitions assigned to the first instance, and soon the app died with an OOM.
> I know that there are some workarounds to consider here, for example:
> - Start multiple instances at once so the partitions are distributed evenly.
>   => Maybe works, but as Kafka Streams is a library and not an execution framework, there's no predefined procedure for starting Kafka Streams apps, so some users might want the option to start a single instance first and check that it works as expected with a smaller number of partitions (I want :p)
> - Adjust config parameters such as {{buffered.records.per.partition}}, {{max.partition.fetch.bytes}} and {{max.poll.records}} to reduce the heap pressure.
>   => Maybe works, but it still has two problems IMO:
>   - It still leads to a traffic explosion with high-throughput processing, as it accepts all incoming messages from hundreds of partitions.
>   - In the first place, by the distributed system principle, it's weird that users don't have a way to control the maximum number of "partitions" assigned to a single shard (an instance of KafkaStreams here). Users should be allowed to specify the maximum number of partitions that a single instance (or host) is considered able to process.
> Here, I'd like to introduce a new configuration parameter {{max.tasks.assigned}}, which limits the number of tasks (a notion of partition) assigned to the processId (which is the notion of a single KafkaStreams instance).
> At the same time we need to change StreamPartitionAssignor (TaskAssignor) to tolerate incomplete assignment. That is, Kafka Streams should continue working for part of the partitions even if some partitions are left unassigned, in order to satisfy this: "user may want to take an option to start the first single instance and check if it works as expected with a smaller number of partitions (I want :p)".
> I've implemented a rough POC for this. PTAL, and if it makes sense I will continue refining it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
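As a rough illustration of the behaviour the issue description proposes, the sketch below models a capped assignment that tolerates leftover tasks. It is a plain-Python model, not the code from the POC and not the real {{StreamPartitionAssignor}}: the function name `assign_with_cap` and the deterministic round-robin order are assumptions made for illustration, and only the parameter name `max_tasks_assigned` mirrors the proposed {{max.tasks.assigned}} setting.

```python
import sys
from typing import Dict, List, Tuple


def assign_with_cap(
    tasks: List[int],
    instances: List[str],
    max_tasks_assigned: int = sys.maxsize,
) -> Tuple[Dict[str, List[int]], List[int]]:
    """Hypothetical model: round-robin tasks over instances, never exceeding the cap.

    Returns (assignment per instance, tasks left unassigned).
    """
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    unassigned: List[int] = []
    cursor = 0  # index of the instance that is tried first for the next task
    for task in tasks:
        placed = False
        for offset in range(len(instances)):
            name = instances[(cursor + offset) % len(instances)]
            if len(assignment[name]) < max_tasks_assigned:
                assignment[name].append(task)
                cursor = (cursor + offset + 1) % len(instances)
                placed = True
                break
        if not placed:
            # Every instance is at its cap: the task stays unassigned
            # instead of failing the whole assignment.
            unassigned.append(task)
    return assignment, unassigned


if __name__ == "__main__":
    assignment, leftover = assign_with_cap(list(range(10)), ["a", "b"], 4)
    print(assignment, leftover)
```

With the default cap (here `sys.maxsize`, standing in for an "unlimited" default), a single instance still receives every task, which matches the current behaviour the description starts from.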
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314721#comment-15314721 ]

Jay Kreps commented on KAFKA-3775:
----------------------------------

I'd have to agree. I think this is a memory management problem, and I think we should solve that directly rather than having you throttle tasks, which is a very roundabout way to control memory and would then lead to unprocessed partitions.

I think the problems are:
1. The Kafka consumer does a poor job of controlling memory usage (known issue, needs to be fixed).
2. We may exacerbate it by giving configs around memory that are per-task when they should likely be global.
3. Maybe others?
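To make the per-task versus global point concrete, here is a back-of-the-envelope model. It assumes that buffered memory is roughly partitions x buffered records x average record size, which ignores consumer fetch buffers and object overhead; the figures used (1000 buffered records per partition, 1 KiB records, a 64 MiB budget) and both function names are illustrative assumptions, not measured or documented values.

```python
def per_partition_buffer_bytes(
    partitions: int, buffered_records_per_partition: int, avg_record_bytes: int
) -> int:
    """Worst-case buffered bytes when the limit is set per partition.

    The total grows linearly with the number of assigned partitions.
    """
    return partitions * buffered_records_per_partition * avg_record_bytes


def per_partition_share_bytes(partitions: int, global_budget_bytes: int) -> int:
    """Share each partition gets when one global budget is split evenly.

    The total stays bounded by the budget however many partitions arrive.
    """
    return global_budget_bytes // partitions


if __name__ == "__main__":
    for partitions in (10, 100, 300):
        total = per_partition_buffer_bytes(partitions, 1000, 1024)
        share = per_partition_share_bytes(partitions, 64 * 1024 * 1024)
        print(f"{partitions} partitions: per-partition config -> {total} bytes total; "
              f"global budget -> {share} bytes per partition")
```

Under these assumptions a per-partition limit that is comfortable for 10 partitions needs 30 times as much heap when a lone first instance is handed 300, whereas a global budget only shrinks each partition's share.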
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15314606#comment-15314606 ]

Guozhang Wang commented on KAFKA-3775:
--------------------------------------

Hi [~mjsax] [~kawamuray], great discussion regarding 1) above! Personally I think it is flexible to let users specify which partitions, among all the topics the topology defines as sources, are to be processed, which is better from a user experience point of view.

Actually it is not completely true that "users don't have a way to control maximum "partitions" assigned to a single shard (an instance of KafkaStreams here)." In fact, the user-customizable {{PartitionGrouper}} is used exactly for that: it takes the list of all topic-partitions as input and generates the tasks, with each task assigned some topic-partitions. The {{DefaultPartitionGrouper}} of course tries to capture all topic-partitions and generates multiple tasks for them. But users can also customize it by, for example, generating only one task which takes one partition from each of the input topics, and this single task will be assigned to the ONLY instance in your case.

NOTE that this partition grouper is global, such that if you have two instances, both of them will execute the same {{PartitionGrouper}}, and if only one task is generated, one of the instances will become completely idle. This needs to be communicated clearly to users.

Does that sound good?
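The grouping idea described in this comment can be sketched as follows. This is a plain-Python model rather than the real {{PartitionGrouper}} Java interface: the function names, the `(topic, partition)` tuples and the integer task ids are all assumptions chosen for illustration, and the "default-like" grouper is only loosely modelled on the described behaviour of capturing every topic-partition.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition number)


def default_like_grouper(partitions_per_topic: Dict[str, int]) -> Dict[int, Set[TopicPartition]]:
    """Model of a grouper that covers every topic-partition.

    Task i takes partition i of each topic that has such a partition.
    """
    task_count = max(partitions_per_topic.values(), default=0)
    return {
        task_id: {
            (topic, task_id)
            for topic, count in partitions_per_topic.items()
            if task_id < count
        }
        for task_id in range(task_count)
    }


def single_task_grouper(partitions_per_topic: Dict[str, int]) -> Dict[int, Set[TopicPartition]]:
    """Model of the customization mentioned above: exactly one task,
    taking one partition (partition 0) from each input topic."""
    group = {(topic, 0) for topic, count in partitions_per_topic.items() if count > 0}
    return {0: group} if group else {}


if __name__ == "__main__":
    topics = {"clicks": 3, "views": 2}
    print(default_like_grouper(topics))
    print(single_task_grouper(topics))
```

Because the grouper decides which tasks exist at all, every instance sees the same single task here, which is why a second instance would sit idle, as the NOTE in the comment points out.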
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313828#comment-15313828 ]

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Thanks for the feedback [~BigAndy].

> With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

Some partitions would remain without a consumer *if the number of living instances becomes lower than {{num of partitions / max.tasks.assigned}}*.
Let's say you have 100 partitions and launch 50 KafkaStreams instances with {{max.tasks.assigned=5}}. Once all 50 instances have started, each instance might get 2 partitions assigned, which is the desired distribution.
Then what will happen when an instance fails? The 2 partitions which were held by the dead instance will be reassigned to the remaining instances without any problem, as the other instances still have plenty of headroom under {{max.tasks.assigned}}.
If 31 or more instances are dead at the same moment (leaving fewer than the 20 instances needed to hold 100 partitions at 5 each), then yes, some partitions will remain unassigned. But this is out of consideration, as the value of {{max.tasks.assigned}} was determined in consideration of the amount of system resources (CPU, memory, network bandwidth), which means these unassigned partitions could never be processed normally even if they were reassigned to the living instances, because hardware resources are limited.

> This seems like a fundamental switch away from Kafka's current model, and a risky one in IMHO.

BTW, may I ask what you meant by "Kafka's current model", and what risk you expect, more concretely? (That users won't notice the existence of unassigned partitions?)

> Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...

Because even if I set {{max.poll.records}} lower, that reduces the number of records fetched by a single Fetch request, but the number of Fetch requests increases instead. That means the total throughput doesn't change, which still leads to traffic bursts.
At the same time, it doesn't make sense to me to tune {{max.poll.records}} on the expectation that a single instance gets all partitions assigned, as I can in practice set that value much higher once other instances join the group and partitions are evenly distributed.
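The arithmetic in the 100-partition example can be checked with a few lines. This is only a worked calculation under the thread's simplification that one task corresponds to one partition; the helper names are made up for illustration.

```python
def min_instances(partitions: int, max_tasks_assigned: int) -> int:
    """Smallest number of live instances that can hold every partition
    (ceiling division, done with integers)."""
    return (partitions + max_tasks_assigned - 1) // max_tasks_assigned


def tolerable_failures(partitions: int, instances: int, max_tasks_assigned: int) -> int:
    """How many instances may die before some partition must stay unassigned."""
    return max(0, instances - min_instances(partitions, max_tasks_assigned))


def unassigned_after_failures(
    partitions: int, instances: int, max_tasks_assigned: int, failed: int
) -> int:
    """Partitions left without an owner once `failed` instances are gone."""
    alive = max(0, instances - failed)
    return max(0, partitions - alive * max_tasks_assigned)


if __name__ == "__main__":
    print(min_instances(100, 5))                      # instances needed for full coverage
    print(tolerable_failures(100, 50, 5))             # failures that can be absorbed
    print(unassigned_after_failures(100, 50, 5, 31))  # partitions orphaned after 31 failures
```

By this calculation, 20 live instances are enough for 100 partitions at a cap of 5, so a 50-instance deployment absorbs up to 30 failures and starts leaving partitions unassigned at the 31st.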
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313773#comment-15313773 ]

The Data Lorax commented on KAFKA-3775:
---------------------------------------

I wonder how such a design would maintain the level of resiliency currently offered? At the moment, in a running multi-process cluster, the other processes pick up the slack if one of them should fail. With the proposed design some partitions would remain without a consumer. This seems like a fundamental switch away from Kafka's current model.

Could you also elaborate on why settings such as 'max.poll.records' don't help stop your initial instance going pop? Maybe there are other alternative solutions here...
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15313577#comment-15313577 ]

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

Thanks for the feedback [~mjsax].

> 1) a KStreams application should process the whole topic and not parts of it – limiting the number of partitions is kinda artificial from my point of view

So the question is what a "KStreams application" consists of. I know that Kafka Streams is designed to work even standalone, but the main purpose of making it able to work standalone is easy development and testing, IIUC. Practically, if we try to run it with production traffic which consists of hundreds of partitions, it is impossible to assign all partitions to a single instance transparently. Indeed, restricting the maximum number of partitions per instance is an artificial control, but it should be provided because Kafka Streams is not an execution framework, as I said. Users have almost full control over how to construct the Kafka Streams app cluster; that is, it should be possible to start instances gradually, one by one, instead of starting the necessary number of instances at once, but that's impossible with the existing implementation for the reason I described.

> 2) even if we limit the number of partitions, it is quite random which would get processed which not – I would assume that users would like to have a more transparent assignment

I think Kafka Streams partition assignment already isn't transparent. Unless the sticky partition assignment strategy is enabled, StreamPartitionAssignor chooses which task (partition) is assigned to which instance in round robin, with some randomness introduced. That is, by nature we have no control over which partition is assigned to which instance. At least you can ensure that all partitions are assigned if you start more than {{partitions / max.tasks.assigned}} instances, and it also remains possible not to take this option by leaving the configuration at its default value (Integer.MAX_VALUE), which guarantees that a single instance still accepts all tasks (partitions) assigned.

> 3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking at your patch (just briefly), it seems you changed the task assignment – however, this is independent from the partitions assignment of the used consumer – thus, the consumer would still poll all partitions but would not be able to assign records for some partitions as the corresponding tasks are missing.

Hmm, I'm not sure if I'm understanding your explanation correctly, but this sounds different from what I know.
First, KafkaStreams provides a custom PartitionAssignor, StreamPartitionAssignor, which takes full control of which partition is assigned to which consumer thread of which instance.
Second, the consumer polls only the partitions assigned to it by the group coordinator, which relies on the PartitionAssignor to decide the actual assignment. That is, an instance will never get a record from a partition which isn't assigned to it, and therefore what you're concerned about will never happen, IIUC.
Am I misunderstanding something?
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15312250#comment-15312250 ]

Matthias J. Sax commented on KAFKA-3775:
----------------------------------------

I have some concerns about this:
 1) a KStreams application should process the whole topic and not parts of it -- limiting the number of partitions is kinda artificial from my point of view
 2) even if we limit the number of partitions, it is quite random which would get processed and which not -- I would assume that users would like to have a more transparent assignment
 3) last but not least, under the hood we are using the standard Java KafkaConsumer: looking at your patch (just briefly), it seems you changed the task assignment -- however, this is independent from the partition assignment of the used consumer -- thus, the consumer would still poll all partitions but would not be able to assign records for some partitions as the corresponding tasks are missing.

Having elaborated on 3), this raises a more complex problem: right now, KafkaStreams relies on Kafka's internal partition assignment. If you want to assign only some partitions, we cannot use the standard high-level Java KafkaConsumer and would need to implement our own assignment strategy to allow for partial assignment within a consumer group (i.e., allow a consumer group that does not assign all partitions).
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311838#comment-15311838 ]

Yuto Kawamura commented on KAFKA-3775:
--------------------------------------

[~guozhang] What do you think?
[jira] [Commented] (KAFKA-3775) Throttle maximum number of tasks assigned to a single KafkaStreams
[ https://issues.apache.org/jira/browse/KAFKA-3775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15311836#comment-15311836 ] ASF GitHub Bot commented on KAFKA-3775: --- GitHub user kawamuray opened a pull request: https://github.com/apache/kafka/pull/1460 KAFKA-3775: Throttle maximum number of tasks assigned to a single KafkaStreams Issue: https://issues.apache.org/jira/browse/KAFKA-3775 POC. Discussion in progress. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kawamuray/kafka KAFKA-3775-throttle-tasks Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/1460.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1460 commit fefe259b2c97bb1bbf14b572533ca74348651c0d Author: Yuto KawamuraDate: 2016-06-02T03:46:51Z MINOR: Add toString() to ClientState for debugging commit c4f363d32d9a496c0f4b4e66ee846429a2a2eda5 Author: Yuto Kawamura Date: 2016-06-02T03:51:34Z MINOR: Remove meanglessly repeated assertions in unit test commit 3c173fa5d029277e5d1974c104d7e66939b5cd17 Author: Yuto Kawamura Date: 2016-06-02T03:55:10Z KAFKA-3775: Intorduce new streams configuration max.tasks.assigned This configuration limits the maximum number of tasks assigned to a single KafkaStreams instance. As a task consists of single partition for more than 1 topic, setting this value to lower is useful to prevent huge number of partitions are assigned to an instance which started first. 