Matthias,

Disregard the exception I mentioned. I think that was a transient error caused by our broker cluster re-spinning.
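
By the way, for the archives: workaround #1 from the thread below boils
down to something like the following for us. This is only a simplified
sketch, not our production code: it assumes the intermediate topic is an
explicit, user-created topic, and the topic names, String serdes,
application.ids, bootstrap server, and elided processing steps are all
stand-ins.

  import java.util.Properties
  import org.apache.kafka.common.serialization.Serdes
  import org.apache.kafka.streams.{Consumed, KafkaStreams, StreamsBuilder, StreamsConfig}
  import org.apache.kafka.streams.kstream.Produced

  val strings = Serdes.String()

  def props(appId: String): Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
    p
  }

  // First topology: "inbound-topic" up to the intermediate topic.
  val upstream = new StreamsBuilder()
  upstream
    .stream("inbound-topic", Consumed.`with`(strings, strings))
    // ... filtering against the in-memory store goes here ...
    .to("intermediate-topic", Produced.`with`(strings, strings))

  // Second topology: the intermediate topic onwards.
  val downstream = new StreamsBuilder()
  downstream
    .stream("intermediate-topic", Consumed.`with`(strings, strings))
    // ... groupBy + aggregate goes here ...

  // Two clients with separate application.ids rebalance independently,
  // so each instance gets an even share of each topic's partitions.
  new KafkaStreams(upstream.build(), props("my-app-part1")).start()
  new KafkaStreams(downstream.build(), props("my-app-part2")).start()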

-russ

On Wed, Feb 7, 2018 at 3:45 PM, Russell Teabeault <rteabea...@twitter.com> wrote:

> Hi Matthias,
>
> Thanks for the prompt reply. We have built the kafka-streams jar from
> the 1.1 branch and deployed our instances. We are only able to upgrade
> Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> think that should matter, though. Yes?
>
> It does not seem to have helped. We currently have 25 instances with 4
> threads per instance. Our topology has two topics in it, each having
> 100 partitions. The input topic feeds into a filtering step that uses
> an in-memory store, and its output goes via a groupBy to an
> intermediate topic. The intermediate topic then feeds into an
> aggregation step that uses the RocksDB store. So we have 200 tasks in
> total. After switching to 1.1 the task assignments are still wildly
> uneven. Some instances only have tasks from one of the topics.
> Furthermore, the instances keep dying with
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
>
> Is there something else we need to do to make this updated task
> assignment work?
>
> Thanks!
> -russ
>
> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> It's a known issue and we already addressed it via
>> https://issues.apache.org/jira/browse/KAFKA-4969
>>
>> The fix will be part of the upcoming 1.1 release, but you can try it
>> out immediately by running from trunk or the 1.0 branch. (If you do,
>> feedback would be very welcome :))
>>
>> Your proposed workarounds should work. I cannot come up with anything
>> else you could do, because the task assignment cannot be influenced.
>>
>> -Matthias
>>
>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>> > We are using Kafka Streams for a project and have some questions
>> > about how stream tasks are assigned.
>> >
>> >   streamBuilder
>> >     .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>> >     ... // Do some stuff here
>> >     .through("intermediate-topic")
>> >     ... // Do some other stuff here
>> >
>> > In this example we are streaming from "inbound-topic" and doing some
>> > work before writing the results back out to "intermediate-topic".
>> > Then we are reading in from "intermediate-topic" and doing some more
>> > work. If both of these topics contain 100 partitions (200 partitions
>> > total) and I create 10 instances of my application, what I observe
>> > is that a total of 20 partitions is assigned to each instance. But
>> > the distribution of these partitions across the two topics is not
>> > even. For example, one instance may have 7 partitions from
>> > "inbound-topic" and 13 partitions from "intermediate-topic". I would
>> > have hoped that each instance would get 10 partitions from each
>> > topic. Because of this uneven distribution the resource
>> > characteristics can differ greatly from instance to instance.
>> >
>> > In a more concrete example we are reading from an input topic, then
>> > using an in-memory store to do some filtering, followed by a
>> > groupBy, and finally doing an aggregate. This results in two topics:
>> > the input topic and the internally created intermediate topic, which
>> > is written to by the groupBy and read from by the aggregation.
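>> >
>> > Roughly, the code has the following shape. This is a boiled-down
>> > sketch rather than our production code: the String serdes, store
>> > names, and the toy first-seen-key filter are stand-ins for our real
>> > serdes and logic.
>> >
>> >   import org.apache.kafka.common.serialization.Serdes
>> >   import org.apache.kafka.common.utils.Bytes
>> >   import org.apache.kafka.streams.{Consumed, KeyValue, StreamsBuilder}
>> >   import org.apache.kafka.streams.kstream._
>> >   import org.apache.kafka.streams.processor.ProcessorContext
>> >   import org.apache.kafka.streams.state.{KeyValueStore, Stores}
>> >
>> >   val strings = Serdes.String()
>> >   val builder = new StreamsBuilder()
>> >
>> >   // In-memory store backing the stateful filter.
>> >   builder.addStateStore(
>> >     Stores.keyValueStoreBuilder(
>> >       Stores.inMemoryKeyValueStore("filter-store"), strings, strings))
>> >
>> >   // Toy stand-in for the real filter: keep only first-seen keys.
>> >   val filter = new TransformerSupplier[String, String, KeyValue[String, String]] {
>> >     def get() = new Transformer[String, String, KeyValue[String, String]] {
>> >       private var store: KeyValueStore[String, String] = _
>> >       def init(ctx: ProcessorContext): Unit =
>> >         store = ctx.getStateStore("filter-store")
>> >           .asInstanceOf[KeyValueStore[String, String]]
>> >       def transform(k: String, v: String): KeyValue[String, String] =
>> >         if (store.get(k) != null) null // seen before -> drop
>> >         else { store.put(k, v); new KeyValue(k, v) }
>> >       def punctuate(ts: Long): KeyValue[String, String] = null
>> >       def close(): Unit = ()
>> >     }
>> >   }
>> >
>> >   // Stand-ins for our real re-keying and aggregation logic.
>> >   val reKey: KeyValueMapper[String, String, String] = (k: String, _: String) => k
>> >   val init: Initializer[String] = () => ""
>> >   val agg: Aggregator[String, String, String] =
>> >     (_: String, v: String, acc: String) => acc + v
>> >
>> >   builder
>> >     .stream("input-topic", Consumed.`with`(strings, strings))
>> >     // Stateful filtering (first sub-topology).
>> >     .transform(filter, "filter-store")
>> >     // The groupBy repartitions through an internal intermediate topic,
>> >     // so everything below runs as a second, independent set of tasks.
>> >     .groupBy(reKey, Serialized.`with`(strings, strings))
>> >     // Aggregation backed by the default RocksDB store.
>> >     .aggregate(init, agg,
>> >       Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]](
>> >         "agg-store"))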
>> >
>> > What we see is that some instances are assigned far more tasks that
>> > use the in-memory store, while other instances have very few and
>> > sometimes none. This leads to wildly different memory usage patterns
>> > across the instances, which in turn forces us to set our memory much
>> > higher than would be needed if the partitions from each topic were
>> > evenly distributed across the instances.
>> >
>> > The two ways we have figured out to deal with this problem are:
>> > 1. Use a new StreamsBuilder anytime an intermediate topic is being
>> >    read from in the application (see the sketch near the top of this
>> >    thread).
>> > 2. Break the topology into separate applications across the boundary
>> >    of an intermediate topic.
>> >
>> > Neither of these seems like a great solution. So I would like to
>> > know:
>> >
>> > 1. Is this expected behavior?
>> > 2. Is there some technique to get an equal distribution of
>> >    task/partition assignments across instances?
>> >
>> > Thanks for the help.
>> >
>> > --
>> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules