Bill, I may be able to.

- What logging level?
- Do you need logs from all the instances?
- Where should I send them?

-russ

On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:

> Russell,
>
> Can you share any log files?
>
> Thanks,
> Bill
>
> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> rteabea...@twitter.com.invalid> wrote:
>
> > Hi Matthias,
> >
> > Thanks for the prompt reply. We have built the kafka-streams jar from
> > the 1.1 branch and deployed our instances. We are only able to upgrade
> > Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> > think that should matter, though. Yes?
> >
> > It does not seem to have helped. We currently have 25 instances with 4
> > threads/instance. Our topology has two topics in it, each having 100
> > partitions. The input topic feeds into a filtering step that uses an
> > in-memory store, and its output goes via a groupBy to an intermediate
> > topic. The intermediate topic then feeds into an aggregation step
> > which uses the RocksDB store. So we can see that we have 200 tasks
> > total. After switching to 1.1 the task assignments are still wildly
> > uneven. Some instances only have tasks from one of the topics.
> > Furthermore, the instances keep dying due to
> > org.apache.kafka.common.errors.NotLeaderForPartitionException:
> > This server is not the leader for that topic-partition.
> >
> > Is there something else we need to do to make this updated task
> > assignment work?
> >
> > Thanks!
> > -russ
> >
> > On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > It's a known issue and we addressed it already via
> > > https://issues.apache.org/jira/browse/KAFKA-4969
> > >
> > > The fix will be part of the upcoming 1.1 release, but you could try
> > > it out immediately running from trunk or the 1.1 branch. (If you do,
> > > feedback would be very welcome :))
> > >
> > > Your proposed workarounds should work. I cannot come up with anything
> > > else you could do, because the task assignment cannot be influenced.
> > >
> > > -Matthias
> > >
> > > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > > We are using Kafka Streams for a project and had some questions
> > > > about how stream tasks are assigned.
> > > >
> > > >     streamBuilder
> > > >       .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > >       // ... do some stuff here ...
> > > >       .through("intermediate-topic")
> > > >       // ... do some other stuff here ...
> > > >
> > > > In this example we are streaming from "inbound-topic" and doing
> > > > some work before writing the results back out to
> > > > "intermediate-topic". Then we are reading in from
> > > > "intermediate-topic" and doing some more work. If both of these
> > > > topics contain 100 partitions (200 partitions total) and I create
> > > > 10 instances of my application, what I observe is that a total of
> > > > 20 partitions are assigned to each instance. But the distribution
> > > > of these partitions across the two topics is not even. For example,
> > > > one instance may have 7 partitions from "inbound-topic" and 13
> > > > partitions from "intermediate-topic". I would have hoped that each
> > > > instance would have 10 partitions from each topic. This uneven
> > > > distribution can make the resource characteristics very different
> > > > from instance to instance.
> > > >
> > > > In a more concrete example, we read from an input topic, then use
> > > > an in-memory store to do some filtering, followed by a groupBy, and
> > > > finally an aggregate. This results in two topics: the input topic
> > > > and the internally created intermediate topic, written to by the
> > > > groupBy and read from by the aggregation. What we see is that some
> > > > instances are assigned far more partitions/tasks that use the
> > > > in-memory store, while other instances have very few, and sometimes
> > > > no, such tasks. This leads to wildly different memory usage
> > > > patterns across the instances. In turn, this leads us to set our
> > > > memory much higher than would be needed if the partitions from each
> > > > topic were equally distributed across the instances.
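> > > >
> > > > For concreteness, here is a minimal sketch of that shape (assuming
> > > > Scala 2.12+ so the Java functional interfaces accept lambdas; the
> > > > topic names, String serdes, and trivial filter/reduce bodies are
> > > > placeholders for our real logic):
> > > >
> > > >     import org.apache.kafka.common.serialization.Serdes
> > > >     import org.apache.kafka.streams.{Consumed, StreamsBuilder, Topology}
> > > >     import org.apache.kafka.streams.kstream.Serialized
> > > >
> > > >     object AssignmentSketch {
> > > >       def topology(): Topology = {
> > > >         val keySerde   = Serdes.String() // placeholder serdes
> > > >         val valueSerde = Serdes.String()
> > > >         val builder    = new StreamsBuilder
> > > >
> > > >         builder
> > > >           .stream("input-topic", Consumed.`with`(keySerde, valueSerde))
> > > >           // Stands in for the filtering backed by the in-memory store.
> > > >           .filter((key: String, value: String) => value != null)
> > > >           // groupBy re-keys the stream, so Streams repartitions through
> > > >           // an internally created intermediate topic: the second set of
> > > >           // 100 partitions, processed by a second sub-topology.
> > > >           .groupBy[String]((key: String, value: String) => key,
> > > >                            Serialized.`with`(keySerde, valueSerde))
> > > >           // The aggregation reads the intermediate topic; its store is
> > > >           // RocksDB-backed by default.
> > > >           .reduce((agg: String, value: String) => agg + value)
> > > >
> > > >         builder.build()
> > > >       }
> > > >     }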
> > > >
> > > > The two ways we have figured out how to deal with this problem are:
> > > >
> > > > 1. Use a new StreamsBuilder any time an intermediate topic is being
> > > >    read from in the application (sketched below).
> > > > 2. Break the topology into separate applications across the
> > > >    boundary of an intermediate topic.
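> > > >
> > > > Concretely, option 1 would look something like this (again only a
> > > > sketch: the application IDs, bootstrap server, serdes, and trivial
> > > > processing bodies are illustrative, and we assume each topology gets
> > > > its own application.id):
> > > >
> > > >     import java.util.Properties
> > > >     import org.apache.kafka.common.serialization.Serdes
> > > >     import org.apache.kafka.streams.{Consumed, KafkaStreams,
> > > >       StreamsBuilder, StreamsConfig}
> > > >     import org.apache.kafka.streams.kstream.{Produced, Serialized}
> > > >
> > > >     object SplitTopologies extends App {
> > > >       val keySerde   = Serdes.String()
> > > >       val valueSerde = Serdes.String()
> > > >
> > > >       def propsFor(appId: String): Properties = {
> > > >         val p = new Properties
> > > >         p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
> > > >         p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > > >         p
> > > >       }
> > > >
> > > >       // Builder 1: input topic -> filter -> explicit intermediate topic.
> > > >       val first = new StreamsBuilder
> > > >       first
> > > >         .stream("input-topic", Consumed.`with`(keySerde, valueSerde))
> > > >         .filter((key: String, value: String) => value != null)
> > > >         .to("intermediate-topic", Produced.`with`(keySerde, valueSerde))
> > > >
> > > >       // Builder 2: intermediate topic -> groupByKey -> aggregation.
> > > >       val second = new StreamsBuilder
> > > >       second
> > > >         .stream("intermediate-topic", Consumed.`with`(keySerde, valueSerde))
> > > >         .groupByKey(Serialized.`with`(keySerde, valueSerde))
> > > >         .reduce((agg: String, value: String) => agg + value)
> > > >
> > > >       // Each topology runs as its own KafkaStreams instance with its
> > > >       // own application.id, so each consumer group reads exactly one
> > > >       // topic and its partition assignment comes out even.
> > > >       new KafkaStreams(first.build(), propsFor("filter-app")).start()
> > > >       new KafkaStreams(second.build(), propsFor("aggregate-app")).start()
> > > >     }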
> > > >
> > > > Neither of these seems like a great solution. So I would like to
> > > > know:
> > > >
> > > > 1. Is this expected behavior?
> > > > 2. Is there some technique to get an equal distribution of
> > > >    task/partition assignments across instances?
> > > >
> > > > Thanks for the help.
> > > >
> > > > --
> > > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >
> > --
> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules