cadonna commented on a change in pull request #10802: URL: https://github.com/apache/kafka/pull/10802#discussion_r647295043
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java ########## @@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() { @Test public void shouldEncodeAndDecodeVersion9() { final SubscriptionInfo info = - new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldEncodeAndDecodeVersion10() { final SubscriptionInfo info = - new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldEncodeAndDecodeVersion10WithNamedTopologies() { final SubscriptionInfo info = - new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() { assertThrows( TaskAssignmentException.class, - () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE) + () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()) ); } + @Test + public void shouldEncodeAndDecodeVersion11() { + final SubscriptionInfo info = + new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1"))); Review comment: Could you use a map with more than just one entry? If you use the same map in multiple tests, you should put it into a class field. The same applies to the tests below. ########## File path: streams/src/main/resources/common/message/SubscriptionInfoData.json ########## @@ -135,6 +140,22 @@ "type": "int64" } ] + }, + { + "name": "ClientTag", + "versions": "1+", Review comment: I think this should be 11+. While technically it probably does not make any difference, it better documents when the struct was introduced. ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java ########## @@ -96,8 +98,8 @@ public void shouldThrowForUnknownVersion1() { "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, - IGNORED_ERROR_CODE - )); + IGNORED_ERROR_CODE, + Collections.emptyMap())); Review comment: Could you use a static final variable named `IGNORED_CLIENT_TAGS` to better document the code as was done for some other fields? Here and below. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ########## @@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) { this.data = subscriptionInfoData; } + public Map<String, String> clientTags() { + return data.clientTags() + .stream() + .collect( + Collectors.toMap( + clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8), + clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8) + ) + ); Review comment: nit: ```suggestion return data.clientTags().stream() .collect( Collectors.toMap( clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8), clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8) ) ); ``` ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java ########## @@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() { @Test public void shouldEncodeAndDecodeVersion9() { final SubscriptionInfo info = - new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldEncodeAndDecodeVersion10() { final SubscriptionInfo info = - new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldEncodeAndDecodeVersion10WithNamedTopologies() { final SubscriptionInfo info = - new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE); + new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()); assertThat(info, is(SubscriptionInfo.decode(info.encode()))); } @Test public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() { assertThrows( TaskAssignmentException.class, - () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE) + () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap()) ); } + @Test + public void shouldEncodeAndDecodeVersion11() { + final SubscriptionInfo info = + new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1"))); + assertThat(info, is(SubscriptionInfo.decode(info.encode()))); + } + + @Test + public void shouldReturnEmptyMapOfClientTagsOnOlderVersions() { + final SubscriptionInfo info = + new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, mkMap(mkEntry("t1", "v1"))); + + assertThat(info.clientTags(), is(anEmptyMap())); + } + + @Test + public void shouldReturnMapOfClientTagsOnVersion11() { + final Map<String, String> clientTags = mkMap(mkEntry("t1", "v1")); + final SubscriptionInfo info = + new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, clientTags); + + assertThat(info.clientTags(), is(clientTags)); + } + Review comment: Could you add a test to verify what happens when an empty client tags map is passed to a version 11 subscription info? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java ########## @@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData subscriptionInfoData) { this.data = subscriptionInfoData; } + public Map<String, String> clientTags() { + return data.clientTags() + .stream() + .collect( + Collectors.toMap( + clientTag -> new String(clientTag.key(), StandardCharsets.UTF_8), + clientTag -> new String(clientTag.value(), StandardCharsets.UTF_8) + ) + ); + } + public int errorCode() { return data.errorCode(); } + private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) { + return clientTags.entrySet() + .stream() + .map(clientTagEntry -> { + final ClientTag clientTag = new ClientTag(); + clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8)); + clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8)); + return clientTag; + }) + .collect(Collectors.toList()); + } Review comment: nit: ```suggestion private List<ClientTag> buildClientTagsFromMap(final Map<String, String> clientTags) { return clientTags.entrySet().stream() .map(clientTagEntry -> { final ClientTag clientTag = new ClientTag(); clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8)); clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8)); return clientTag; }) .collect(Collectors.toList()); } ``` ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -474,8 +474,8 @@ def do_rolling_bounce(self, processor, counter, current_generation): monitors[first_other_processor] = first_other_monitor monitors[second_other_processor] = second_other_monitor - version_probing_message = "Sent a version 10 subscription and got version 10 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 10 and trigger new rebalance.", - end_of_upgrade_message = "Sent a version 10 subscription and group.s latest commonly supported version is 11 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 11 for next rebalance." + version_probing_message = "Sent a version 11 subscription and got version 11 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 10 and trigger new rebalance.", + end_of_upgrade_message = "Sent a version 11 subscription and group.s latest commonly supported version is 12 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 12 for next rebalance." Review comment: Awesome that you thought about this! ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java ########## @@ -161,8 +162,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) { userEndPoint(), taskManager.getTaskOffsetSums(), uniqueField, - 0 - ).encode(); + 0, + Collections.emptyMap()).encode(); Review comment: Could you put the map into variable with a meaningful name? Would make sense to use a non-empty map here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org