Bill,

I may be able to.

- What logging level?
- Do you need logs from all the instances?
- Where should I send them?

-russ

On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:

> Russell,
>
> Can you share any log files?
>
> Thanks,
> Bill
>
>
>
> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> rteabea...@twitter.com.invalid> wrote:
>
> > Hi Matthias,
> >
> > Thanks for the prompt reply. We have built the kafka-streams jar from the
> > 1.1 branch and deployed our instances. We are only able to upgrade Kafka
> > Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't think that
> > should matter though. Yes?
> >
> > It does not seem to have helped. We currently have 25 instances with 4
> > threads/instance. Our topology has two topics in it, each having 100
> > partitions. The input topic feeds into a filtering step that uses an
> > in-memory store, and its output goes via groupBy to an intermediate
> > topic. The intermediate topic then feeds into an aggregation step that
> > uses the RocksDB store. So we can see that we have 200 tasks total.
> > After switching to 1.1 the task assignments are still wildly uneven.
> > Some instances only have tasks from one of the topics. Furthermore, the
> > instances keep dying due to
> > org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> > server is not the leader for that topic-partition.
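> >
> > For reference, the topology has roughly the following shape (a minimal
> > sketch with placeholder names and trivial stand-in logic, not our real
> > code; assumes Scala 2.12 SAM conversion for the Java lambdas):
> >
> > import org.apache.kafka.common.serialization.Serdes
> > import org.apache.kafka.common.utils.Bytes
> > import org.apache.kafka.streams.StreamsBuilder
> > import org.apache.kafka.streams.kstream.{Consumed, Materialized, Serialized}
> > import org.apache.kafka.streams.state.KeyValueStore
> >
> > val keySerde = Serdes.String()
> > val valueSerde = Serdes.String()
> > def notSeenBefore(key: String): Boolean = true // stand-in for the in-memory-store lookup
> > def groupKeyOf(value: String): String = value  // stand-in for the real grouping key
> >
> > val builder = new StreamsBuilder
> > builder
> >   .stream("input-topic", Consumed.`with`(keySerde, valueSerde))
> >   // filtering step backed by the in-memory store
> >   .filter((key: String, value: String) => notSeenBefore(key))
> >   // groupBy repartitions through an internally created intermediate topic
> >   .groupBy((key: String, value: String) => groupKeyOf(value),
> >     Serialized.`with`(keySerde, valueSerde))
> >   // the aggregation step is materialized in a RocksDB-backed store
> >   .aggregate(
> >     () => "",
> >     (key: String, value: String, agg: String) => agg + value,
> >     Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("agg-store"))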
> >
> > Is there something else we need to do to make this updated task
> > assignment work?
> >
> > Thanks!
> > -russ
> >
> >
> >
> > On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > It's a known issue and we addressed it already via
> > > https://issues.apache.org/jira/browse/KAFKA-4969
> > >
> > > The fix will be part of the upcoming 1.1 release, but you could try it
> > > out immediately running from trunk or the 1.0 branch. (If you do,
> > > feedback would be very welcome :))
> > >
> > > Your proposed workarounds should work. I cannot come up with anything
> > > else you could do, because the task assignment cannot be influenced.
> > >
> > >
> > > -Matthias
> > >
> > > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > > We are using Kafka Streams for a project and had some questions
> > > > about how stream tasks are assigned.
> > > >
> > > > streamBuilder
> > > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > >   ... // Do some stuff here
> > > >
> > > >   .through("intermediate-topic")
> > > >   ... // Do some other stuff here
> > > >
> > > > In this example we are streaming from "inbound-topic" and then doing
> > > > some work before writing the results back out to
> > > > "intermediate-topic". Then we are reading in from
> > > > "intermediate-topic" and doing some more work. If both of these
> > > > topics contain 100 partitions (200 partitions total) and I create 10
> > > > instances of my application, then what I observe is that a total of
> > > > 20 partitions are assigned to each instance. But the distribution of
> > > > these partitions across the two topics is not even. For example, one
> > > > instance may have 7 partitions from "inbound-topic" and 13 partitions
> > > > from "intermediate-topic". I would have hoped that each instance
> > > > would have 10 partitions from each topic. Because of this uneven
> > > > distribution, the resource characteristics from instance to instance
> > > > can be very different.
> > > >
> > > > In a more concrete example, we are reading from an input topic, then
> > > > using an in-memory store to do some filtering, followed by a groupBy,
> > > > and finally doing an aggregate. This results in two topics: the input
> > > > topic and the internally created intermediate topic, written to by
> > > > the groupBy and read from by the aggregation. What we see is that
> > > > some instances are assigned far more partitions/tasks that use the
> > > > in-memory store, while other instances have very few, and sometimes
> > > > no, tasks that use the in-memory store. This leads to wildly
> > > > different memory usage patterns across the instances. In turn this
> > > > leads us to set our memory much higher than would be needed if the
> > > > partitions from each topic were equally distributed across the
> > > > instances.
> > > >
> > > > The two ways we have figured out to deal with this problem are:
> > > > 1. Use a new StreamsBuilder anytime an intermediate topic is being
> > > > read from in the application (see the sketch after this list).
> > > > 2. Break the topology into separate applications across the boundary
> > > > of an intermediate topic.
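> > > >
> > > > To illustrate (1), here is a rough sketch (placeholder names and
> > > > trivial stand-in logic, not our real code); note that each builder
> > > > would need its own KafkaStreams instance and application.id, so that
> > > > the tasks for each topic get balanced independently. (2) is the same
> > > > split, just deployed as two separate applications:
> > > >
> > > > import java.util.Properties
> > > > import org.apache.kafka.common.serialization.Serdes
> > > > import org.apache.kafka.streams.kstream.{Consumed, Produced}
> > > > import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
> > > >
> > > > val keySerde = Serdes.String()
> > > > val valueSerde = Serdes.String()
> > > >
> > > > def props(appId: String): Properties = {
> > > >   val p = new Properties
> > > >   p.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
> > > >   p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> > > >   p
> > > > }
> > > >
> > > > // First half: inbound-topic -> (some work) -> intermediate-topic.
> > > > val upstream = new StreamsBuilder
> > > > upstream
> > > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > >   .mapValues((value: String) => value) // stand-in for the real work
> > > >   .to("intermediate-topic", Produced.`with`(keySerde, valueSerde))
> > > >
> > > > // Second half: intermediate-topic -> (more work), built separately
> > > > // so its tasks are assigned independently of the first half's tasks.
> > > > val downstream = new StreamsBuilder
> > > > downstream
> > > >   .stream("intermediate-topic", Consumed.`with`(keySerde, valueSerde))
> > > >   .mapValues((value: String) => value) // stand-in for the real work
> > > >
> > > > new KafkaStreams(upstream.build(), props("my-app-upstream")).start()
> > > > new KafkaStreams(downstream.build(), props("my-app-downstream")).start()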
> > > >
> > > > Neither of these seems like a great solution. So I would like to
> > > > know:
> > > >
> > > > 1. Is this expected behavior?
> > > > 2. Is there some technique to get equal distribution of
> > > > task/partition assignments across instances?
> > > >
> > > > Thanks for the help.
> > > >
> > > > --
> > > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > > >
> > >
> > >
> >
> >
> > --
> > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >
>



--
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
