Did you start the app from scratch, i.e., wipe out all state before you restarted with 1.1? If not, reusing the existing stores would override a more balanced assignment, because the assignment prefers to keep tasks on instances that already hold their state.
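For clarity, starting from scratch on the client side would look roughly like the sketch below; the application.id and broker address are placeholders, and the topology is assumed to be built elsewhere in the app:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;

    public class CleanStart {
        public static void run(Topology topology) {  // topology built elsewhere
            Properties props = new Properties();
            // placeholder application.id and broker list
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

            KafkaStreams streams = new KafkaStreams(topology, props);
            streams.cleanUp();  // removes this instance's local state directory; call before start()
            streams.start();
        }
    }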
You can set a new application.id, or better, use the application reset tool to
reset the application completely (maybe just calling KafkaStreams#cleanUp()
would do the trick, too, for your case). And yes, upgrading the Streams API to
1.1 is fine -- no need to upgrade the brokers.

-Matthias

On 2/7/18 3:16 PM, Russell Teabeault wrote:
> Bill,
>
> I may be able to.
>
> - What logging level?
> - Do you need logs from all the instances?
> - Where should I send them?
>
> -russ
>
> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:
>
>> Russell,
>>
>> Can you share any log files?
>>
>> Thanks,
>> Bill
>>
>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault
>> <rteabea...@twitter.com.invalid> wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the prompt reply. We have built the kafka-streams jar from
>>> the 1.1 branch and deployed our instances. We are only able to upgrade
>>> Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
>>> think that should matter, though. Yes?
>>>
>>> It does not seem to have helped. We currently have 25 instances with 4
>>> threads per instance. Our topology has two topics in it, each with 100
>>> partitions. The input topic feeds into a filtering step that uses an
>>> in-memory store, and the output of that step goes via a groupBy to an
>>> intermediate topic. The intermediate topic then feeds into an
>>> aggregation step which uses the RocksDB store. So we have 200 tasks in
>>> total. After switching to 1.1 the task assignments are still wildly
>>> uneven. Some instances only have tasks from one of the topics.
>>> Furthermore, the instances keep dying due to
>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>> server is not the leader for that topic-partition.
>>>
>>> Is there something else we need to do to make this updated task
>>> assignment work?
>>>
>>> Thanks!
>>> -russ
>>>
>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> It's a known issue and we addressed it already via
>>>> https://issues.apache.org/jira/browse/KAFKA-4969
>>>>
>>>> The fix will be part of the upcoming 1.1 release, but you could try it
>>>> out immediately by running from trunk or the 1.0 branch. (If you do,
>>>> feedback would be very welcome :))
>>>>
>>>> Your proposed workarounds should work. I cannot come up with anything
>>>> else you could do, because the task assignment cannot be influenced.
>>>>
>>>> -Matthias
>>>>
>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>>>>> We are using Kafka Streams for a project and had some questions about
>>>>> how stream tasks are assigned.
>>>>>
>>>>> streamBuilder
>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>>>>>   ... // Do some stuff here
>>>>>   .through("intermediate-topic")
>>>>>   ... // Do some other stuff here
>>>>>
>>>>> In this example we are streaming from "inbound-topic" and doing some
>>>>> work before writing the results back out to "intermediate-topic".
>>>>> Then we are reading in from "intermediate-topic" and doing some more
>>>>> work. If both of these topics contain 100 partitions (200 partitions
>>>>> total) and I create 10 instances of my application, what I observe is
>>>>> that a total of 20 partitions are assigned to each instance. But the
>>>>> distribution of these partitions across the two topics is not even.
>>>>> For example, one instance may have 7 partitions from "inbound-topic"
>>>>> and 13 partitions from "intermediate-topic". I would have hoped that
>>>>> each instance would have 10 partitions from each topic. Because of
>>>>> this uneven distribution the resource characteristics can differ
>>>>> greatly from instance to instance.
>>>>>
>>>>> In a more concrete example, we read from an input topic, then use an
>>>>> in-memory store to do some filtering, followed by a groupBy, and
>>>>> finally an aggregate. This results in two topics: the input topic and
>>>>> the internally created intermediate topic that is written to by the
>>>>> groupBy and read from by the aggregation. What we see is that some
>>>>> instances are assigned far more partitions/tasks that use the
>>>>> in-memory store, while other instances have very few, and sometimes
>>>>> no, tasks that use the in-memory store. This leads to wildly
>>>>> different memory usage patterns across the instances. In turn, this
>>>>> leads us to set our memory much higher than would be needed if the
>>>>> partitions from each topic were distributed evenly across the
>>>>> instances.
>>>>>
>>>>> The two ways we have figured out to deal with this problem are:
>>>>> 1. Use a new StreamsBuilder any time an intermediate topic is read
>>>>> from in the application.
>>>>> 2. Break the topology into separate applications at the boundary of
>>>>> an intermediate topic.
>>>>>
>>>>> Neither of these seems like a great solution. So I would like to know:
>>>>>
>>>>> 1. Is this expected behavior?
>>>>> 2. Is there some technique to get an equal distribution of
>>>>> task/partition assignments across instances?
>>>>>
>>>>> Thanks for the help.
>>>>>
>>>>> --
>>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>>
>>>
>>> --
>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
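For reference, the topology described in the quoted thread would look roughly like the Java sketch below. Topic names, serdes, the filter predicate, and the aggregation are placeholders rather than the actual application code; with 100 partitions on the input topic this shape yields two sub-topologies and hence the 200 tasks mentioned above.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class ExampleTopology {
        public static Topology build() {
            StreamsBuilder builder = new StreamsBuilder();

            builder.stream("inbound-topic",
                           Consumed.with(Serdes.String(), Serdes.String()))
                   // stand-in for the filtering step backed by an in-memory store
                   .filter((key, value) -> value != null)
                   // re-keying via groupBy creates the internal repartition
                   // ("intermediate") topic and thus a second sub-topology
                   .groupBy((key, value) -> value,
                            Serialized.with(Serdes.String(), Serdes.String()))
                   // RocksDB-backed aggregation; string concat is only a placeholder
                   .aggregate(
                       () -> "",
                       (groupKey, value, agg) -> agg + value,
                       Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("agg-store")
                           .withKeySerde(Serdes.String())
                           .withValueSerde(Serdes.String()));

            return builder.build();
        }
    }

In the real application the filter would consult the in-memory store, but the shape of the topology, and therefore the task count, stays the same.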