Russell,

INFO level is fine, and it can be just the portion of the logs from right
after Streams has finished rebalancing.

You can tar them up and attach them to a reply on this mailing list, unless
you'd prefer not to, in which case I can send you my email address directly.

Thanks,
Bill

On Wed, Feb 7, 2018 at 6:16 PM, Russell Teabeault <
rteabea...@twitter.com.invalid> wrote:

> Bill,
>
> I may be able to.
>
> - What logging level?
> - Do you need logs from all the instances?
> - Where should I send them?
>
> -russ
>
> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:
>
> > Russell,
> >
> > Can you share any log files?
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> > rteabea...@twitter.com.invalid> wrote:
> >
> > > Hi Matthias,
> > >
> > > > Thanks for the prompt reply. We have built the kafka-streams jar from
> > > > the 1.1 branch and deployed our instances. We are only able to upgrade
> > > > Kafka Streams to 1.1 and cannot upgrade the brokers to 1.1. I don't
> > > > think that should matter though. Yes?
> > >
> > > > It does not seem to have helped. We currently have 25 instances with 4
> > > > threads/instance. Our topology has two topics in it, each having 100
> > > > partitions. The input topic feeds into a filtering step that uses an
> > > > in-memory store, and that is output via groupBy to an intermediate
> > > > topic. The intermediate topic then feeds into an aggregation step, which
> > > > uses the RocksDB store. So we can see that we have 200 tasks total.
> > > > After switching to 1.1 the task assignments are still wildly uneven.
> > > > Some instances only have tasks from one of the topics. Furthermore, the
> > > > instances keep dying due to
> > > > org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> > > > server is not the leader for that topic-partition.
> > >
> > > > Is there something else we need to do to make this updated task
> > > > assignment work?
> > >
> > > Thanks!
> > > -russ
> > >
> > >
> > >
> > > > On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > >
> > > > > It's a known issue and we addressed it already via
> > > > https://issues.apache.org/jira/browse/KAFKA-4969
> > > >
> > > > > The fix will be part of the upcoming 1.1 release, but you could try
> > > > > it out immediately by running from trunk or the 1.1 branch. (If you
> > > > > do, feedback would be very welcome :))
> > > >
> > > > Your proposed workarounds should work. I cannot come up with anything
> > > > else you could do, because the task assignment cannot be influenced.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/7/18 10:37 AM, Russell Teabeault wrote:
> > > > > > We are using Kafka Streams for a project and had some questions
> > > > > > about how stream tasks are assigned.
> > > > >
> > > > > streamBuilder
> > > > >   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> > > > >   ... // Do some stuff here
> > > > >
> > > > >   .through("intermediate-topic")
> > > > >   ... // Do some other stuff here
> > > > >
> > > > > > In this example we are streaming from "inbound-topic" and then
> > > > > > doing some work before writing the results back out to
> > > > > > "intermediate-topic". Then we are reading in from
> > > > > > "intermediate-topic" and doing some more work. If both of these
> > > > > > topics contain 100 partitions (200 partitions total) and I create
> > > > > > 10 instances of my application, then what I observe is that a
> > > > > > total of 20 partitions are assigned to each instance. But the
> > > > > > distribution of these partitions across the two topics is not
> > > > > > even. For example, one instance may have 7 partitions from
> > > > > > "inbound-topic" and 13 partitions from "intermediate-topic". I
> > > > > > would have hoped that each instance would have 10 partitions from
> > > > > > each topic. This uneven distribution can make the resource
> > > > > > characteristics very different from instance to instance.
> > > > >
> > > > > > In a more concrete example we are reading from an input topic,
> > > > > > then using an in-memory store to do some filtering, followed by a
> > > > > > groupBy, and finally doing an aggregate. This results in two
> > > > > > topics: the input topic and the internally created intermediate
> > > > > > topic written to by the groupBy and read from by the aggregation.
> > > > > > What we see is that some instances are assigned far more
> > > > > > partitions/tasks that use the in-memory store, while other
> > > > > > instances have very few and sometimes no tasks that use it. This
> > > > > > leads to wildly different memory usage patterns across the
> > > > > > instances. In turn this leads us to set our memory much higher
> > > > > > than would be needed if the partitions from each topic were
> > > > > > equally distributed across the instances.
> > > > >
> > > > > > The two ways we have figured out to deal with this problem are:
> > > > > > 1. Use a new StreamsBuilder anytime an intermediate topic is being
> > > > > >    read from in the application.
> > > > > > 2. Break the topology into separate applications across the
> > > > > >    boundary of an intermediate topic.
> > > > >
> > > > > > Neither of these seems like a great solution. So I would like to
> > > > > > know:
> > > > > >
> > > > > > 1. Is this expected behavior?
> > > > > > 2. Is there some technique to get equal distribution of
> > > > > >    task/partition assignments across instances?
> > > > >
> > > > > Thanks for the help.
> > > > >
> > > > > --
> > > > > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > --
> > > Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> > >
> >
>
>
>
> --
> --
> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>
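
The even split hoped for in the original question (two topics with 100 partitions each, 10 instances, 10 partitions from each topic per instance) can be sketched as below. This is only an illustration under stated assumptions: it assumes each topic maps to its own sub-topology, and the names TaskBalanceSketch, Task, assign and countsPerInstance are hypothetical. It is not the Kafka Streams assignor, nor the change made in KAFKA-4969.

```scala
// Hypothetical illustration only; this is NOT the Kafka Streams task assignor.
object TaskBalanceSketch {
  // A task identified by (subTopology, partition). Assumption: sub-topology 0
  // reads "inbound-topic" and sub-topology 1 reads "intermediate-topic".
  final case class Task(subTopology: Int, partition: Int)

  val instances = 10
  val partitionsPerTopic = 100

  // 2 sub-topologies x 100 partitions = 200 tasks in total.
  val tasks: Seq[Task] =
    for {
      sub <- 0 until 2
      p   <- 0 until partitionsPerTopic
    } yield Task(sub, p)

  // Round-robin within each sub-topology: partition p of every sub-topology
  // goes to instance (p % instances), so each instance receives the same
  // number of tasks from each topic.
  def assign(tasks: Seq[Task], instances: Int): Map[Int, Seq[Task]] =
    tasks.groupBy(t => t.partition % instances)

  // For each instance, count how many tasks it holds per sub-topology.
  def countsPerInstance(assignment: Map[Int, Seq[Task]]): Map[Int, Map[Int, Int]] =
    assignment.map { case (instance, ts) =>
      instance -> ts.groupBy(_.subTopology).map { case (sub, group) => sub -> group.size }
    }

  def main(args: Array[String]): Unit = {
    val counts = countsPerInstance(assign(tasks, instances))
    counts.toSeq.sortBy(_._1).foreach { case (instance, bySub) =>
      println(s"instance $instance: inbound=${bySub.getOrElse(0, 0)} " +
        s"intermediate=${bySub.getOrElse(1, 0)}")
    }
  }
}
```

Under these assumptions every instance ends up with 10 tasks per topic, which is the distribution the original question was hoping for rather than the 7/13 split that was observed.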
