It's a known issue and we have already addressed it via
https://issues.apache.org/jira/browse/KAFKA-4969

The fix will be part of the upcoming 1.1 release, but you can try it out
right away by running from trunk or the 1.0 branch. (If you do, feedback
would be very welcome :))

Your proposed workarounds should work. I cannot come up with anything
else you could do, because the task assignment cannot be influenced.
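
For what it's worth, below is a rough sketch of what workaround (2) could
look like, i.e., splitting the topology at the intermediate topic into two
applications, each with its own application.id, so that each stage forms
its own consumer group and gets balanced independently. The application
IDs, bootstrap servers, and String serdes are placeholders, not taken from
your code:

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.Produced

object StageOneApp extends App {
  // placeholder serdes -- substitute your real key/value serdes
  val keySerde = Serdes.String()
  val valueSerde = Serdes.String()

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stage-one-app") // own consumer group
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  builder
    .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
    // ... do some stuff here ...
    .to("intermediate-topic", Produced.`with`(keySerde, valueSerde))

  new KafkaStreams(builder.build(), props).start()
}

object StageTwoApp extends App {
  val keySerde = Serdes.String()
  val valueSerde = Serdes.String()

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stage-two-app") // separate group, balanced on its own
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new StreamsBuilder()
  // read the intermediate topic in its own application
  builder
    .stream("intermediate-topic", Consumed.`with`(keySerde, valueSerde))
    // ... do some other stuff here ...

  new KafkaStreams(builder.build(), props).start()
}

I think workaround (1) would end up looking similar, just with both
KafkaStreams instances started from the same process (they would still
need distinct application.ids).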


-Matthias

On 2/7/18 10:37 AM, Russell Teabeault wrote:
> We are using Kafka Streams for a project and have some questions about how
> stream tasks are assigned.
> 
> streamBuilder
>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>   ... // Do some stuff here
> 
>   .through("intermediate-topic")
>   ... // Do some other stuff here
> 
> In this example we stream from "inbound-topic", do some work, and write
> the results out to "intermediate-topic". We then read from
> "intermediate-topic" and do some more work. If both topics have 100
> partitions (200 partitions total) and I create 10 instances of my
> application, I observe that each instance is assigned a total of 20
> partitions. But the distribution of these partitions across the two
> topics is not even. For example, one instance may get 7 partitions from
> "inbound-topic" and 13 partitions from "intermediate-topic", whereas I
> would have hoped each instance would get 10 partitions from each topic.
> This uneven distribution can make the resource characteristics very
> different from instance to instance.
> 
> In a more concrete example, we read from an input topic, use an in-memory
> store to do some filtering, follow that with a groupBy, and finally do an
> aggregate. This results in two topics: the input topic and the internally
> created intermediate topic that the groupBy writes to and the aggregation
> reads from. What we see is that some instances are assigned far more
> partitions/tasks that use the in-memory store, while other instances have
> very few, and sometimes no, such tasks. This leads to wildly different
> memory usage patterns across the instances, which in turn forces us to
> set our memory limits much higher than would be needed if the partitions
> from each topic were distributed equally across the instances.
> 
> The two ways we have figured out to deal with this problem are:
> 1. Use a new StreamsBuilder any time an intermediate topic is read from
> in the application.
> 2. Break the topology into separate applications at the boundary of an
> intermediate topic.
> 
> Neither of these seems like a great solution. So I would like to know:
> 
> 1. Is this expected behavior?
> 2. Is there some technique to get equal distribution of task/partition
> assignments across instances?
> 
> Thanks for the help.
> 
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> 
