cadonna commented on a change in pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#discussion_r647295043



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() {
     @Test
     public void shouldEncodeAndDecodeVersion9() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, 
IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, 
IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
         assertThrows(
             TaskAssignmentException.class,
-            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, 
LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, 
IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, 
LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, 
IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap())
         );
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
mkMap(mkEntry("t1", "v1")));

Review comment:
       Could you use a map with more than just one entry? If you use the same 
map in multiple tests, you should put it into a class field. The same applies 
to the tests below.

##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -135,6 +140,22 @@
           "type": "int64"
         }
       ]
+    },
+    {
+      "name": "ClientTag",
+      "versions": "1+",

Review comment:
       I think this should be 11+. While technically it probably does not make 
any difference, it better documents when the struct was introduced.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -96,8 +98,8 @@ public void shouldThrowForUnknownVersion1() {
             "localhost:80",
             TASK_OFFSET_SUMS,
             IGNORED_UNIQUE_FIELD,
-            IGNORED_ERROR_CODE
-        ));
+            IGNORED_ERROR_CODE,
+            Collections.emptyMap()));

Review comment:
       Could you use a static final variable named `IGNORED_CLIENT_TAGS` to 
better document the code as was done for some other fields?
   Here and below.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData 
subscriptionInfoData) {
         this.data = subscriptionInfoData;
     }
 
+    public Map<String, String> clientTags() {
+        return data.clientTags()
+                   .stream()
+                   .collect(
+                       Collectors.toMap(
+                           clientTag -> new String(clientTag.key(), 
StandardCharsets.UTF_8),
+                           clientTag -> new String(clientTag.value(), 
StandardCharsets.UTF_8)
+                       )
+                   );

Review comment:
       nit:
   ```suggestion
           return data.clientTags().stream()
               .collect(
                   Collectors.toMap(
                       clientTag -> new String(clientTag.key(), 
StandardCharsets.UTF_8),
                       clientTag -> new String(clientTag.value(), 
StandardCharsets.UTF_8)
                   )
               );
   ```

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##########
@@ -407,32 +409,56 @@ public void shouldNotErrorAccessingFutureVars() {
     @Test
     public void shouldEncodeAndDecodeVersion9() {
         final SubscriptionInfo info =
-                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+                new SubscriptionInfo(9, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldEncodeAndDecodeVersion10WithNamedTopologies() {
         final SubscriptionInfo info =
-            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, 
IGNORED_ERROR_CODE);
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", NAMED_TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, 
IGNORED_ERROR_CODE, Collections.emptyMap());
         assertThat(info, is(SubscriptionInfo.decode(info.encode())));
     }
 
     @Test
     public void shouldThrowIfAttemptingToUseNamedTopologiesWithOlderVersion() {
         assertThrows(
             TaskAssignmentException.class,
-            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, 
LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, 
IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE)
+            () -> new SubscriptionInfo(MIN_NAMED_TOPOLOGY_VERSION - 1, 
LATEST_SUPPORTED_VERSION, UUID_1, "localhost:80", NAMED_TASK_OFFSET_SUMS, 
IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, Collections.emptyMap())
         );
     }
 
+    @Test
+    public void shouldEncodeAndDecodeVersion11() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
mkMap(mkEntry("t1", "v1")));
+        assertThat(info, is(SubscriptionInfo.decode(info.encode())));
+    }
+
+    @Test
+    public void shouldReturnEmptyMapOfClientTagsOnOlderVersions() {
+        final SubscriptionInfo info =
+            new SubscriptionInfo(10, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
mkMap(mkEntry("t1", "v1")));
+
+        assertThat(info.clientTags(), is(anEmptyMap()));
+    }
+
+    @Test
+    public void shouldReturnMapOfClientTagsOnVersion11() {
+        final Map<String, String> clientTags = mkMap(mkEntry("t1", "v1"));
+        final SubscriptionInfo info =
+            new SubscriptionInfo(11, LATEST_SUPPORTED_VERSION, UUID_1, 
"localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD, IGNORED_ERROR_CODE, 
clientTags);
+
+        assertThat(info.clientTags(), is(clientTags));
+    }
+

Review comment:
       Could you add a test to verify what happens when an empty client tags 
map is passed to a version 11 subscription info?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -125,10 +130,33 @@ private SubscriptionInfo(final SubscriptionInfoData 
subscriptionInfoData) {
         this.data = subscriptionInfoData;
     }
 
+    public Map<String, String> clientTags() {
+        return data.clientTags()
+                   .stream()
+                   .collect(
+                       Collectors.toMap(
+                           clientTag -> new String(clientTag.key(), 
StandardCharsets.UTF_8),
+                           clientTag -> new String(clientTag.value(), 
StandardCharsets.UTF_8)
+                       )
+                   );
+    }
+
     public int errorCode() {
         return data.errorCode();
     }
 
+    private List<ClientTag> buildClientTagsFromMap(final Map<String, String> 
clientTags) {
+        return clientTags.entrySet()
+                         .stream()
+                         .map(clientTagEntry -> {
+                             final ClientTag clientTag = new ClientTag();
+                             
clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
+                             
clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
+                             return clientTag;
+                         })
+                         .collect(Collectors.toList());
+    }

Review comment:
       nit:
   ```suggestion
       private List<ClientTag> buildClientTagsFromMap(final Map<String, String> 
clientTags) {
           return clientTags.entrySet().stream()
               .map(clientTagEntry -> {
                   final ClientTag clientTag = new ClientTag();
                   
clientTag.setKey(clientTagEntry.getKey().getBytes(StandardCharsets.UTF_8));
                   
clientTag.setValue(clientTagEntry.getValue().getBytes(StandardCharsets.UTF_8));
                   return clientTag;
               })
               .collect(Collectors.toList());
       }
   ```

##########
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##########
@@ -474,8 +474,8 @@ def do_rolling_bounce(self, processor, counter, 
current_generation):
                     monitors[first_other_processor] = first_other_monitor
                     monitors[second_other_processor] = second_other_monitor
 
-                    version_probing_message = "Sent a version 10 subscription 
and got version 10 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version 10 and trigger new 
rebalance.",
-                    end_of_upgrade_message = "Sent a version 10 subscription 
and group.s latest commonly supported version is 11 (successful version probing 
and end of rolling upgrade). Upgrading subscription metadata version to 11 for 
next rebalance."
+                    version_probing_message = "Sent a version 11 subscription 
and got version 11 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version 10 and trigger new 
rebalance.",
+                    end_of_upgrade_message = "Sent a version 11 subscription 
and group.s latest commonly supported version is 12 (successful version probing 
and end of rolling upgrade). Upgrading subscription metadata version to 12 for 
next rebalance."

Review comment:
       Awesome that you thought about this!

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##########
@@ -161,8 +162,8 @@ public ByteBuffer subscriptionUserData(final Set<String> 
topics) {
                     userEndPoint(),
                     taskManager.getTaskOffsetSums(),
                     uniqueField,
-                    0
-                ).encode();
+                    0,
+                    Collections.emptyMap()).encode();

Review comment:
       Could you put the map into a variable with a meaningful name?
   Would it make sense to use a non-empty map here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to