Did you start the app from scratch, ie, wipe out all state before you
restarted with 1.1? If not, reusing existing stores would overrule a
more balanced deployment.

You can set a new application.id or better use the reset tool to reset
the application completely (maybe just calling KafkaStreams#cleanup();
would do the trick, too for your case).

And yes, upgrading Streams API to 1.1 is fine -- no need to upgrade the
brokers.


-Matthias

On 2/7/18 3:16 PM, Russell Teabeault wrote:
> Bill,
> 
> I may be able to.
> 
> - What logging level?
> - Do you need logs from all the instances?
> - Where should I send them?
> 
> -russ
> 
> On Wed, Feb 7, 2018 at 4:12 PM, Bill Bejeck <b...@confluent.io> wrote:
> 
>> Russell,
>>
>> Can you share any log files?
>>
>> Thanks,
>> Bill
>>
>>
>>
>> On Wed, Feb 7, 2018 at 5:45 PM, Russell Teabeault <
>> rteabea...@twitter.com.invalid> wrote:
>>
>>> Hi Matthias,
>>>
>>> Thanks for the prompt reply. We have built the kafka-streams jar from the
>>> 1.1 branch and deployed our instances. We are only able to upgrade the
>>> Kafka Streams to 1.1
>>> and can not upgrade to 1.1 for the brokers. I don't think that should
>>> matter though. Yes?
>>>
>>> It does not seem to have helped. We currently have 25 instances with 4
>>> threads/instance. Our topology has two topics in it, each having 100
>>> partitions. The input topic feeds into a filtering step that uses an
>>> in-memory store and that is output via groupBy to an intermediate topic.
>>> The intermediate topic then feeds into an aggregation step which uses the
>>> rocksDB store. So we can see that we have 200 tasks total. After
>> switching
>>> to 1.1 the task assignments are still wildly uneven. Some instances only
>>> have tasks from one of the topics. Furthermore, the instances keep dying
>>> due to org.apache.kafka.common.errors.NotLeaderForPartitionException:
>> This
>>> server is not the leader for that topic-partition.
>>>
>>> Is there something else we need to do to make this updated task
>> assignment
>>> work?
>>>
>>> Thanks!
>>> -russ
>>>
>>>
>>>
>>> On Wed, Feb 7, 2018 at 12:33 PM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> It's a know issue and we addressed it already via
>>>> https://issues.apache.org/jira/browse/KAFKA-4969
>>>>
>>>> The fix will be part of upcoming 1.1 release, but you could try it out
>>>> immediately running from trunk or 1.0 branch. (If you do, feedback
>> would
>>>> be very welcome :))
>>>>
>>>> Your proposed workarounds should work. I cannot come up with anything
>>>> else you could do, because the task assignment cannot be influenced.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 2/7/18 10:37 AM, Russell Teabeault wrote:
>>>>> We are using Kafka Streams for a project and had some questions about
>>> how
>>>>> stream tasks are assigned.
>>>>>
>>>>> streamBuilder
>>>>>   .stream("inbound-topic", Consumed.`with`(keySerde, valueSerde))
>>>>>   ... // Do some stuff here
>>>>>
>>>>>   .through("intermediate-topic")
>>>>>   ... // Do some other stuff here
>>>>>
>>>>> In this example we are streaming from "inbound-topic" and then doing
>>> some
>>>>> work before writing the results back out to "intermediate-topic".
>>>>> Then we are reading in from "intermediate-topic" and doing some more
>>>> work.
>>>>> If both of these topics contain 100 partitions (200 partitions total)
>>>> and I
>>>>> create 10 instances of my application then
>>>>> what I observe is that there are a total of 20 partitions assigned to
>>>> each
>>>>> instance. But the distribution of these partitions across the two
>>> topics
>>>> is
>>>>> not even. For example, one
>>>>> instance may have 7 partitions from "inbound-topic" and 13 partitions
>>>> from
>>>>> "intermediate-topic". I would have hoped that each instance would
>> have
>>> 10
>>>>> partitions from each
>>>>> topic. Because of this uneven distribution it can make the resource
>>>>> characteristics from instance to instance very different.
>>>>>
>>>>> In a more concrete example we are reading from an input topic, then
>>> using
>>>>> an in-memory store to do some filtering, followed by a groupBy, and
>>>> finally
>>>>> doing an aggregate.
>>>>> This results in two topics; the input topic and then the internally
>>>> created
>>>>> intermediate topic written to by the groupBy and read from by the
>>>>> aggregation. What we see is that some
>>>>> instances are assigned far more partitions/tasks that are using the
>>>>> in-memory store and some instances that have very few and sometimes
>> no
>>>>> tasks that use the in-memory store. This leads to wildly
>>>>> different memory usage patterns across the instances. In turn this
>>> leads
>>>> us
>>>>> to set our memory much higher than needed if the partitions from each
>>>> topic
>>>>> were equally distributed across the instances.
>>>>>
>>>>> The two ways we have figured out how to deal with this problem are:
>>>>> 1. Use a new StreamBuilder anytime an intermediate topic is being
>> read
>>>> from
>>>>> in the application.
>>>>> 2. Break the topology into separate applications across the boundary
>> of
>>>> an
>>>>> intermediate topic.
>>>>>
>>>>> Neither of these seem like great solutions. So I would like to know:
>>>>>
>>>>> 1. Is this expected behavior?
>>>>> 2. Is there some technique to get equal distribution of
>> task/partition
>>>>> assignments across instances?
>>>>>
>>>>> Thanks for the help.
>>>>>
>>>>> --
>>>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> --
>>> Russell Teabeault | Senior Software Engineer | Twitter | @rusticules
>>>
>>
> 
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to