Matthias,

Yes. We used the reset tool before deploying with 1.1.
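
For anyone following along, the KafkaStreams#cleanUp() route Matthias mentions
below would look roughly like this (placeholder names, config, and topology).
Note that cleanUp() only removes the instance's local state directory and may
only be called before start() or after close(); the application reset tool is
still what resets offsets and deletes the internal topics.

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.Produced

object ResetOnDeploy extends App {
  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-streams-app") // placeholder
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // placeholder

  // Placeholder topology standing in for the real one discussed below.
  val serde   = Serdes.String()
  val builder = new StreamsBuilder()
  builder
    .stream("inbound-topic", Consumed.`with`(serde, serde))
    .to("outbound-topic", Produced.`with`(serde, serde))

  val streams = new KafkaStreams(builder.build(), props)
  streams.cleanUp() // wipes this instance's local state directory
  streams.start()

  sys.ShutdownHookThread { streams.close() }
}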

-russ

On Wed, Feb 7, 2018 at 4:23 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Did you start the app from scratch, i.e., wipe out all state before you
> restarted with 1.1? If not, reusing existing stores would overrule a more
> balanced deployment.
>
> You can set a new application.id or, better, use the reset tool to reset
> the application completely (maybe just calling KafkaStreams#cleanUp()
> would do the trick, too, for your case).
>
> And yes, upgrading the Streams API to 1.1 is fine -- no need to upgrade
> the brokers.
>
>
> -Matthias
>
> On 2/7/18 3:16 PM, Russell Teabeault wrote:
> > Bill,
> >
> > I may be able to.
> >
> > - What logging level?
> > - Do you need logs from all the instances?
> > - Where should I send them?
> >
> > -russ
> >
> > On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:
> >
> >> Russell,
> >>
> >> Can you share any log files?
> >>
> >> Thanks,
> >> Bill
> >>
> >>
> >>
> >> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> >> rteabea...@twitter.com.invalid> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Thanks for the prompt reply. We have built the kafka-streams jar from
> >>> the 1.1 branch and deployed our instances. We are only able to upgrade
> >>> Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> >>> think that should matter, though. Yes?
> >>>
> >>> It does not seem to have helped. We currently have 25 instances with 4
> >>> threads/instance. Our topology has two topics in it, each having 100
> >>> partitions. The input topic feeds into a filtering step that uses an
> >>> in-memory store, and that is output via groupBy to an intermediate
> >>> topic. The intermediate topic then feeds into an aggregation step which
> >>> uses the RocksDB store. So we can see that we have 200 tasks total.
> >>> After switching to 1.1 the task assignments are still wildly uneven.
> >>> Some instances only have tasks from one of the topics. Furthermore, the
> >>> instances keep dying due to
> >>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> >>> server is not the leader for that topic-partition.
> >>>
> >>> Is there something else we need to do to make this updated task
> >>> assignment work?
> >>>
> >>> Thanks!
> >>> -russ
> >>>
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> It's a known issue and we addressed it already via
> >>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>
> >>>> The fix will be part of the upcoming 1.1 release, but you could try it
> >>>> out immediately by running from trunk or the 1.1 branch. (If you do,
> >>>> feedback would be very welcome :))
> >>>>
> >>>> Your proposed workarounds should work. I cannot come up with anything
> >>>> else you could do, because the task assignment cannot be influenced.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>> We are using Kafka Streams for a project and had some questions about
> >>>>> how stream tasks are assigned.
> >>>>>
> >>>>> streamBuilder
> >>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>   ... // Do some stuff here
> >>>>>   .through("intermediate-topic")
> >>>>>   ... // Do some other stuff here
> >>>>>
> >>>>> In this example we are streaming from "inbound-topic" and then doing
> >>>>> some work before writing the results back out to "intermediate-topic".
> >>>>> Then we are reading in from "intermediate-topic" and doing some more
> >>>>> work.
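> >>>>>
> >>>>> Filled in with placeholder serdes, a placeholder filter, and a
> >>>>> placeholder count (stand-ins for what we actually do at each step), the
> >>>>> same shape looks roughly like this:
> >>>>>
> >>>>> import org.apache.kafka.common.serialization.Serdes
> >>>>> import org.apache.kafka.streams.{Consumed, StreamsBuilder}
> >>>>> import org.apache.kafka.streams.kstream.{Produced, Serialized}
> >>>>>
> >>>>> val keySerde      = Serdes.String()
> >>>>> val valueSerde    = Serdes.String()
> >>>>> val streamBuilder = new StreamsBuilder()
> >>>>>
> >>>>> streamBuilder
> >>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>   .filter((key: String, value: String) => value != null)  // placeholder filter
> >>>>>   .through("intermediate-topic", Produced.`with`(keySerde, valueSerde))
> >>>>>   .groupByKey(Serialized.`with`(keySerde, valueSerde))
> >>>>>   .count()                                                 // placeholder aggregation
> >>>>>
> >>>>> The filter and the write to "intermediate-topic" form one sub-topology,
> >>>>> driven by "inbound-topic"; the read from "intermediate-topic" and the
> >>>>> count form a second one, and each sub-topology gets one task per
> >>>>> partition of its source topic.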
> >>>>>
> >>>>> If both of these topics contain 100 partitions (200 partitions total)
> >>>>> and I create 10 instances of my application, then what I observe is that
> >>>>> a total of 20 partitions are assigned to each instance. But the
> >>>>> distribution of these partitions across the two topics is not even. For
> >>>>> example, one instance may have 7 partitions from "inbound-topic" and 13
> >>>>> partitions from "intermediate-topic". I would have hoped that each
> >>>>> instance would have 10 partitions from each topic. Because of this
> >>>>> uneven distribution, the resource characteristics can differ greatly
> >>>>> from instance to instance.
> >>>>>
> >>>>> In a more concrete example, we are reading from an input topic, then
> >>>>> using an in-memory store to do some filtering, followed by a groupBy,
> >>>>> and finally doing an aggregate. This results in two topics: the input
> >>>>> topic and the internally created intermediate topic that the groupBy
> >>>>> writes to and the aggregation reads from. What we see is that some
> >>>>> instances are assigned far more partitions/tasks that use the in-memory
> >>>>> store, while other instances have very few, and sometimes no, tasks that
> >>>>> use it. This leads to wildly different memory usage patterns across the
> >>>>> instances. In turn, this forces us to set our memory much higher than
> >>>>> would be needed if the partitions from each topic were equally
> >>>>> distributed across the instances.
> >>>>>
> >>>>> The two ways we have figured out how to deal with this problem are:
> >>>>> 1. Use a new StreamsBuilder anytime an intermediate topic is being read
> >>>>>    from in the application.
> >>>>> 2. Break the topology into separate applications across the boundary of
> >>>>>    an intermediate topic.
> >>>>>
> >>>>> Neither of these seems like a great solution. So I would like to know:
> >>>>>
> >>>>> 1. Is this expected behavior?
> >>>>> 2. Is there some technique to get an equal distribution of
> >>>>>    task/partition assignments across instances?
> >>>>>
> >>>>> Thanks for the help.
> >>>>>
> >>>>> --
> >>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> --
> >>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>
> >
> >
> >

--
--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
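
P.S. For anyone who finds this thread in the archive: workaround (1) from my
original mail -- one StreamsBuilder per section of the pipeline, each run as
its own KafkaStreams client -- looks roughly like the sketch below. Topic
names, serdes, config, and the filter are placeholders for what we actually
do, and as far as I can tell each topology needs its own application.id,
since all instances that share an application.id must run the same topology.

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{Consumed, KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.{Produced, Serialized}

object SplitPipeline extends App {
  // Placeholder config; each topology gets its own application.id.
  def config(appId: String): Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p
  }

  val serde = Serdes.String()

  // First half: inbound-topic -> filtering -> intermediate-topic
  val upstream = new StreamsBuilder()
  upstream
    .stream("inbound-topic", Consumed.`with`(serde, serde))
    .filter((key: String, value: String) => value != null)   // placeholder filter
    .to("intermediate-topic", Produced.`with`(serde, serde))

  // Second half: intermediate-topic -> aggregation
  val downstream = new StreamsBuilder()
  downstream
    .stream("intermediate-topic", Consumed.`with`(serde, serde))
    .groupByKey(Serialized.`with`(serde, serde))
    .count()                                                  // placeholder aggregation

  // Two KafkaStreams clients in one JVM; each is its own consumer group and
  // is therefore balanced independently across the instances that run it.
  new KafkaStreams(upstream.build(), config("example-app-upstream")).start()
  new KafkaStreams(downstream.build(), config("example-app-downstream")).start()
}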