We are using Kafka Streams for a project and had some questions about how
stream tasks are assigned.

streamBuilder
  .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
  // ... do some stuff here
  .through("intermediate-topic")
  // ... do some other stuff here

In this example we stream from "inbound-topic", do some work, and write the
results out to "intermediate-topic". We then read from "intermediate-topic"
and do some more work. If both of these topics have 100 partitions (200
partitions total) and I run 10 instances of my application, I observe that
each instance is assigned 20 partitions in total, but the split of those
partitions across the two topics is not even. For example, one instance may
have 7 partitions from "inbound-topic" and 13 from "intermediate-topic". I
would have hoped that each instance would get 10 partitions from each topic.
This uneven distribution can make resource usage differ considerably from one
instance to the next.
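If I understand the task model correctly, Kafka Streams creates one task per
(sub-topology, partition) pair, and reading back from "intermediate-topic"
starts a second sub-topology. That gives 2 x 100 = 200 tasks and
200 / 10 = 20 tasks per instance, which matches what we see. The toy Scala
model that follows is only a sketch of why balancing on total task count alone
does not imply an even per-topic split. It is not Kafka's actual assignor, and
TaskId, allTasks, roundRobin and perSubtopology are hypothetical names.

```scala
import scala.util.Random

// Hypothetical model of a Kafka Streams task: one per (sub-topology, partition).
final case class TaskId(subtopology: Int, partition: Int)

object AssignmentSketch {
  // Every partition of every sub-topology becomes one task.
  def allTasks(subtopologies: Int, partitions: Int): IndexedSeq[TaskId] =
    for {
      sub <- 0 until subtopologies
      p   <- 0 until partitions
    } yield TaskId(sub, p)

  // Deal tasks to instances round-robin in the given order. Total counts are
  // always balanced (to within one task), but the per-topic mix on each
  // instance depends entirely on the order in which the tasks are dealt.
  def roundRobin(tasks: Seq[TaskId], instances: Int): Map[Int, Seq[TaskId]] =
    tasks.zipWithIndex
      .groupBy { case (_, i) => i % instances }
      .map { case (inst, dealt) => inst -> dealt.map(_._1) }

  // How many tasks of each sub-topology a single instance holds.
  def perSubtopology(assigned: Seq[TaskId]): Map[Int, Int] =
    assigned.groupBy(_.subtopology).map { case (sub, ts) => sub -> ts.size }

  def main(args: Array[String]): Unit = {
    val tasks = allTasks(subtopologies = 2, partitions = 100)

    // Balanced on total count only: the deal order is arbitrary (a seeded shuffle).
    val shuffled = new Random(42).shuffle(tasks)
    val byTotal = roundRobin(shuffled, instances = 10)

    // Balanced per sub-topology: deal all of sub-topology 0, then sub-topology 1.
    val byTopic = roundRobin(tasks.sortBy(t => (t.subtopology, t.partition)), instances = 10)

    for (inst <- 0 until 10)
      println(s"instance $inst: total-only ${perSubtopology(byTotal(inst))}, " +
        s"per-topic ${perSubtopology(byTopic(inst))}")
  }
}
```

With 100 partitions and 10 instances, the per-topic deal gives every instance
exactly 10 tasks from each sub-topology. The shuffled deal still gives 20 tasks
per instance but typically an uneven mix, much like the 7/13 split above.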

In a more concrete example, we read from an input topic, use an in-memory
store to do some filtering, then do a groupBy, and finally an aggregate. This
results in two topics: the input topic and the internally created
intermediate (repartition) topic that the groupBy writes to and the aggregate
reads from. What we see is that some instances are assigned far more of the
partitions/tasks that use the in-memory store, while others have very few and
sometimes none. This leads to wildly different memory usage across the
instances, which in turn forces us to provision much more memory than we
would need if the partitions from each topic were distributed equally across
the instances.
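A back-of-the-envelope way to see the cost: assume every task that owns the
in-memory store holds roughly the same amount of state (an assumption, since
real stores vary with key distribution). Then an instance's memory need scales
with how many of those tasks it owns. Because we configure one memory size for
all instances, we have to size for the busiest one. The sketch that follows
uses hypothetical names (MemorySizing, requiredMb, evenShare) and made-up
figures purely for illustration.

```scala
object MemorySizing {
  // Assumption: every task that owns the in-memory store holds about the same
  // amount of state, so memory scales with the number of such tasks.
  def requiredMb(storeTasksPerInstance: Seq[Int], mbPerStoreTask: Int): Int =
    storeTasksPerInstance.max * mbPerStoreTask

  // Store-backed tasks per instance if they were spread evenly (ceiling division).
  def evenShare(totalStoreTasks: Int, instances: Int): Int =
    (totalStoreTasks + instances - 1) / instances

  def main(args: Array[String]): Unit = {
    // Hypothetical assignment of 100 store-backed tasks across 10 instances.
    val storeTasks = Seq(16, 14, 13, 12, 11, 10, 9, 8, 7, 0)
    val mbPerTask = 50 // made-up figure for illustration
    println(s"sized for the busiest instance: ${requiredMb(storeTasks, mbPerTask)} MB")
    println(s"sized for an even split: ${evenShare(storeTasks.sum, storeTasks.size) * mbPerTask} MB")
  }
}
```

With these made-up numbers, every instance has to be sized for 800 MB even
though an even split would need only 500 MB each.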

The two ways we have found to deal with this problem are:
1. Use a new StreamsBuilder any time the application reads from an
intermediate topic.
2. Break the topology into separate applications at the boundary of each
intermediate topic.

Neither of these seems like a great solution, so I would like to know:

1. Is this expected behavior?
2. Is there some technique to get equal distribution of task/partition
assignments across instances?

Thanks for the help.

--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
