Matthias,

Yes. We used the reset tool before deploying with 1.1.

-russ

On Wed, Feb 7, 2018 at 4:23 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Did you start the app from scratch, ie, wipe out all state before you
> restarted with 1.1? If not, reusing existing stores would overrule a
> more balanced deployment.
>
> You can set a new application.id or better use the reset tool to reset
> the application completely (maybe just calling KafkaStreams#cleanup();
> would do the trick, too for your case).
>
> And yes, upgrading Streams API to 1.1 is fine -- no need to upgrade the
> brokers.
>
>
> -Matthias
>
> On 2/7/18 3:16 PM, Russell Teabeault wrote:
> > Bill,
> >
> > I may be able to.
> >
> > - What logging level?
> > - Do you need logs from all the instances?
> > - Where should I send them?
> >
> > -russ
> >
> > On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:
> >
> >> Russell,
> >>
> >> Can you share any log files?
> >>
> >> Thanks,
> >> Bill
> >>
> >>
> >>
> >> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
> >> rteabea...@twitter.com.invalid> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> Thanks for the prompt reply. We have built the kafka-streams jar from
> the
> >>> 1.1 branch and deployed our instances. We are only able to upgrade the
> >>> Kafka Streams to 1.1
> >>> and can not upgrade to 1.1 for the brokers. I don't think that should
> >>> matter though. Yes?
> >>>
> >>> It does not seem to have helped. We currently have 25 instances with 4
> >>> threads/instance. Our topology has two topics in it, each having 100
> >>> partitions. The input topic feeds into a filtering step that uses an
> >>> in-memory store and that is output via groupBy to an intermediate
> topic.
> >>> The intermediate topic then feeds into an aggregation step which uses
> the
> >>> rocksDB store. So we can see that we have 200 tasks total. After
> >> switching
> >>> to 1.1 the task assignments are still wildly uneven. Some instances
> only
> >>> have tasks from one of the topics. Furthermore, the instances keep
> dying
> >>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
> >> This
> >>> server is not the leader for that topic-partition.
> >>>
> >>> Is there something else we need to do to make this updated task
> >> assignment
> >>> work?
> >>>
> >>> Thanks!
> >>> -russ
> >>>
> >>>
> >>>
> >>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> It's a know issue and we addressed it already via
> >>>> https://issues.apache.org/jira/browse/KAFKA-4969
> >>>>
> >>>> The fix will be part of upcoming 1.1 release, but you could try it out
> >>>> immediately running from trunk or 1.0 branch. (If you do, feedback
> >> would
> >>>> be very welcome :))
> >>>>
> >>>> Your proposed workarounds should work. I cannot come up with anything
> >>>> else you could do, because the task assignment cannot be influenced.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
> >>>>> We are using Kafka Streams for a project and had some questions about
> >>> how
> >>>>> stream tasks are assigned.
> >>>>>
> >>>>> streamBuilder
> >>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
> >>>>>   ... // Do some stuff here
> >>>>>
> >>>>>   .through("intermediate-topic")
> >>>>>   ... // Do some other stuff here
> >>>>>
> >>>>> In this example we are streaming from "inbound-topic" and then doing
> >>> some
> >>>>> work before writing the results back out to "intermediate-topic".
> >>>>> Then we are reading in from "intermediate-topic" and doing some more
> >>>> work.
> >>>>> If both of these topics contain 100 partitions (200 partitions total)
> >>>> and I
> >>>>> create 10 instances of my application then
> >>>>> what I observe is that there are a total of 20 partitions assigned to
> >>>> each
> >>>>> instance. But the distribution of these partitions across the two
> >>> topics
> >>>> is
> >>>>> not even. For example, one
> >>>>> instance may have 7 partitions from "inbound-topic" and 13 partitions
> >>>> from
> >>>>> "intermediate-topic". I would have hoped that each instance would
> >> have
> >>> 10
> >>>>> partitions from each
> >>>>> topic. Because of this uneven distribution it can make the resource
> >>>>> characteristics from instance to instance very different.
> >>>>>
> >>>>> In a more concrete example we are reading from an input topic, then
> >>> using
> >>>>> an in-memory store to do some filtering, followed by a groupBy, and
> >>>> finally
> >>>>> doing an aggregate.
> >>>>> This results in two topics; the input topic and then the internally
> >>>> created
> >>>>> intermediate topic written to by the groupBy and read from by the
> >>>>> aggregation. What we see is that some
> >>>>> instances are assigned far more partitions/tasks that are using the
> >>>>> in-memory store and some instances that have very few and sometimes
> >> no
> >>>>> tasks that use the in-memory store. This leads to wildly
> >>>>> different memory usage patterns across the instances. In turn this
> >>> leads
> >>>> us
> >>>>> to set our memory much higher than needed if the partitions from each
> >>>> topic
> >>>>> were equally distributed across the instances.
> >>>>>
> >>>>> The two ways we have figured out how to deal with this problem are:
> >>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
> >> read
> >>>> from
> >>>>> in the application.
> >>>>> 2. Break the topology into separate applications across the boundary
> >> of
> >>>> an
> >>>>> intermediate topic.
> >>>>>
> >>>>> Neither of these seem like great solutions. So I would like to know:
> >>>>>
> >>>>> 1. Is this expected behavior?
> >>>>> 2. Is there some technique to get equal distribution of
> >> task/partition
> >>>>> assignments across instances?
> >>>>>
> >>>>> Thanks for the help.
> >>>>>
> >>>>> --
> >>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>> --
> >>> --
> >>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
> >>>
> >>
> >
> >
> >
>
>


-- 
-- 
Russell Teabeault | Senior Software Engineer | Twitter | @rusticules

Reply via email to