Hi Matthias,

Thanks for the prompt reply. We have built the kafka-streams jar from the
1.1 branch and deployed our instances. We are only able to upgrade Kafka
Streams to 1.1; we cannot upgrade the brokers to 1.1. I don't think that
should matter though. Yes?

It does not seem to have helped. We currently have 25 instances with 4
threads/instance. Our topology has two topics in it, each having 100
partitions. The input topic feeds into a filtering step that uses an
in-memory store and that is output via groupBy to an intermediate topic.
The intermediate topic then feeds into an aggregation step which uses the
rocksDB store. So we can see that we have 200 tasks total. After switching
to 1.1 the task assignments are still wildly uneven. Some instances only
have tasks from one of the topics. Furthermore, the instances keep dying
due to org.apache.kafka.common.errors.NotLeaderForPartitionException: This
server is not the leader for that topic-partition.
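
For reference, here is a minimal sketch of the arithmetic behind what we
expected an even assignment to look like with the numbers above. TaskBalance
and evenShare are made-up names for illustration only; they are not Kafka
Streams APIs, and this does not model how the actual assignor works.

```scala
// Sketch only: the "ideal" balanced assignment for the numbers in this thread.
// TaskBalance and evenShare are hypothetical helpers, not Kafka Streams APIs.
object TaskBalance {
  // Spread `tasks` over `instances` as evenly as possible
  // (resulting sizes differ by at most 1).
  def evenShare(tasks: Int, instances: Int): Seq[Int] = {
    val base = tasks / instances
    val extra = tasks % instances
    Seq.tabulate(instances)(i => if (i < extra) base + 1 else base)
  }

  def main(args: Array[String]): Unit = {
    val instances = 25
    val threadsPerInstance = 4
    val partitionsPerTopic = 100

    // One task per partition of each topic: 100 + 100 = 200 tasks.
    val inputTasks = evenShare(partitionsPerTopic, instances)
    val intermediateTasks = evenShare(partitionsPerTopic, instances)

    val perInstance = inputTasks.zip(intermediateTasks).map { case (a, b) => a + b }

    println(s"input-topic tasks per instance:        ${inputTasks.head}")
    println(s"intermediate-topic tasks per instance: ${intermediateTasks.head}")
    println(s"total tasks per instance:              ${perInstance.head}")
    println(s"tasks per thread:                      ${perInstance.head / threadsPerInstance}")
  }
}
```

In other words, we were hoping for 4 tasks from each topic on every instance
(8 tasks per instance, 2 per thread), which is far from what we observe.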

Is there something else we need to do to make this updated task assignment
work?

Thanks!
-russ



On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> It's a known issue and we addressed it already via
> https://issues.apache.org/jira/browse/KAFKA-4969
>
> The fix will be part of the upcoming 1.1 release, but you could try it out
> immediately by running from trunk or the 1.1 branch. (If you do, feedback
> would be very welcome :))
>
> Your proposed workarounds should work. I cannot come up with anything
> else you could do, because the task assignment cannot be influenced.
>
>
> -Matthias
>
> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > We are using Kafka Streams for a project and had some questions about how
> > stream tasks are assigned.
> >
> > streamBuilder
> >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >   ... // Do some stuff here
> >
> >   .through("intermediate-topic")
> >   ... // Do some other stuff here
> >
> > In this example we are streaming from "inbound-topic" and then doing some
> > work before writing the results back out to "intermediate-topic". Then we
> > are reading in from "intermediate-topic" and doing some more work. If both
> > of these topics contain 100 partitions (200 partitions total) and I create
> > 10 instances of my application, then what I observe is that a total of 20
> > partitions are assigned to each instance. But the distribution of these
> > partitions across the two topics is not even. For example, one instance
> > may have 7 partitions from "inbound-topic" and 13 partitions from
> > "intermediate-topic". I would have hoped that each instance would have 10
> > partitions from each topic. This uneven distribution can make the resource
> > characteristics very different from instance to instance.
> >
> > In a more concrete example we are reading from an input topic, then using
> > an in-memory store to do some filtering, followed by a groupBy, and finally
> > doing an aggregate. This results in two topics: the input topic and the
> > internally created intermediate topic written to by the groupBy and read
> > from by the aggregation. What we see is that some instances are assigned
> > far more of the partitions/tasks that use the in-memory store, while other
> > instances have very few and sometimes none. This leads to wildly different
> > memory usage patterns across the instances. In turn, this leads us to set
> > our memory much higher than would be needed if the partitions from each
> > topic were equally distributed across the instances.
> >
> > The two ways we have figured out to deal with this problem are:
> > 1. Use a new StreamsBuilder any time an intermediate topic is being read
> > from in the application.
> > 2. Break the topology into separate applications across the boundary of an
> > intermediate topic.
> >
> > Neither of these seems like a great solution. So I would like to know:
> >
> > 1. Is this expected behavior?
> > 2. Is there some technique to get equal distribution of task/partition
> > assignments across instances?
> >
> > Thanks for the help.
> >
> > --
> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >
>
>


-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
