Matthias,

Disregard the exception I mentioned. I think that was a transient error
caused by our broker cluster re-spinning.

-russ

On Wed, Feb 7, 2018 at 3:45 PM, Russell Teabeault <rteabea...@twitter.com>
wrote:

> Hi Matthias,
>
> Thanks for the prompt reply. We have built the kafka-streams jar from the
> 1.1 branch and deployed it to our instances. We can only upgrade Kafka
> Streams to 1.1; we cannot upgrade the brokers to 1.1. I don't think that
> should matter, though. Yes?
>
> It does not seem to have helped. We currently have 25 instances with 4
> threads/instance. Our topology involves two topics, each with 100
> partitions. The input topic feeds a filtering step that uses an in-memory
> store, and the output of that goes via a groupBy to an intermediate topic.
> The intermediate topic then feeds an aggregation step that uses the
> RocksDB store. So we have 200 tasks total. After switching to 1.1 the task
> assignments are still wildly uneven. Some instances only have tasks from
> one of the topics. Furthermore, the instances keep dying due to
> org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
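>
> Each instance is a single KafkaStreams client running four threads,
> configured roughly like this (a sketch; the property values and the
> builder variable are placeholders):
>
>   import java.util.Properties
>   import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
>
>   val p = new Properties
>   p.put(StreamsConfig.APPLICATION_ID_CONFIG, "our-app")         // placeholder
>   p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")  // placeholder
>   p.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")           // 4 threads/instance
>   new KafkaStreams(builder.build(), p).start()                  // builder = our topology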
>
> Is there something else we need to do to make this updated task assignment
> work?
>
> Thanks!
> -russ
>
>
>
> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> It's a known issue and we already addressed it via
>> https://issues.apache.org/jira/browse/KAFKA-4969
>>
>> The fix will be part of the upcoming 1.1 release, but you can try it out
>> immediately by running from trunk or the 1.1 branch. (If you do, feedback
>> would be very welcome :))
>>
>> Your proposed workarounds should work. I cannot come up with anything
>> else you could do, because the task assignment cannot be influenced.
>>
>>
>> -Matthias
>>
>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>> > We are using Kafka Streams for a project and have some questions about
>> > how stream tasks are assigned.
>> >
>> > streamBuilder
>> >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>> >   ... // Do some stuff here
>> >
>> >   .through("intermediate-topic")
>> >   ... // Do some other stuff here
>> >
>> > In this example we are streaming from "inbound-topic" and doing some
>> > work before writing the results back out to "intermediate-topic". Then
>> > we read from "intermediate-topic" and do some more work. If both of
>> > these topics have 100 partitions (200 partitions total) and I create 10
>> > instances of my application, I observe that each instance is assigned a
>> > total of 20 partitions. But the distribution of those partitions across
>> > the two topics is not even. For example, one instance may have 7
>> > partitions from "inbound-topic" and 13 partitions from
>> > "intermediate-topic". I would have hoped that each instance would get
>> > 10 partitions from each topic. This uneven distribution can make the
>> > resource characteristics very different from instance to instance.
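>> >
>> > (We check the assignment on each instance with something like the
>> > following; a sketch that assumes the thread-metadata API added in 1.0,
>> > where "streams" is the running KafkaStreams instance:)
>> >
>> >   import scala.collection.JavaConverters._
>> >
>> >   // print each thread's assigned tasks and their topic-partitions
>> >   for (thread <- streams.localThreadsMetadata().asScala;
>> >        task <- thread.activeTasks().asScala) {
>> >     println(s"${thread.threadName()}: task ${task.taskId()} -> " +
>> >             s"${task.topicPartitions()}")
>> >   }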
>> >
>> > In a more concrete example, we read from an input topic, use an
>> > in-memory store to do some filtering, follow that with a groupBy, and
>> > finally do an aggregate. This involves two topics: the input topic and
>> > the internally created intermediate topic that the groupBy writes to
>> > and the aggregation reads from. What we see is that some instances are
>> > assigned far more tasks that use the in-memory store, while other
>> > instances have very few, and sometimes no, such tasks. This leads to
>> > wildly different memory usage patterns across the instances. In turn it
>> > forces us to set our memory limits much higher than would be needed if
>> > the partitions from each topic were distributed evenly across the
>> > instances.
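>> >
>> > In code this looks roughly like the following (a sketch, not our real
>> > code; the topic and store names and the count standing in for our
>> > aggregate are made up, and it assumes Scala 2.12's SAM conversion for
>> > the Java lambdas):
>> >
>> >   import org.apache.kafka.common.serialization.Serdes
>> >   import org.apache.kafka.common.utils.Bytes
>> >   import org.apache.kafka.streams.{Consumed, StreamsBuilder}
>> >   import org.apache.kafka.streams.kstream.{Materialized, Serialized}
>> >   import org.apache.kafka.streams.state.KeyValueStore
>> >
>> >   val builder = new StreamsBuilder
>> >
>> >   builder
>> >     .stream("input-topic", Consumed.`with`(Serdes.String(), Serdes.String()))
>> >     // stand-in for the filtering step backed by the in-memory store
>> >     .filter((key: String, value: String) => value != null)
>> >     // groupBy changes the key, so Streams repartitions through an
>> >     // internal 100-partition topic; the aggregation reads that topic
>> >     // and keeps its result in a RocksDB-backed store
>> >     .groupBy((key: String, value: String) => value,
>> >              Serialized.`with`(Serdes.String(), Serdes.String()))
>> >     .count(Materialized.as[String, java.lang.Long,
>> >              KeyValueStore[Bytes, Array[Byte]]]("agg-store"))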
>> >
>> > The two ways we have figured out to deal with this problem are:
>> > 1. Use a new StreamsBuilder anytime an intermediate topic is read from
>> > in the application (sketched below).
>> > 2. Break the topology into separate applications at the boundary of an
>> > intermediate topic.
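>> >
>> > Option 1 looks roughly like this (a sketch; the application.id values,
>> > the props helper, and the elided sub-topologies are made up). Each
>> > sub-topology gets its own builder and its own KafkaStreams client, so
>> > each client's 100 tasks are balanced independently:
>> >
>> >   import java.util.Properties
>> >   import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
>> >
>> >   def props(appId: String): Properties = {
>> >     val p = new Properties
>> >     p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
>> >     p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092")
>> >     p
>> >   }
>> >
>> >   val filterBuilder = new StreamsBuilder
>> >   // ... input topic -> filter -> re-key -> write to "intermediate-topic"
>> >   val aggBuilder = new StreamsBuilder
>> >   // ... read "intermediate-topic" -> aggregate
>> >
>> >   // two clients, two consumer groups, two independent task assignments
>> >   new KafkaStreams(filterBuilder.build(), props("filter-app")).start()
>> >   new KafkaStreams(aggBuilder.build(), props("agg-app")).start()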
>> >
>> > Neither of these seems like a great solution. So I would like to know:
>> >
>> > 1. Is this expected behavior?
>> > 2. Is there some technique to get equal distribution of task/partition
>> > assignments across instances?
>> >
>> > Thanks for the help.
>> >
>> > --
>> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>> >
>>
>>
>
>
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>



-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
