Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread Matthias J. Sax
Yes, absolutely.

I'll reply to the other thread.


-Matthias

On 3/21/18 8:45 PM, Richard Yu wrote:
> Hi Matthias,
> 
> Just wondering: once this KIP goes through, could I restart my older KIP
> to update SubscriptionInfo?
> 
> Thanks
> Richard
> 
> On Wed, Mar 21, 2018 at 11:18 AM, Matthias J. Sax 
> wrote:
> 
>> Thanks for following up, James.
>>
>>> Is this the procedure that happens during every rebalance? The reason I
>> ask is that this step:
>> As long as the leader (before or after upgrade) receives at least
>> one old version X Subscription, it always sends a version X Assignment back
>> (the encoded supported version is X before the leader is upgraded and Y
>> after the leader is upgraded).
>>
>> Yes, that would be the consequence.
>>
>>> This implies that the leader receives all Subscriptions before sending
>> back any responses. Is that what actually happens? Is it possible that it
>> would receive say 4 out of 5 Subscriptions of Y, send back a response Y,
>> and then later receive a Subscription X? What happens in that case? Would
>> that Subscription X then trigger another rebalance, and the whole thing
>> starts again?
>>
>> That sounds correct. A 'delayed' Subscription could always happen --
>> even before KIP-268 -- and would trigger a new rebalance. In this
>> regard, the behavior does not change. The difference is that we would
>> automatically downgrade the Assignment from Y to X again -- but the
>> application would not fail (as it would before the KIP).
>>
>> Do you see an issue with this behavior? The idea of the design is to
>> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
>> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
>> upgrade from X to Y and downgrade from Y to X in the second rebalance
>> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
>> would upgrade to Y again.
>>
>> Thus, as long as not all instances are on the newest version,
>> upgrades/downgrades of the exchanged rebalance metadata could happen
>> multiple times. However, this should not be an issue from my understanding.
>>
>>
>> Let us know what you think about it.
>>
>>
>> -Matthias
>>
>>
>> On 3/20/18 11:10 PM, James Cheng wrote:
>>> Sorry, I see that the VOTE started already, but I have a late question
>> on this KIP.
>>>
>>> In the "version probing" protocol:
 Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
 On startup/rolling-bounce, an instance does not know what version the
>> leader understands and (optimistically) sends a Subscription with the
>> latest version Y
 (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the
>> corresponding instance that sent the newer Subscription it does not
>> understand. The Assignment metadata only encodes both version numbers
>> (used-version == supported-version) as leader's supported-version X.
 For all other instances the leader sends a regular Assignment in
>> version X back.
 If an upgraded follower sends a new version Y Subscription and
>> receives a version X Assignment with "supported-version = X", it can
>> downgrade to X (in-memory flag) and resend a new Subscription with the old
>> version X to retry joining the group. To force an immediate second
>> rebalance, the follower does an "unsubscribe()/subscribe()/poll()"
>> sequence.
 As long as the leader (before or after upgrade) receives at least one
>> old version X Subscription, it always sends a version X Assignment back (the
>> encoded supported version is X before the leader is upgraded and Y after the
>> leader is upgraded).
 If an upgraded instance receives an Assignment it always checks the
>> leader's supported-version and updates its downgraded "used-version" if
>> possible
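
To make the leader's rule concrete, here is a minimal sketch of the
"answer in the lowest received version" logic (hypothetical names and
types for illustration only; the actual logic lives in Kafka Streams'
StreamsPartitionAssignor):

    import java.util.Collections;
    import java.util.List;

    final class LeaderVersionChoiceSketch {
        // The leader answers in the smallest metadata version it received,
        // so every member, including not-yet-upgraded ones, can decode its
        // Assignment. leaderSupportedVersion is Y once the leader is upgraded.
        static int chooseAssignmentVersion(List<Integer> receivedUsedVersions,
                                           int leaderSupportedVersion) {
            int minReceived = Collections.min(receivedUsedVersions);
            return Math.min(minReceived, leaderSupportedVersion);
        }
    }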
>>>
>>> Is this the procedure that happens during every rebalance? The reason I
>> ask is that this step:
> As long as the leader (before or after upgrade) receives at least one
>> old version X Subscription, it always sends a version X Assignment back (the
>> encoded supported version is X before the leader is upgraded and Y after the
>> leader is upgraded).
>>>
>>> This implies that the leader receives all Subscriptions before sending
>> back any responses. Is that what actually happens? Is it possible that it
>> would receive say 4 out of 5 Subscriptions of Y, send back a response Y,
>> and then later receive a Subscription X? What happens in that case? Would
>> that Subscription X then trigger another rebalance, and the whole thing
>> starts again?
>>>
>>> Thanks,
>>> -James
>>>
 On Mar 19, 2018, at 5:04 PM, Matthias J. Sax 
>> wrote:

 Guozhang,

 thanks for your comments.

 2: I think my main concern is that 1.2 would be a "special" release that
 everybody needs to use to upgrade. As an alternative, we could say that
 we add the config in 1.2 and keep it for 2 additional releases (1.3 and
 1.4) but remove it in 1.5.

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread Matthias J. Sax
Sure.

Glad you like this KIP. (KIP-258 will be more difficult... It seems that
it was a good decision to split it into two).

-Matthias


On 3/22/18 11:35 AM, James Cheng wrote:
> 
> 
>> On Mar 21, 2018, at 11:45 PM, Matthias J. Sax  wrote:
>>
>> Yes, it only affects the metadata. KIP-268 targets metadata upgrade
>> without store upgrade.
>>
>> We can discuss store upgrade further in KIP-258: I think in general, the
>> upgrade/downgrade behavior might be an issue for upgrading stores.
>> However, this upgrade/downgrade can only happen when upgrading from 1.2
>> to a future version. Thus, it won't affect an upgrade to 1.2.
>>
>> For an upgrade to 1.2, we introduce the "upgrade.from" parameter
>> (because we don't have "version probing" for 1.1 yet) and this ensures
>> that upgrading cannot happen "too early", and no downgrade can happen
>> either for this case.
>>
>> Let me know what you think.
>>
> 
> I think yes, we can discuss upgrade/downgrade issues (to versions after 1.2) 
> in the other KIP (KIP-258).
> 
> However, this KIP-268 looks fine. It gives us the mechanism to properly 
> detect and automatically upgrade/downgrade the topology and allows the 
> new/old code to co-exist within a topology, which is something we didn't have 
> before.
> 
> KIP-268 looks good to me.
> 
> Thanks for all the answers to my questions.
> 
> -James
> 
>>
>> -Matthias
>>
>> On 3/21/18 11:16 PM, James Cheng wrote:
>>>
>>>
>>>
 On Mar 21, 2018, at 11:18 AM, Matthias J. Sax  
 wrote:

 Thanks for following up, James.

> Is this the procedure that happens during every rebalance? The reason I 
> ask is that this step:
 As long as the leader (before or after upgrade) receives at least
 one old version X Subscription, it always sends a version X Assignment back
 (the encoded supported version is X before the leader is upgraded and Y
 after the leader is upgraded).

 Yes, that would be the consequence.

> This implies that the leader receives all Subscriptions before sending 
> back any responses. Is that what actually happens? Is it possible that it 
> would receive say 4 out of 5 Subscriptions of Y, send back a response Y, 
> and then later receive a Subscription X? What happens in that case? Would 
> that Subscription X then trigger another rebalance, and the whole thing 
> starts again?

 That sounds correct. A 'delayed' Subscription could always happen --
 even before KIP-268 -- and would trigger a new rebalance. In this
 regard, the behavior does not change. The difference is that we would
 automatically downgrade the Assignment from Y to X again -- but the
 application would not fail (as it would before the KIP).

 Do you see an issue with this behavior? The idea of the design is to
 make Kafka Streams robust against those scenarios. Thus, if 4 apps are
 upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
 upgrade from X to Y and downgrade from Y to X in the second rebalance
 when no.5 joins the group. If no.5 gets upgraded, a third rebalance
 would upgrade to Y again.

>>>
>>> Sounds good. 
>>>
>>>
 Thus, as long as not all instances are on the newest version,
 upgrades/downgrades of the exchanged rebalance metadata could happen
 multiple times. However, this should not be an issue from my understanding.
>>>
>>> About “this should not be an issue”: this upgrade/downgrade is just about 
>>> the rebalance metadata, right? Are there other associated things that will 
>>> also have to upgrade/downgrade in sync with the rebalance metadata? For 
>>> example, the idea for this KIP originally came up during the discussion 
>>> about adding timestamps to RocksDB state stores, which required updating the 
>>> on-disk schema. In the case of an updated RocksDB state store but with a 
>>> downgraded rebalance metadata... that should work, right? Because we still 
>>> have updated code (which understands the on-disk format) but it simply 
>>> gets its partition assignments via the downgraded rebalance metadata?
>>>
>>> Thanks,
>>> -James
>>>
>>> Sent from my iPhone
>>>
 Let us know what you think about it.


 -Matthias


> On 3/20/18 11:10 PM, James Cheng wrote:
> Sorry, I see that the VOTE started already, but I have a late question on 
> this KIP.
>
> In the "version probing" protocol:
>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>> On startup/rolling-bounce, an instance does not know what version the 
>> leader understands and (optimistically) sends a Subscription with the 
>> latest version Y
>> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
>> corresponding instance that sent the newer Subscription it does not 
>> understand. The Assignment metadata only encodes both version numbers 
>> (used-version == supported-version) as leader's supported-version X.

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread James Cheng


> On Mar 21, 2018, at 11:45 PM, Matthias J. Sax  wrote:
> 
> Yes, it only affects the metadata. KIP-268 targets metadata upgrade
> without store upgrade.
> 
> We can discuss store upgrade further in KIP-258: I think in general, the
> upgrade/downgrade behavior might be an issue for upgrading stores.
> However, this upgrade/downgrade can only happen when upgrading from 1.2
> to a future version. Thus, it won't affect an upgrade to 1.2.
> 
> For an upgrade to 1.2, we introduce the "upgrade.from" parameter
> (because we don't have "version probing" for 1.1 yet) and this ensures
> that upgrading cannot happen "too early", and no downgrade can happen
> either for this case.
> 
> Let me know what you think.
> 

I think yes, we can discuss upgrade/downgrade issues (to versions after 1.2) in 
the other KIP (KIP-258).

However, this KIP-268 looks fine. It gives us the mechanism to properly detect 
and automatically upgrade/downgrade the topology and allows the new/old code to 
co-exist within a topology, which is something we didn't have before.

KIP-268 looks good to me.

Thanks for all the answers to my questions.

-James

> 
> -Matthias
> 
> On 3/21/18 11:16 PM, James Cheng wrote:
>> 
>> 
>> 
>>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax  wrote:
>>> 
>>> Thanks for following up, James.
>>> 
 Is this the procedure that happens during every rebalance? The reason I 
 ask is that this step:
>>> As long as the leader (before or after upgrade) receives at least
>>> one old version X Subscription, it always sends a version X Assignment back
>>> (the encoded supported version is X before the leader is upgraded and Y
>>> after the leader is upgraded).
>>> 
>>> Yes, that would be the consequence.
>>> 
 This implies that the leader receives all Subscriptions before sending 
 back any responses. Is that what actually happens? Is it possible that it 
 would receive say 4 out of 5 Subscriptions of Y, send back a response Y, 
 and then later receive a Subscription X? What happens in that case? Would 
 that Subscription X then trigger another rebalance, and the whole thing 
 starts again?
>>> 
>>> That sounds correct. A 'delayed' Subscription could always happen --
>>> even before KIP-268 -- and would trigger a new rebalance. In this
>>> regard, the behavior does not change. The difference is that we would
>>> automatically downgrade the Assignment from Y to X again -- but the
>>> application would not fail (as it would before the KIP).
>>> 
>>> Do you see an issue with this behavior? The idea of the design is to
>>> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
>>> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
>>> upgrade from X to Y and downgrade from Y to X in the second rebalance
>>> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
>>> would upgrade to Y again.
>>> 
>> 
>> Sounds good. 
>> 
>> 
>>> Thus, as long as not all instances are on the newest version,
>>> upgrades/downgrades of the exchanged rebalance metadata could happen
>>> multiple times. However, this should not be an issue from my understanding.
>> 
>> About “this should not be an issue”: this upgrade/downgrade is just about 
>> the rebalance metadata, right? Are there other associated things that will 
>> also have to upgrade/downgrade in sync with the rebalance metadata? For 
>> example, the idea for this KIP originally came up during the discussion 
>> about adding timestamps to RocksDB state stores, which required updating the 
>> on-disk schema. In the case of an updated RocksDB state store but with a 
>> downgraded rebalance metadata... that should work, right? Because we still 
>> have updated code (which understands the on-disk format) but it simply 
>> gets its partition assignments via the downgraded rebalance metadata?
>> 
>> Thanks,
>> -James
>> 
>> Sent from my iPhone
>> 
>>> Let us know what you think about it.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
 On 3/20/18 11:10 PM, James Cheng wrote:
 Sorry, I see that the VOTE started already, but I have a late question on 
 this KIP.
 
 In the "version probing" protocol:
> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
> On startup/rolling-bounce, an instance does not know what version the 
> leader understands and (optimistically) sends a Subscription with the 
> latest version Y
> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
> corresponding instance that sent the newer Subscription it does not 
> understand. The Assignment metadata only encodes both version numbers 
> (used-version == supported-version) as leader's supported-version X.
> For all other instances the leader sends a regular Assignment in version 
> X back.
> If an upgraded follower sends a new version Y Subscription and 
> receives a version X Assignment with "supported-version = X", it can 
> downgrade to X (in-memory flag) and resend a new Subscription with the old 
> version X to retry joining the group.

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread 孙振亚
So expected!
> On Mar 22, 2018, at 2:16 PM, James Cheng wrote:
> 
> 
> 
> 
>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax  wrote:
>> 
>> Thanks for following up, James.
>> 
>>> Is this the procedure that happens during every rebalance? The reason I ask 
>>> is that this step:
>> As long as the leader (before or after upgrade) receives at least
>> one old version X Subscription, it always sends a version X Assignment back
>> (the encoded supported version is X before the leader is upgraded and Y
>> after the leader is upgraded).
>> 
>> Yes, that would be the consequence.
>> 
>>> This implies that the leader receives all Subscriptions before sending back 
>>> any responses. Is that what actually happens? Is it possible that it would 
>>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
>>> later receive a Subscription X? What happens in that case? Would that 
>>> Subscription X then trigger another rebalance, and the whole thing starts 
>>> again?
>> 
>> That sounds correct. A 'delayed' Subscription could always happen --
>> even before KIP-268 -- and would trigger a new rebalance. In this
>> regard, the behavior does not change. The difference is that we would
>> automatically downgrade the Assignment from Y to X again -- but the
>> application would not fail (as it would before the KIP).
>> 
>> Do you see an issue with this behavior? The idea of the design is to
>> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
>> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
>> upgrade from X to Y and downgrade from Y to X in the second rebalance
>> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
>> would upgrade to Y again.
>> 
> 
> Sounds good. 
> 
> 
>> Thus, as long as not all instances are on the newest version,
>> upgrades/downgrades of the exchanged rebalance metadata could happen
>> multiple times. However, this should not be an issue from my understanding.
> 
> About “this should not be an issue”: this upgrade/downgrade is just about the 
> rebalance metadata, right? Are there other associated things that will also 
> have to upgrade/downgrade in sync with the rebalance metadata? For example, 
> the idea for this KIP originally came up during the discussion about adding 
> timestamps to RocksDB state stores, which required updating the on-disk 
> schema. In the case of an updated RocksDB state store but with a downgraded 
> rebalance metadata... that should work, right? Because we still have updated 
> code (which understands the on-disk format) but it simply gets its 
> partition assignments via the downgraded rebalance metadata?
> 
> Thanks,
> -James
> 
> Sent from my iPhone
> 
>> Let us know what you think about it.
>> 
>> 
>> -Matthias
>> 
>> 
>>> On 3/20/18 11:10 PM, James Cheng wrote:
>>> Sorry, I see that the VOTE started already, but I have a late question on 
>>> this KIP.
>>> 
>>> In the "version probing" protocol:
 Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
 On startup/rolling-bounce, an instance does not know what version the 
 leader understands and (optimistically) sends a Subscription with the 
 latest version Y
 (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
 corresponding instance that sent the newer Subscription it does not 
 understand. The Assignment metadata only encodes both version numbers 
 (used-version == supported-version) as leader's supported-version X.
 For all other instances the leader sends a regular Assignment in version X 
 back.
 If an upgraded follower sends a new version Y Subscription and 
 receives a version X Assignment with "supported-version = X", it can 
 downgrade to X (in-memory flag) and resend a new Subscription with the old 
 version X to retry joining the group. To force an immediate second 
 rebalance, the follower does an "unsubscribe()/subscribe()/poll()" 
 sequence.
 As long as the leader (before or after upgrade) receives at least one old 
 version X Subscription, it always sends a version X Assignment back (the 
 encoded supported version is X before the leader is upgraded and Y after 
 the leader is upgraded).
 If an upgraded instance receives an Assignment it always checks the leader's 
 supported-version and updates its downgraded "used-version" if possible
>>> 
>>> Is this the procedure that happens during every rebalance? The reason I ask 
>>> is that this step:
> As long as the leader (before or after upgrade) receives at least one old 
> version X Subscription, it always sends a version X Assignment back (the 
> encoded supported version is X before the leader is upgraded and Y after 
> the leader is upgraded).
>>> 
>>> This implies that the leader receives all Subscriptions before sending back 
>>> any responses. Is that what actually happens? Is it possible that it 
>>> would receive say 4 out of 5 Subscriptions of Y, send back a response Y, 
>>> and then later receive a Subscription X?

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread Matthias J. Sax
Yes, it only affects the metadata. KIP-268 targets metadata upgrade
without store upgrade.

We can discuss store upgrade further in KIP-258: I think in general, the
upgrade/downgrade behavior might be an issue for upgrading stores.
However, this upgrade/downgrade can only happen when upgrading from 1.2
to a future version. Thus, it won't affect an upgrade to 1.2.

For an upgrade to 1.2, we introduce the "upgrade.from" parameter
(because we don't have "version probing" for 1.1 yet) and this ensures
that upgrading cannot happen "too early", and no downgrade can happen
either for this case.
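
To illustrate, the two rolling bounces could look roughly like this in
application code (a sketch that assumes "upgrade.from" is set as a plain
property; the application id, broker, and topic names are made up):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFromSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            // First rolling bounce only: the new code keeps sending the old
            // metadata version so not-yet-bounced members stay compatible.
            props.put("upgrade.from", "0.10.0");

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            // Second rolling bounce: restart every instance WITHOUT
            // "upgrade.from", so the group switches to the newest version.
        }
    }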

Let me know what you think.


-Matthias

On 3/21/18 11:16 PM, James Cheng wrote:
> 
> 
> 
>> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax  wrote:
>>
>> Thanks for following up, James.
>>
>>> Is this the procedure that happens during every rebalance? The reason I ask 
>>> is that this step:
>> As long as the leader (before or after upgrade) receives at least
>> one old version X Subscription, it always sends a version X Assignment back
>> (the encoded supported version is X before the leader is upgraded and Y
>> after the leader is upgraded).
>>
>> Yes, that would be the consequence.
>>
>>> This implies that the leader receives all Subscriptions before sending back 
>>> any responses. Is that what actually happens? Is it possible that it would 
>>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
>>> later receive a Subscription X? What happens in that case? Would that 
>>> Subscription X then trigger another rebalance, and the whole thing starts 
>>> again?
>>
>> That sounds correct. A 'delayed' Subscription could always happen --
>> even before KIP-268 -- and would trigger a new rebalance. In this
>> regard, the behavior does not change. The difference is that we would
>> automatically downgrade the Assignment from Y to X again -- but the
>> application would not fail (as it would before the KIP).
>>
>> Do you see an issue with this behavior? The idea of the design is to
>> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
>> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
>> upgrade from X to Y and downgrade from Y to X in the second rebalance
>> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
>> would upgrade to Y again.
>>
> 
> Sounds good. 
> 
> 
>> Thus, as long as not all instances are on the newest version,
>> upgrades/downgrades of the exchanged rebalance metadata could happen
>> multiple times. However, this should not be an issue from my understanding.
> 
> About “this should not be an issue”: this upgrade/downgrade is just about the 
> rebalance metadata, right? Are there other associated things that will also 
> have to upgrade/downgrade in sync with the rebalance metadata? For example, 
> the idea for this KIP originally came up during the discussion about adding 
> timestamps to RocksDB state stores, which required updating the on-disk 
> schema. In the case of an updated RocksDB state store but with a downgraded 
> rebalance metadata... that should work, right? Because we still have updated 
> code (which understands the on-disk format) but it simply gets its 
> partition assignments via the downgraded rebalance metadata?
> 
> Thanks,
> -James
> 
> Sent from my iPhone
> 
>> Let us know what you think about it.
>>
>>
>> -Matthias
>>
>>
>>> On 3/20/18 11:10 PM, James Cheng wrote:
>>> Sorry, I see that the VOTE started already, but I have a late question on 
>>> this KIP.
>>>
>>> In the "version probing" protocol:
 Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
 On startup/rolling-bounce, an instance does not know what version the 
 leader understands and (optimistically) sends a Subscription with the 
 latest version Y
 (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
 corresponding instance that sent the newer Subscription it does not 
 understand. The Assignment metadata only encodes both version numbers 
 (used-version == supported-version) as leader's supported-version X.
 For all other instances the leader sends a regular Assignment in version X 
 back.
 If an upgraded follower sends a new version Y Subscription and 
 receives a version X Assignment with "supported-version = X", it can 
 downgrade to X (in-memory flag) and resend a new Subscription with the old 
 version X to retry joining the group. To force an immediate second 
 rebalance, the follower does an "unsubscribe()/subscribe()/poll()" 
 sequence.
 As long as the leader (before or after upgrade) receives at least one old 
 version X Subscription, it always sends a version X Assignment back (the 
 encoded supported version is X before the leader is upgraded and Y after 
 the leader is upgraded).
 If an upgraded instance receives an Assignment it always checks the leader's 
 supported-version and updates its downgraded "used-version" if possible

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-22 Thread James Cheng



> On Mar 21, 2018, at 11:18 AM, Matthias J. Sax  wrote:
> 
> Thanks for following up, James.
> 
>> Is this the procedure that happens during every rebalance? The reason I ask 
>> is that this step:
> As long as the leader (before or after upgrade) receives at least
> one old version X Subscription, it always sends a version X Assignment back
> (the encoded supported version is X before the leader is upgraded and Y
> after the leader is upgraded).
> 
> Yes, that would be the consequence.
> 
>> This implies that the leader receives all Subscriptions before sending back 
>> any responses. Is that what actually happens? Is it possible that it would 
>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
>> later receive a Subscription X? What happens in that case? Would that 
>> Subscription X then trigger another rebalance, and the whole thing starts 
>> again?
> 
> That sounds correct. A 'delayed' Subscription could always happen --
> even before KIP-268 -- and would trigger a new rebalance. In this
> regard, the behavior does not change. The difference is that we would
> automatically downgrade the Assignment from Y to X again -- but the
> application would not fail (as it would before the KIP).
> 
> Do you see an issue with this behavior? The idea of the design is to
> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
> upgrade from X to Y and downgrade from Y to X in the second rebalance
> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
> would upgrade to Y again.
> 

Sounds good. 


> Thus, as long as not all instances are on the newest version,
> upgrades/downgrades of the exchanged rebalance metadata could happen
> multiple times. However, this should not be an issue from my understanding.

About “this should not be an issue”: this upgrade/downgrade is just about the 
rebalance metadata, right? Are there other associated things that will also 
have to upgrade/downgrade in sync with the rebalance metadata? For example, the 
idea for this KIP originally came up during the discussion about adding 
timestamps to RocksDB state stores, which required updating the on-disk schema. 
In the case of an updated RocksDB state store but with a downgraded rebalance 
metadata... that should work, right? Because we still have updated code (which 
understands the on-disk format) but it simply gets its partition 
assignments via the downgraded rebalance metadata?

Thanks,
-James

Sent from my iPhone

> Let us know what you think about it.
> 
> 
> -Matthias
> 
> 
>> On 3/20/18 11:10 PM, James Cheng wrote:
>> Sorry, I see that the VOTE started already, but I have a late question on 
>> this KIP.
>> 
>> In the "version probing" protocol:
>>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>>> On startup/rolling-bounce, an instance does not know what version the 
>>> leader understands and (optimistically) sends a Subscription with the 
>>> latest version Y
>>> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
>>> corresponding instance that sent the newer Subscription it does not 
>>> understand. The Assignment metadata only encodes both version numbers 
>>> (used-version == supported-version) as leader's supported-version X.
>>> For all other instances the leader sends a regular Assignment in version X 
>>> back.
>>> If an upgraded follower sends a new version Y Subscription and receives 
>>> a version X Assignment with "supported-version = X", it can downgrade to X 
>>> (in-memory flag) and resend a new Subscription with the old version X to retry 
>>> joining the group. To force an immediate second rebalance, the follower 
>>> does an "unsubscribe()/subscribe()/poll()" sequence.
>>> As long as the leader (before or after upgrade) receives at least one old 
>>> version X Subscription, it always sends a version X Assignment back (the 
>>> encoded supported version is X before the leader is upgraded and Y after the 
>>> leader is upgraded).
>>> If an upgraded instance receives an Assignment it always checks the leader's 
>>> supported-version and updates its downgraded "used-version" if possible
>> 
>> Is this the procedure that happens during every rebalance? The reason I ask 
>> is that this step:
 As long as the leader (before or after upgrade) receives at least one old 
 version X Subscription, it always sends a version X Assignment back (the 
 encoded supported version is X before the leader is upgraded and Y after 
 the leader is upgraded).
>> 
>> This implies that the leader receives all Subscriptions before sending back 
>> any responses. Is that what actually happens? Is it possible that it would 
>> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
>> later receive a Subscription X? What happens in that case? Would that 
>> Subscription X then trigger another rebalance, and the whole thing starts again?

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-21 Thread Richard Yu
Hi Matthias,

Just wondering: once this KIP goes through, could I restart my older KIP
to update SubscriptionInfo?

Thanks
Richard

On Wed, Mar 21, 2018 at 11:18 AM, Matthias J. Sax 
wrote:

> Thanks for following up, James.
>
> > Is this the procedure that happens during every rebalance? The reason I
> ask is that this step:
>  As long as the leader (before or after upgrade) receives at least
> one old version X Subscription, it always sends a version X Assignment back
> (the encoded supported version is X before the leader is upgraded and Y
> after the leader is upgraded).
>
> Yes, that would be the consequence.
>
> > This implies that the leader receives all Subscriptions before sending
> back any responses. Is that what actually happens? Is it possible that it
> would receive say 4 out of 5 Subscriptions of Y, send back a response Y,
> and then later receive a Subscription X? What happens in that case? Would
> that Subscription X then trigger another rebalance, and the whole thing
> starts again?
>
> That sounds correct. A 'delayed' Subscription could always happen --
> even before KIP-268 -- and would trigger a new rebalance. In this
> regard, the behavior does not change. The difference is that we would
> automatically downgrade the Assignment from Y to X again -- but the
> application would not fail (as it would before the KIP).
>
> Do you see an issue with this behavior? The idea of the design is to
> make Kafka Streams robust against those scenarios. Thus, if 4 apps are
> upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
> upgrade from X to Y and downgrade from Y to X in the second rebalance
> when no.5 joins the group. If no.5 gets upgraded, a third rebalance
> would upgrade to Y again.
>
> Thus, as long as not all instances are on the newest version,
> upgrades/downgrades of the exchanged rebalance metadata could happen
> multiple times. However, this should not be an issue from my understanding.
>
>
> Let us know what you think about it.
>
>
> -Matthias
>
>
> On 3/20/18 11:10 PM, James Cheng wrote:
> > Sorry, I see that the VOTE started already, but I have a late question
> on this KIP.
> >
> > In the "version probing" protocol:
> >> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
> >> On startup/rolling-bounce, an instance does not know what version the
> leader understands and (optimistically) sends a Subscription with the
> latest version Y
> >> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the
> corresponding instance that sent the newer Subscription it does not
> understand. The Assignment metadata only encodes both version numbers
> (used-version == supported-version) as leader's supported-version X.
> >> For all other instances the leader sends a regular Assignment in
> version X back.
> >> If an upgraded follower sends a new version Y Subscription and
> receives a version X Assignment with "supported-version = X", it can
> downgrade to X (in-memory flag) and resend a new Subscription with the old
> version X to retry joining the group. To force an immediate second
> rebalance, the follower does an "unsubscribe()/subscribe()/poll()"
> sequence.
> >> As long as the leader (before or after upgrade) receives at least one
> old version X Subscription, it always sends a version X Assignment back (the
> encoded supported version is X before the leader is upgraded and Y after the
> leader is upgraded).
> >> If an upgraded instance receives an Assignment it always checks the
> leader's supported-version and updates its downgraded "used-version" if
> possible
> >
> > Is this the procedure that happens during every rebalance? The reason I
> ask is that this step:
> >>> As long as the leader (before or after upgrade) receives at least one
> old version X Subscription, it always sends a version X Assignment back (the
> encoded supported version is X before the leader is upgraded and Y after the
> leader is upgraded).
> >
> > This implies that the leader receives all Subscriptions before sending
> back any responses. Is that what actually happens? Is it possible that it
> would receive say 4 out of 5 Subscriptions of Y, send back a response Y,
> and then later receive a Subscription X? What happens in that case? Would
> that Subscription X then trigger another rebalance, and the whole thing
> starts again?
> >
> > Thanks,
> > -James
> >
> >> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax 
> wrote:
> >>
> >> Guozhang,
> >>
> >> thanks for your comments.
> >>
> >> 2: I think my main concern is that 1.2 would be a "special" release that
> >> everybody needs to use to upgrade. As an alternative, we could say that
> >> we add the config in 1.2 and keep it for 2 additional releases (1.3 and
> >> 1.4) but remove it in 1.5. This gives users more flexibility and does
> >> not force users to upgrade to a specific version but also allows us
> >> to not carry the tech debt forever. WDYT about this? If users 

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-21 Thread Matthias J. Sax
Thanks for following up, James.

> Is this the procedure that happens during every rebalance? The reason I ask 
> is that this step:
 As long as the leader (before or after upgrade) receives at least
one old version X Subscription, it always sends a version X Assignment back
(the encoded supported version is X before the leader is upgraded and Y
after the leader is upgraded).

Yes, that would be the consequence.

> This implies that the leader receives all Subscriptions before sending back 
> any responses. Is that what actually happens? Is it possible that it would 
> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
> later receive a Subscription X? What happens in that case? Would that 
> Subscription X then trigger another rebalance, and the whole thing starts 
> again?

That sounds correct. A 'delayed' Subscription could always happen --
even before KIP-268 -- and would trigger a new rebalance. In this
regard, the behavior does not change. The difference is that we would
automatically downgrade the Assignment from Y to X again -- but the
application would not fail (as it would before the KIP).

Do you see an issue with this behavior? The idea of the design is to
make Kafka Streams robust against those scenarios. Thus, if 4 apps are
upgraded but no.5 is not yet and no.5 is late, Kafka Streams would first
upgrade from X to Y and downgrade from Y to X in the second rebalance
when no.5 joins the group. If no.5 gets upgraded, a third rebalance
would upgrade to Y again.

Thus, as long as not all instances are on the newest version,
upgrades/downgrades of the exchanged rebalance metadata could happen
multiple times. However, this should not be an issue from my understanding.
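
As a toy walk-through of that back-and-forth (made-up version numbers, not
Kafka code): with X = 2 and Y = 3, the group always settles on the minimum
version present among its members.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class VersionCycleWalkthrough {
        // The group's effective metadata version is the minimum across members.
        static int groupVersion(List<Integer> memberVersions) {
            return Collections.min(memberVersions);
        }

        public static void main(String[] args) {
            // Rebalance 1: members 1-4 upgraded to Y=3, no.5 not joined yet.
            System.out.println(groupVersion(Arrays.asList(3, 3, 3, 3)));    // 3
            // Rebalance 2: late member no.5 joins, still on X=2 -> downgrade.
            System.out.println(groupVersion(Arrays.asList(3, 3, 3, 3, 2))); // 2
            // Rebalance 3: no.5 gets upgraded -> upgrade to Y again.
            System.out.println(groupVersion(Arrays.asList(3, 3, 3, 3, 3))); // 3
        }
    }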


Let us know what you think about it.


-Matthias


On 3/20/18 11:10 PM, James Cheng wrote:
> Sorry, I see that the VOTE started already, but I have a late question on 
> this KIP.
> 
> In the "version probing" protocol:
>> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
>> On startup/rolling-bounce, an instance does not know what version the leader 
>> understands and (optimistically) sends a Subscription with the latest 
>> version Y
>> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
>> corresponding instance that sent the newer Subscription it does not 
>> understand. The Assignment metadata only encodes both version numbers 
>> (used-version == supported-version) as leader's supported-version X.
>> For all other instances the leader sends a regular Assignment in version X 
>> back.
>> If an upgraded follower sends a new version Y Subscription and receives 
>> a version X Assignment with "supported-version = X", it can downgrade to X 
>> (in-memory flag) and resend a new Subscription with the old version X to retry 
>> joining the group. To force an immediate second rebalance, the follower does 
>> an "unsubscribe()/subscribe()/poll()" sequence.
>> As long as the leader (before or after upgrade) receives at least one old 
>> version X Subscription, it always sends a version X Assignment back (the 
>> encoded supported version is X before the leader is upgraded and Y after 
>> leader is upgraded).
>> If an upgraded instance receives an Assignment it always checks the leader's 
>> supported-version and updates its downgraded "used-version" if possible
> 
> Is this the procedure that happens during every rebalance? The reason I ask 
> is that this step:
>>> As long as the leader (before or after upgrade) receives at least one old 
>>> version X Subscription, it always sends a version X Assignment back (the 
>>> encoded supported version is X before the leader is upgraded and Y after the 
>>> leader is upgraded).
> 
> This implies that the leader receives all Subscriptions before sending back 
> any responses. Is that what actually happens? Is it possible that it would 
> receive say 4 out of 5 Subscriptions of Y, send back a response Y, and then 
> later receive a Subscription X? What happens in that case? Would that 
> Subscription X then trigger another rebalance, and the whole thing starts 
> again?
> 
> Thanks,
> -James
> 
>> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax  wrote:
>>
>> Guozhang,
>>
>> thanks for your comments.
>>
>> 2: I think my main concern is that 1.2 would be a "special" release that
>> everybody needs to use to upgrade. As an alternative, we could say that
>> we add the config in 1.2 and keep it for 2 additional releases (1.3 and
>> 1.4) but remove it in 1.5. This gives users more flexibility and does
>> not force users to upgrade to a specific version but also allows us
>> to not carry the tech debt forever. WDYT about this? If users upgrade on
>> a regular basis, this approach could avoid a forced update with high
>> probability as they will upgrade to either 1.2/1.3/1.4 anyway at some
>> point. Thus, only if users don't upgrade for a very long time, they are
>> forced to do 2 upgrades with an intermediate version.

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-21 Thread James Cheng
Sorry, I see that the VOTE started already, but I have a late question on this 
KIP.

In the "version probing" protocol:
> Detailed upgrade protocol from metadata version X to Y (with X >= 1.2):
> On startup/rolling-bounce, an instance does not know what version the leader 
> understands and (optimistically) sends a Subscription with the latest 
> version Y
> (Old, i.e., not yet upgraded) Leader sends an empty Assignment back to the 
> corresponding instance that sent the newer Subscription it does not 
> understand. The Assignment metadata only encodes both version numbers 
> (used-version == supported-version) as leader's supported-version X.
> For all other instances the leader sends a regular Assignment in version X 
> back.
> If an upgraded follower sends a new version Y Subscription and receives 
> a version X Assignment with "supported-version = X", it can downgrade to X 
> (in-memory flag) and resend a new Subscription with the old version X to retry 
> joining the group. To force an immediate second rebalance, the follower does 
> an "unsubscribe()/subscribe()/poll()" sequence.
> As long as the leader (before or after upgrade) receives at least one old 
> version X Subscription, it always sends a version X Assignment back (the encoded 
> supported version is X before the leader is upgraded and Y after the leader is 
> upgraded).
> If an upgraded instance receives an Assignment it always checks the leader's 
> supported-version and updates its downgraded "used-version" if possible
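
The downgrade-and-retry step can be pictured with the plain consumer API (a
sketch only; inside Kafka Streams this happens in the StreamThread and the
assignor, not in user code, and the topic name is made up):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    final class ForcedRebalanceSketch {
        // Leave and immediately re-join the group so that the next JoinGroup
        // carries the downgraded (version X) Subscription.
        static void downgradeAndRejoin(KafkaConsumer<byte[], byte[]> consumer) {
            // The downgrade itself is just an in-memory flag on the assignor;
            // these three calls only force the immediate second rebalance.
            consumer.unsubscribe();
            consumer.subscribe(Collections.singletonList("input-topic"));
            consumer.poll(Duration.ZERO);
        }
    }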

Is this the procedure that happens during every rebalance? The reason I ask is 
that this step:
>> As long as the leader (before or after upgrade) receives at least one old 
>> version X Subscription, it always sends a version X Assignment back (the 
>> encoded supported version is X before the leader is upgraded and Y after the 
>> leader is upgraded).

This implies that the leader receives all Subscriptions before sending back any 
responses. Is that what actually happens? Is it possible that it would receive 
say 4 out of 5 Subscriptions of Y, send back a response Y, and then later 
receive a Subscription X? What happens in that case? Would that Subscription X 
then trigger another rebalance, and the whole thing starts again?

Thanks,
-James

> On Mar 19, 2018, at 5:04 PM, Matthias J. Sax  wrote:
> 
> Guozhang,
> 
> thanks for your comments.
> 
> 2: I think my main concern is that 1.2 would be a "special" release that
> everybody needs to use to upgrade. As an alternative, we could say that
> we add the config in 1.2 and keep it for 2 additional releases (1.3 and
> 1.4) but remove it in 1.5. This gives users more flexibility and does
> not force users to upgrade to a specific version but also allows us
> to not carry the tech debt forever. WDYT about this? If users upgrade on
> a regular basis, this approach could avoid a forced update with high
> probability as they will upgrade to either 1.2/1.3/1.4 anyway at some
> point. Thus, only if users don't upgrade for a very long time, they are
> forced to do 2 upgrades with an intermediate version.
> 
> 4. Updated the KIP to remove the ".x" suffix
> 
> 5. Updated the KIP accordingly.
> 
> -Matthias
> 
> On 3/19/18 10:33 AM, Guozhang Wang wrote:
>> Yup :)
>> 
>> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu  wrote:
>> 
>>> bq. some snippet like ProduceRequest / ProduceRequest
>>> 
>>> Did you mean ProduceRequest / Response ?
>>> 
>>> Cheers
>>> 
>>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:
>>> 
 Hi Matthias,
 
 About 2: yeah I guess this is a subjective preference. My main concern
 about keeping the config / handling code beyond 1.2 release is that it
>>> will
 become a non-cleanable tech debt forever, as fewer and fewer users would
 need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
 maintain this for nearly no one. On the other hand, I agree that this
>>> tech
 debt is not too large. So if more people feel this is a good tradeoff to
 pay for not enforcing users from older versions to upgrade twice I'm
>>> happy
 to change my opinion.
 
 A few more minor comments:
 
 4. For the values of "upgrade.from", could we simplify to only major.minor?
 I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
 behavior in bug fix releases we would not need to specify a bug-fix
>>> version
 to distinguish ever.
 
 5. Could you also present the encoding format in subscription /
>>> assignment
 metadata bytes in version 2, and in future versions (i.e. which first
>>> bytes
 would be kept moving forward), for readers to better understand the
 proposal? some snippet like ProduceRequest / ProduceRequest in
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
 would be very helpful.
 
 
 
 Guozhang
 
 
 On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax wrote:

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Matthias J. Sax
Guozhang,

thanks for your comments.

2: I think my main concern is that 1.2 would be a "special" release that
everybody needs to use to upgrade. As an alternative, we could say that
we add the config in 1.2 and keep it for 2 additional releases (1.3 and
1.4) but remove it in 1.5. This gives users more flexibility and does
not force users to upgrade to a specific version but also allows us
to not carry the tech debt forever. WDYT about this? If users upgrade on
a regular basis, this approach could avoid a forced update with high
probability as they will upgrade to either 1.2/1.3/1.4 anyway at some
point. Thus, only if users don't upgrade for a very long time, they are
forced to do 2 upgrades with an intermediate version.

4. Updated the KIP to remove the ".x" suffix

5. Updated the KIP accordingly.

-Matthias

On 3/19/18 10:33 AM, Guozhang Wang wrote:
> Yup :)
> 
> On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu  wrote:
> 
>> bq. some snippet like ProduceRequest / ProduceRequest
>>
>> Did you mean ProduceRequest / Response ?
>>
>> Cheers
>>
>> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:
>>
>>> Hi Matthias,
>>>
>>> About 2: yeah I guess this is a subjective preference. My main concern
>>> about keeping the config / handling code beyond 1.2 release is that it
>> will
>>> become a non-cleanable tech debt forever, as fewer and fewer users would
>>> need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
>>> maintain this for nearly no one. On the other hand, I agree that this
>> tech
>>> debt is not too large. So if more people feel this is a good tradeoff to
>>> pay for not enforcing users from older versions to upgrade twice I'm
>> happy
>>> to change my opinion.
>>>
>>> A few more minor comments:
>>>
>>> 4. For the values of "upgrade.from", could we simplify to only major.minor?
>>> I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
>>> behavior in bug fix releases we would not need to specify a bug-fix
>> version
>>> to distinguish ever.
>>>
>>> 5. Could you also present the encoding format in subscription /
>> assignment
>>> metadata bytes in version 2, and in future versions (i.e. which first
>> bytes
>>> would be kept moving forward), for readers to better understand the
>>> proposal? some snippet like ProduceRequest / ProduceRequest in
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>>> would be very helpful.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
>>> wrote:
>>>
 Thanks for your comments.

 1. Because the old leader cannot decode the new Subscription it can
>> only
 send an empty assignment back. The idea to send empty assignments to
>> all
 members is interesting. I will try this out in a PR to see how it
>>> behaves.

 2. I don't see an issue with keeping config `upgrade.from` for future
 releases. Personally, I would prefer to not force users to do two
 upgrades if they want to go from pre-1.2 to post-1.2 version. Is there
>> a
 technical argument why you want to get rid of the config? What
 disadvantages do you see keeping `upgrade.from` beyond 1.2 release?

 Keeping the config is just a few lines of code in `StreamsConfig` as
 well as a single `if` statement in `StreamsPartitionAssignor` to force
>> a
 downgrade (cf
 https://github.com/apache/kafka/pull/4636/files#diff-
 392371c29384e33bb09ed342e7696c68R201)


 3. I updated the KIP accordingly.


 -Matthias

 On 3/15/18 3:19 PM, Guozhang Wang wrote:
> Hello Matthias, thanks for the KIP. Here are some comments:
>
> 1. "For all other instances the leader sends a regular Assignment in
> version X back." Does that mean the leader will exclude any member of
>>> the
> group whose protocol version it does not understand? For
>> example,
>>> if
> we have A, B, C with A the leader, and B bounced with the newer
>>> version.
 In
> the first rebalance, A will only consider {A, C} for assignment while
> sending empty assignment to B. And then later when B downgrades will
>> it
> re-assign the tasks to it again? I felt this is unnecessarily
>>> increasing
> the num. rebalances and the total latency. Could the leader just
>> send
> empty assignment to everyone, and since upon receiving the empty
 assignment
> each thread will not create / restore any tasks and will not clean up
>>> its
> local state (so that the prevCachedTasks are not lost in future
 rebalances)
> and re-joins immediately, if users choose to bounce an instance once
>> it
 is
> in RUNNING state the total time of rolling upgrades will be reduced.
>
> 2. If we want to allow upgrading from 1.1- versions to any of the
>>> future
> versions beyond 1.2, then we'd always need to keep the 

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Guozhang Wang
Yup :)

On Mon, Mar 19, 2018 at 10:01 AM, Ted Yu  wrote:

> bq. some snippet like ProduceRequest / ProduceRequest
>
> Did you mean ProduceRequest / Response ?
>
> Cheers
>
> On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:
>
> > Hi Matthias,
> >
> > About 2: yeah I guess this is a subjective preference. My main concern
> > about keeping the config / handling code beyond 1.2 release is that it
> will
> > become a non-cleanable tech debt forever, as fewer and fewer users would
> > need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
> > maintain this for nearly no one. On the other hand, I agree that this
> tech
> > debt is not too large. So if more people feel this is a good tradeoff to
> > pay for not enforcing users from older versions to upgrade twice I'm
> happy
> > to change my opinion.
> >
> > A few more minor comments:
> >
> > 4. For the values of "upgrade.from", could we simplify to only major.minor?
> > I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
> > behavior in bug fix releases we would not need to specify a bug-fix
> version
> > to distinguish ever.
> >
> > 5. Could you also present the encoding format in subscription /
> assignment
> > metadata bytes in version 2, and in future versions (i.e. which first
> bytes
> > would be kept moving forward), for readers to better understand the
> > proposal? some snippet like ProduceRequest / ProduceRequest in
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > would be very helpful.
> >
> >
> >
> > Guozhang
> >
> >
> > On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
> > wrote:
> >
> > > Thanks for your comments.
> > >
> > > 1. Because the old leader cannot decode the new Subscription it can
> only
> > > send an empty assignment back. The idea to send empty assignments to
> all
> > > members is interesting. I will try this out in a PR to see how it
> > behaves.
> > >
> > > 2. I don't see an issue with keeping config `upgrade.from` for future
> > > releases. Personally, I would prefer to not force users to do two
> > > upgrades if they want to go from pre-1.2 to post-1.2 version. Is there
> a
> > > technical argument why you want to get rid of the config? What
> > > disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
> > >
> > > Keeping the config is just a few lines of code in `StreamsConfig` as
> > > well as a single `if` statement in `StreamsPartitionAssignor` to force
> a
> > > downgrade (cf
> > > https://github.com/apache/kafka/pull/4636/files#diff-
> > > 392371c29384e33bb09ed342e7696c68R201)
> > >
> > >
> > > 3. I updated the KIP accordingly.
> > >
> > >
> > > -Matthias
> > >
> > > On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > > > Hello Matthias, thanks for the KIP. Here are some comments:
> > > >
> > > > 1. "For all other instances the leader sends a regular Assignment in
> > > > version X back." Does that mean the leader will exclude any member of
> > the
> > > group whose protocol version it does not understand? For
> example,
> > if
> > > > we have A, B, C with A the leader, and B bounced with the newer
> > version.
> > > In
> > > > the first rebalance, A will only consider {A, C} for assignment while
> > > > sending empty assignment to B. And then later when B downgrades will
> it
> > > > re-assign the tasks to it again? I felt this is unnecessarily
> > increasing
> > > > the num. rebalances and the total latency. Could the leader just
> send
> > > > empty assignment to everyone, and since upon receiving the empty
> > > assignment
> > > > each thread will not create / restore any tasks and will not clean up
> > its
> > > > local state (so that the prevCachedTasks are not lost in future
> > > rebalances)
> > > > and re-joins immediately, if users choose to bounce an instance once
> it
> > > is
> > > > in RUNNING state the total time of rolling upgrades will be reduced.
> > > >
> > > > 2. If we want to allow upgrading from 1.1- versions to any of the
> > future
> > > > versions beyond 1.2, then we'd always need to keep the special
> handling
> > > > logic for this two rolling-bounce mechanism plus a config that we
> would
> > > > never be able to deprecate; on the other hand, if the version probing
> > > > procedure is fast, I think the extra operational cost from upgrading
> > from
> > > > 1.1- to a future version, to upgrading from 1.1- to 1.2, and then
> > another
> > > > upgrade from 1.2 to a future version could be small. So depending on
> > the
> > > > experimental result of the upgrade latency, I'd suggest considering
> the
> > > > trade-off of the extra code/config needed maintaining for the special
> > > > handling.
> > > >
> > > > 3. Testing plan: could you elaborate a bit more on the actual
> > > upgrade-paths
> > > > we should test? For example, I'm thinking the following:
> > > >
> > > > a. 0.10.0 -> 1.2
> > > > b. 1.1 -> 1.2
> > > 

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Ted Yu
bq. some snippet like ProduceRequest / ProduceRequest

Did you mean ProduceRequest / Response ?

Cheers

On Mon, Mar 19, 2018 at 9:51 AM, Guozhang Wang  wrote:

> Hi Matthias,
>
> About 2: yeah I guess this is a subjective preference. My main concern
> about keeping the config / handling code beyond 1.2 release is that it will
> become a non-cleanable tech debt forever, as fewer and fewer users would
> need to upgrade from 0.10.x and 1.1.x, and eventually we will need to
> maintain this for nearly no one. On the other hand, I agree that this tech
> debt is not too large. So if more people feel this is a good tradeoff to
> pay for not enforcing users from older versions to upgrade twice I'm happy
> to change my opinion.
>
> A few more minor comments:
>
> 4. For the values of "upgrade.from", could we simplify to only major.minor?
> I.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
> behavior in bug fix releases we would not need to specify a bug-fix version
> to distinguish ever.
>
> 5. Could you also present the encoding format in subscription / assignment
> metadata bytes in version 2, and in future versions (i.e. which first bytes
> would be kept moving forward), for readers to better understand the
> proposal? some snippet like ProduceRequest / ProduceRequest in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> would be very helpful.
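
For intuition, the compatibility trick is that the leading bytes never move
across versions, so even an old decoder can always read the version header
before deciding whether it understands the rest. An illustrative encoder (not
the KIP's actual wire format):

    import java.nio.ByteBuffer;

    final class SubscriptionInfoSketch {
        static ByteBuffer encode(int usedVersion, int supportedVersion,
                                 byte[] versionSpecificData) {
            ByteBuffer buf = ByteBuffer.allocate(8 + versionSpecificData.length);
            buf.putInt(usedVersion);        // first field, stable in all versions
            buf.putInt(supportedVersion);   // added by this KIP, stable going forward
            buf.put(versionSpecificData);   // remaining fields may change per version
            buf.rewind();
            return buf;
        }
    }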
>
>
>
> Guozhang
>
>
> On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
> wrote:
>
> > Thanks for your comments.
> >
> > 1. Because the old leader cannot decode the new Subscription it can only
> > send an empty assignment back. The idea to send empty assignments to all
> > members is interesting. I will try this out in a PR to see how it
> behaves.
> >
> > 2. I don't see an issue with keeping config `upgrade.from` for future
> > releases. Personally, I would prefer to not force users to do two
> > upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
> > technical argument why you want to get rid of the config? What
> > disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
> >
> > Keeping the config is just a few lines of code in `StreamsConfig`, as
> > well as a single `if` statement in `StreamsPartitionAssignor` to force a
> > downgrade (cf
> > https://github.com/apache/kafka/pull/4636/files#diff-
> > 392371c29384e33bb09ed342e7696c68R201)
> >
> >
> > 3. I updated the KIP accordingly.
> >
> >
> > -Matthias
> >
> > On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > > Hello Matthias, thanks for the KIP. Here are some comments:
> > >
> > > 1. "For all other instances the leader sends a regular Assignment in
> > > version X back." Does that mean the leader will exclude any member of
> the
> > > group whose protocol version that it does not understand? For example,
> if
> > > we have A, B, C with A the leader, and B bounced with the newer
> version.
> > In
> > > the first rebalance, A will only consider {A, C} for assignment while
> > > sending empty assignment to B. And then later when B downgrades will it
> > > re-assign the tasks to it again? I felt this is unnecessarily
> increasing
> > > the num. rebalances and the total latency. Could the leader just sends
> > > empty assignment to everyone, and since upon receiving the empty
> > assignment
> > > each thread will not create / restore any tasks and will not clean up
> its
> > > local state (so that the prevCachedTasks are not lost in future
> > rebalances)
> > > and re-joins immediately, if users choose to bounce an instance once it
> > is
> > > in RUNNING state the total time of rolling upgrades will be reduced.
> > >
> > > 2. If we want to allow upgrading from 1.1- versions to any of the
> > > future versions beyond 1.2, then we'd always need to keep the special
> > > handling logic for this two-rolling-bounce mechanism, plus a config
> > > that we would never be able to deprecate; on the other hand, if the
> > > version probing procedure is fast, I think the extra operational cost
> > > of upgrading from 1.1- to 1.2 and then from 1.2 to a future version,
> > > compared to upgrading from 1.1- to a future version directly, could
> > > be small. So depending on the experimental result of the upgrade
> > > latency, I'd suggest considering that trade-off against the extra
> > > code/config that needs to be maintained for the special handling.
> > >
> > > 3. Testing plan: could you elaborate a bit more on the actual
> > > upgrade-paths we should test? For example, I'm thinking the following:
> > >
> > > a. 0.10.0 -> 1.2
> > > b. 1.1 -> 1.2
> > > c. 1.2 -> 1.3 (simulated v4)
> > > d. 0.10.0 -> 1.3 (simulated v4)
> > > e. 1.1 -> 1.3 (simulated v4)
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I want to propose KIP-268 to allow rebalance metadata version upgrades
> > >> in Kafka Streams:
> > >>
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade

Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-19 Thread Guozhang Wang
Hi Matthias,

About 2: yeah, I guess this is a subjective preference. My main concern
about keeping the config / handling code beyond the 1.2 release is that it
will become non-cleanable tech debt forever, as fewer and fewer users would
need to upgrade from 0.10.x and 1.1.x, and eventually we would be
maintaining it for nearly no one. On the other hand, I agree that this tech
debt is not too large. So if more people feel this is a good trade-off to
pay for not forcing users on older versions to upgrade twice, I'm happy to
change my opinion.

A few more minor comments:

4. For the values of "upgrade.from", could we simplify to only major.minor,
i.e. "0.10.0" rather than "0.10.0.x"? Since we never changed compatibility
behavior in bug-fix releases, we would never need a bug-fix version to
distinguish releases.
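
To illustrate what I mean, a minimal sketch of the first rolling bounce
(the "upgrade.from" key and its values are the proposal under discussion
here, not a released API):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "upgrade-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // First rolling bounce: pin the rebalance metadata to the old
        // version, using only major.minor ("0.10.0") as suggested above.
        props.put("upgrade.from", "0.10.0");
        // Second rolling bounce: remove the line above so each instance
        // starts sending the latest metadata version again.
        System.out.println(props);
    }
}
```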

5. Could you also present the encoding format in subscription / assignment
metadata bytes in version 2, and in future versions (i.e. which first bytes
would be kept moving forward), for readers to better understand the
proposal? some snippet like ProduceRequest / ProduceRequest in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
would be very helpful.
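
To make the ask concrete, here is one possible shape of such an encoding
(an assumed layout for illustration only, not the KIP's final format):

```java
import java.nio.ByteBuffer;

// Sketch of a forward-compatible metadata layout (assumed, not final):
// the first 4 bytes always carry the used version; from this KIP on, the
// next 4 bytes carry the highest version the sender can decode. Everything
// after these fixed bytes is version-specific payload.
public class MetadataLayoutSketch {

    public static ByteBuffer encode(int usedVersion, int supportedVersion, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(8 + payload.length);
        buf.putInt(usedVersion);      // bytes 0-3: kept in all future versions
        buf.putInt(supportedVersion); // bytes 4-7: added by this proposal
        buf.put(payload);             // version-specific fields follow
        buf.flip();
        return buf;
    }

    // Even a leader on an older version can always read the first int and
    // thus learn that a Subscription is newer than it understands.
    public static int usedVersion(ByteBuffer metadata) {
        return metadata.getInt(0);
    }
}
```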



Guozhang


On Fri, Mar 16, 2018 at 2:58 PM, Matthias J. Sax 
wrote:

> Thanks for your comments.
>
> 1. Because the old leader cannot decode the new Subscription it can only
> send an empty assignment back. The idea to send empty assignments to all
> members is interesting. I will try this out in a PR to see how it behaves.
>
> 2. I don't see an issue with keeping config `upgrade.from` for future
> releases. Personally, I would prefer to not force users to do two
> upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
> technical argument why you want to get rid of the config? What
> disadvantages do you see keeping `upgrade.from` beyond 1.2 release?
>
> Keeping the config is just a few lines of code in `StreamsConfig`, as
> well as a single `if` statement in `StreamsPartitionAssignor` to force a
> downgrade (cf
> https://github.com/apache/kafka/pull/4636/files#diff-
> 392371c29384e33bb09ed342e7696c68R201)
>
>
> 3. I updated the KIP accordingly.
>
>
> -Matthias
>
> On 3/15/18 3:19 PM, Guozhang Wang wrote:
> > Hello Matthias, thanks for the KIP. Here are some comments:
> >
> > 1. "For all other instances the leader sends a regular Assignment in
> > version X back." Does that mean the leader will exclude any member of the
> > group whose protocol version that it does not understand? For example, if
> > we have A, B, C with A the leader, and B bounced with the newer version.
> In
> > the first rebalance, A will only consider {A, C} for assignment while
> > sending empty assignment to B. And then later when B downgrades will it
> > re-assign the tasks to it again? I felt this is unnecessarily increasing
> > the num. rebalances and the total latency. Could the leader just sends
> > empty assignment to everyone, and since upon receiving the empty
> assignment
> > each thread will not create / restore any tasks and will not clean up its
> > local state (so that the prevCachedTasks are not lost in future
> rebalances)
> > and re-joins immediately, if users choose to bounce an instance once it
> is
> > in RUNNING state the total time of rolling upgrades will be reduced.
> >
> > 2. If we want to allow upgrading from 1.1- versions to any of the future
> > versions beyond 1.2, then we'd always need to keep the special handling
> > logic for this two-rolling-bounce mechanism, plus a config that we would
> > never be able to deprecate; on the other hand, if the version probing
> > procedure is fast, I think the extra operational cost of upgrading from
> > 1.1- to 1.2 and then from 1.2 to a future version, compared to upgrading
> > from 1.1- to a future version directly, could be small. So depending on
> > the experimental result of the upgrade latency, I'd suggest considering
> > that trade-off against the extra code/config that needs to be maintained
> > for the special handling.
> >
> > 3. Testing plan: could you elaborate a bit more on the actual
> > upgrade-paths we should test? For example, I'm thinking the following:
> >
> > a. 0.10.0 -> 1.2
> > b. 1.1 -> 1.2
> > c. 1.2 -> 1.3 (simulated v4)
> > d. 0.10.0 -> 1.3 (simulated v4)
> > e. 1.1 -> 1.3 (simulated v4)
> >
> > Guozhang
> >
> >
> >
> >
> > On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> I want to propose KIP-268 to allow rebalance metadata version upgrades
> >> in Kafka Streams:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
> >>
> >> Looking forward to your feedback.
> >>
> >>
> >> -Matthias
> >>
> >>
> >
> >
>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-16 Thread Matthias J. Sax
Thanks for your comments.

1. Because the old leader cannot decode the new Subscription it can only
send an empty assignment back. The idea to send empty assignments to all
members is interesting. I will try this out in a PR to see how it behaves.
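
For concreteness, the fallback is roughly the following (a sketch with
made-up names, not the actual assignor code):

```java
import java.nio.ByteBuffer;

// Sketch of the leader-side fallback (made-up names, not the actual
// StreamsPartitionAssignor code): a Subscription whose version is newer
// than the leader understands cannot be decoded, so the leader replies
// with an empty assignment that only encodes its own version numbers.
public class LeaderFallbackSketch {
    // A not-yet-upgraded (but version-probing-aware) leader.
    static final int LATEST_SUPPORTED_VERSION = 3;

    static ByteBuffer assignmentFor(ByteBuffer subscriptionMetadata) {
        // The used version is always in the first bytes, across all formats.
        int usedVersion = subscriptionMetadata.getInt(0);
        if (usedVersion > LATEST_SUPPORTED_VERSION) {
            // Empty assignment: no tasks, used-version == supported-version
            // == the leader's version, so the newer member knows to downgrade.
            ByteBuffer empty = ByteBuffer.allocate(8);
            empty.putInt(LATEST_SUPPORTED_VERSION);
            empty.putInt(LATEST_SUPPORTED_VERSION);
            empty.flip();
            return empty;
        }
        // ... regular task assignment for versions the leader can decode ...
        throw new UnsupportedOperationException("omitted in this sketch");
    }
}
```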

2. I don't see an issue with keeping config `upgrade.from` for future
releases. Personally, I would prefer to not force users to do two
upgrades if they want to go from pre-1.2 to post-1.2 version. Is there a
technical argument why you want to get rid of the config? What
disadvantages do you see keeping `upgrade.from` beyond 1.2 release?

Keeping the config is just a few lines of code in `StreamsConfig`, as well
as a single `if` statement in `StreamsPartitionAssignor` to force a
downgrade (cf
https://github.com/apache/kafka/pull/4636/files#diff-392371c29384e33bb09ed342e7696c68R201)
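
In spirit, that check is no more than the following (constants and the
version mapping are assumed for illustration; the real code is in the PR
above):

```java
// Sketch of the single version-pinning check behind `upgrade.from`
// (assumed constants and version mapping; see the linked PR for the
// actual implementation).
public class DowngradeCheckSketch {
    static final int LATEST_SUPPORTED_VERSION = 3;

    static int subscriptionVersionFor(String upgradeFrom) {
        if (upgradeFrom == null) {
            return LATEST_SUPPORTED_VERSION; // normal case: no downgrade
        }
        switch (upgradeFrom) {
            case "0.10.0":
                return 1; // 0.10.0 used metadata version 1
            case "0.10.1":
            case "0.10.2":
            case "0.11.0":
            case "1.0":
            case "1.1":
                return 2; // all later pre-KIP releases used version 2
            default:
                throw new IllegalArgumentException(
                    "Unknown value for upgrade.from: " + upgradeFrom);
        }
    }
}
```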


3. I updated the KIP accordingly.


-Matthias

On 3/15/18 3:19 PM, Guozhang Wang wrote:
> Hello Matthias, thanks for the KIP. Here are some comments:
> 
> 1. "For all other instances the leader sends a regular Assignment in
> version X back." Does that mean the leader will exclude any member of the
> group whose protocol version that it does not understand? For example, if
> we have A, B, C with A the leader, and B bounced with the newer version. In
> the first rebalance, A will only consider {A, C} for assignment while
> sending empty assignment to B. And then later when B downgrades will it
> re-assign the tasks to it again? I felt this is unnecessarily increasing
> the num. rebalances and the total latency. Could the leader just sends
> empty assignment to everyone, and since upon receiving the empty assignment
> each thread will not create / restore any tasks and will not clean up its
> local state (so that the prevCachedTasks are not lost in future rebalances)
> and re-joins immediately, if users choose to bounce an instance once it is
> in RUNNING state the total time of rolling upgrades will be reduced.
> 
> 2. If we want to allow upgrading from 1.1- versions to any of the future
> versions beyond 1.2, then we'd always need to keep the special handling
> logic for this two-rolling-bounce mechanism, plus a config that we would
> never be able to deprecate; on the other hand, if the version probing
> procedure is fast, I think the extra operational cost of upgrading from
> 1.1- to 1.2 and then from 1.2 to a future version, compared to upgrading
> from 1.1- to a future version directly, could be small. So depending on
> the experimental result of the upgrade latency, I'd suggest considering
> that trade-off against the extra code/config that needs to be maintained
> for the special handling.
> 
> 3. Testing plan: could you elaborate a bit more on the actual upgrade-paths
> we should test? For example, I'm thinking the following:
> 
> a. 0.10.0 -> 1.2
> b. 1.1 -> 1.2
> c. 1.2 -> 1.3 (simulated v4)
> d. 0.10.0 -> 1.3 (simulated v4)
> e. 1.1 -> 1.3 (simulated v4)
> 
> Guozhang
> 
> 
> 
> 
> On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax 
> wrote:
> 
>> Hi,
>>
>> I want to propose KIP-268 to allow rebalance metadata version upgrades
>> in Kafka Streams:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>>
>> Looking forward to your feedback.
>>
>>
>> -Matthias
>>
>>
> 
> 





Re: [DISCUSS] KIP-268: Simplify Kafka Streams Rebalance Metadata Upgrade

2018-03-15 Thread Guozhang Wang
Hello Matthias, thanks for the KIP. Here are some comments:

1. "For all other instances the leader sends a regular Assignment in
version X back." Does that mean the leader will exclude any member of the
group whose protocol version that it does not understand? For example, if
we have A, B, C with A the leader, and B bounced with the newer version. In
the first rebalance, A will only consider {A, C} for assignment while
sending empty assignment to B. And then later when B downgrades will it
re-assign the tasks to it again? I felt this is unnecessarily increasing
the num. rebalances and the total latency. Could the leader just sends
empty assignment to everyone, and since upon receiving the empty assignment
each thread will not create / restore any tasks and will not clean up its
local state (so that the prevCachedTasks are not lost in future rebalances)
and re-joins immediately, if users choose to bounce an instance once it is
in RUNNING state the total time of rolling upgrades will be reduced.
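
To sketch the member-side behavior I have in mind (hypothetical names, not
actual StreamThread code):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the member-side handling proposed above (hypothetical names):
// an empty assignment must not wipe local state or the previously cached
// tasks, and the thread should rejoin the group immediately.
public class EmptyAssignmentSketch {
    private final Set<String> prevCachedTasks = new HashSet<>();

    void onAssignment(Set<String> activeTasks, Set<String> standbyTasks) {
        if (activeTasks.isEmpty() && standbyTasks.isEmpty()) {
            // No tasks are created or restored and no local state is cleaned
            // up, so prevCachedTasks stay available as sticky-assignment
            // hints for the next rebalance.
            rejoinImmediately();
            return;
        }
        prevCachedTasks.addAll(activeTasks); // remember tasks for stickiness
        // ... create / restore the assigned tasks as usual ...
    }

    private void rejoinImmediately() {
        // e.g. an unsubscribe()/subscribe()/poll() cycle on the consumer to
        // trigger the next rebalance without processing in between.
    }
}
```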

2. If we want to allow upgrading from 1.1- versions to any of the future
versions beyond 1.2, then we'd always need to keep the special handling
logic for this two-rolling-bounce mechanism, plus a config that we would
never be able to deprecate; on the other hand, if the version probing
procedure is fast, I think the extra operational cost of upgrading from
1.1- to 1.2 and then from 1.2 to a future version, compared to upgrading
from 1.1- to a future version directly, could be small. So depending on
the experimental result of the upgrade latency, I'd suggest considering
that trade-off against the extra code/config that needs to be maintained
for the special handling.

3. Testing plan: could you elaborate a bit more on the actual upgrade-paths
we should test? For example, I'm thinking the following:

a. 0.10.0 -> 1.2
b. 1.1 -> 1.2
c. 1.2 -> 1.3 (simulated v4)
d. 0.10.0 -> 1.3 (simulated v4)
e. 1.1 -> 1.3 (simulated v4)

Guozhang




On Wed, Mar 14, 2018 at 11:17 PM, Matthias J. Sax 
wrote:

> Hi,
>
> I want to propose KIP-268 to allow rebalance metadata version upgrades
> in Kafka Streams:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade
>
> Looking forward to your feedback.
>
>
> -Matthias
>
>


-- 
-- Guozhang