[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-13 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r504111457



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Oh, I saw that you already added the test for the `0` --> `1` --> `2` 
behavior. In that case, can you just add two things to that test: verify the 
behavior on overflow, and verify that the length is always exactly one byte?
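
A minimal sketch of what such a test could exercise, assuming the field is a one-element `byte[]` that is incremented on every call. `UniqueFieldSketch` and `next()` are hypothetical names for illustration, not the actual `StreamsPartitionAssignor` code. A Java `byte` wraps from `127` to `-128` on overflow, so that is the value to expect on the call after `127`:

```java
import java.util.Arrays;

// Hypothetical stand-in for the assignor's unique field; not the real Kafka code.
final class UniqueFieldSketch {
    private final byte[] uniqueField = new byte[1];

    // Returns a copy of the current field, then increments it for the next call.
    byte[] next() {
        final byte[] snapshot = uniqueField.clone();
        uniqueField[0]++; // byte arithmetic wraps: 127 + 1 becomes -128
        return snapshot;
    }

    public static void main(final String[] args) {
        final UniqueFieldSketch sketch = new UniqueFieldSketch();
        for (int call = 0; call <= Byte.MAX_VALUE + 1; call++) {
            final byte[] field = sketch.next();
            // Print only the first few calls and the calls around the overflow.
            if (call < 3 || call > Byte.MAX_VALUE - 1) {
                System.out.println("call " + call + ": " + Arrays.toString(field));
            }
        }
    }
}
```

The length check matters because the field is sent with every subscription, so it should stay at exactly one byte no matter how often it changes.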





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-13 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r504110656



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
##
@@ -158,7 +159,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
 LATEST_SUPPORTED_VERSION + 1,
 taskManager.processId(),
 userEndPoint(),
-taskManager.getTaskOffsetSums()
+taskManager.getTaskOffsetSums(),
+Bytes.EMPTY

Review comment:
   We should do the same thing here that we do in StreamsPartitionAssignor, 
i.e. use a changing byte. This class is supposed to be exactly the same thing but for a 
"future version" of the partition assignor, to test version probing and general 
compatibility. 
   It's a bit annoying that you have to copy the exact same code, but 
that's just how it is for now.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-13 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r504108760



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Oh right, duh, I was thinking it was just a single bit but it's a byte. In 
that case we should have a test that verifies it goes from `0` to `1` to `2`, 
etc. It might be good to verify the behavior on overflow as well, by calling 
`subscriptionUserData` the max_value number of times.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-09 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501875691



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
##
@@ -286,7 +286,8 @@ private static Properties streamsProperties(final String appId,
 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "6"),
 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
 mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
-mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)

Review comment:
   Can we add a comment here explaining why we set the thread count so 
high? I feel like we'll forget and be really confused when we stumble across 
this in the future.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##
@@ -59,6 +60,7 @@
 );
 
 private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
+private static final byte[] IGNORED_UNIQUE_FIELD = Bytes.EMPTY;

Review comment:
   nit: let's use `new byte[1]` for this to make sure it's actually being 
ignored when it should be (since apparently the test won't notice if you just pass in 
empty bytes for this field on a version < 8)

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Yeah, it should be `0` the first time you call it, then `1` the second 
time, and then back to `0` again on the third call









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r502052730



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Yeah, it should be `0` the first time you call it, then `1` the second 
time, and then back to `0` again on the third call









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501876646



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##
@@ -59,6 +60,7 @@
 );
 
 private final static String IGNORED_USER_ENDPOINT = "ignoredUserEndpoint:80";
+private static final byte[] IGNORED_UNIQUE_FIELD = Bytes.EMPTY;

Review comment:
   nit: let's use `new byte[1]` for this to make sure it's actually being 
ignored when it should be (since apparently the test won't notice if you just pass in 
empty bytes for this field on a version < 8)









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-08 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501875691



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
##
@@ -286,7 +286,8 @@ private static Properties streamsProperties(final String appId,
 mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "6"),
 mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),
 mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"),
-mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName())
+mkEntry(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName()),
+mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 40)

Review comment:
   Can we add a comment here explaining why we set the thread count so 
high? I feel like we'll forget and be really confused when we stumble across 
this in the future.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-07 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r50133



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
 rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
 taskAssignorSupplier = assignorConfiguration::taskAssignor;
 assignmentListener = assignorConfiguration.assignmentListener();
+uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
   That seems like an indication that the subscription is being encoded 
incorrectly. If the version is below 8, it shouldn't be trying to encode the 
`uniqueField` in the first place. You shouldn't have to mimic that yourself 
by forcing it to be empty (and even if it is empty, I worry there might be some 
additional bytes of metadata that would get stored for that field, when that 
field should not exist at all for versions < 8).
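
To illustrate the point, here is a rough sketch of version-gated encoding. It assumes a simple hand-rolled `ByteBuffer` layout (version, then an optional length-prefixed field); the real `SubscriptionInfo` encoding is different, and `VersionedEncodingSketch`, `encode`, and `MIN_VERSION_UNIQUE_FIELD` are hypothetical names. The caller never has to pass an empty array, because the encoder skips the field entirely below the minimum version:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of version-gated encoding; not the real SubscriptionInfo layout.
final class VersionedEncodingSketch {
    // Assumed name for the first version that carries the unique field.
    static final int MIN_VERSION_UNIQUE_FIELD = 8;

    static ByteBuffer encode(final int usedVersion, final byte[] uniqueField) {
        final boolean hasUniqueField = usedVersion >= MIN_VERSION_UNIQUE_FIELD;
        // Older versions reserve no space at all for the field, not even a length prefix.
        final int size = Integer.BYTES
            + (hasUniqueField ? Integer.BYTES + uniqueField.length : 0);
        final ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(usedVersion);
        if (hasUniqueField) {
            buffer.putInt(uniqueField.length);
            buffer.put(uniqueField);
        }
        buffer.flip();
        return buffer;
    }

    public static void main(final String[] args) {
        System.out.println("v7 bytes: " + encode(7, new byte[1]).remaining());
        System.out.println("v8 bytes: " + encode(8, new byte[1]).remaining());
    }
}
```

With this shape, a length prefix of zero is never written for old versions, which is the "additional bytes" concern raised above.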









[GitHub] [kafka] ableegoldman commented on a change in pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur

2020-10-07 Thread GitBox


ableegoldman commented on a change in pull request #9383:
URL: https://github.com/apache/kafka/pull/9383#discussion_r501322300



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -483,7 +485,7 @@ public void testEagerSubscription() {
 Collections.sort(subscription.topics());
 assertEquals(asList("topic1", "topic2"), subscription.topics());
 
-final SubscriptionInfo info = getInfo(UUID_1, prevTasks, standbyTasks);

Review comment:
   Can you add a test that verifies that it goes back and forth between the 
two expected values when you call `partitionAssignor.subscriptionUserData` 
multiple times (let's say 3)? Also maybe add a verification that the 
`uniqueField` has a length of just 1.
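
A rough sketch of the back-and-forth behavior being asked for, assuming the field is a single byte that is flipped after each call. `FlippingFieldSketch` and `nextField()` are hypothetical names used only for illustration; the actual assignor code may differ:

```java
import java.util.Arrays;

// Hypothetical stand-in for a unique field that alternates between two values.
final class FlippingFieldSketch {
    private final byte[] uniqueField = new byte[1];

    // Returns a copy of the current field, then flips the lowest bit for the next call.
    byte[] nextField() {
        final byte[] snapshot = uniqueField.clone();
        uniqueField[0] ^= 1;
        return snapshot;
    }

    public static void main(final String[] args) {
        final FlippingFieldSketch sketch = new FlippingFieldSketch();
        for (int call = 0; call < 3; call++) {
            System.out.println(Arrays.toString(sketch.nextField()));
        }
    }
}
```

Three calls are enough to see both transitions (first to second value, then back), and each returned array can be checked for a length of 1.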

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
 // Adds the following information to subscription
 // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
 // 2. Map from task id to its overall lag
+// 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
 
 handleRebalanceStart(topics);
+if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
   Instead of hardcoding `8` all over, let's just define a constant for this, 
similar to the `MIN_VERSION_OFFSET_SUM_SUBSCRIPTION` in SubscriptionInfo.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -234,15 +236,20 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
 // Adds the following information to subscription
 // 1. Client UUID (a unique id assigned to an instance of KafkaStreams)
 // 2. Map from task id to its overall lag
+// 3. Unique Field to ensure a rebalance when a thread rejoins by forcing the user data to be different
 
 handleRebalanceStart(topics);
+if (usedSubscriptionMetadataVersion >= 8) {

Review comment:
   If you want, I think it's also fine to just always flip the byte and not 
even check against the used subscription version. 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##
@@ -286,23 +296,23 @@ public void shouldEncodeAndDecodeSmallerLatestSupportedVersion() {
 final int latestSupportedVersion = LATEST_SUPPORTED_VERSION - 1;
 
 final SubscriptionInfo info =
-new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
 final SubscriptionInfo expectedInfo =
-new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS);
+new SubscriptionInfo(usedVersion, latestSupportedVersion, UUID_1, "localhost:80", TASK_OFFSET_SUMS, IGNORED_UNIQUE_FIELD);
 assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode()));
 }
 
 @Test
 public void shouldEncodeAndDecodeVersion7() {

Review comment:
   Can you add a test like this for the new version 8?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -212,6 +213,7 @@ public void configure(final Map<String, ?> configs) {
 rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
 taskAssignorSupplier = assignorConfiguration::taskAssignor;
 assignmentListener = assignorConfiguration.assignmentListener();
+uniqueField = usedSubscriptionMetadataVersion >= 8 ? new byte[1] : new byte[0];

Review comment:
   Also, now that I think about it, the `usedSubscriptionMetadataVersion` 
will only ever be >= 8 at this point. It might be set to something lower at 
some point later on, but it has to be at least 8 when the assignor is just 
being created/configured

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
##
@@ -313,7 +323,8 @@ public void shouldReturnTaskOffsetSumsMapForDecodedSubscription() {
 new SubscriptionInfo(MIN_VERSION_OFFSET_SUM_SUBSCRIPTION,
  LATEST_SUPPORTED_VERSION, UUID_1,
  "localhost:80",
- TASK_OFFSET_SUMS)
+ TASK_OFFSET_SUMS,
+ IGNORED_UNIQUE_FIELD)
 .encode());
 assertThat(info.taskOffsetSums(), is(TASK_OFFSET_SUMS));
 }

Review comment:
   I think we should also add a test to