wcarlson5 commented on a change in pull request #9273: URL: https://github.com/apache/kafka/pull/9273#discussion_r486644110
##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -48,6 +48,11 @@
       "versions": "2+",
       "type": "bytes"
     },
+    {
+      "name": "shutdownRequested",
+      "versions": "8+",
+      "type": "int32"

Review comment:
If we make it a bool then we only have two options.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
Tests are failing because the mock of the task manager sets the shutdownRequested flag to null. I'm not sure whether I should fix the mock, or add a redundant null check and fix the mock later.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -96,6 +107,14 @@ public SubscriptionInfo(final int version,
             data.setLatestSupportedVersion(latestSupportedVersion);
         }
+        if (version >= 8) {
+            if (shutdownRequested == null) { //check is required for testing because the mock sets it to null

Review comment:
Agree. I don't want to leave it there, but until I fix the EasyMock setup I want the tests to pass.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
Good call.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                     out.writeInt(errCode);
                     out.writeLong(nextRebalanceMs);
                     break;
+                case 8:
+                    out.writeInt(usedVersion);
+                    out.writeInt(commonlySupportedVersion);
+                    encodeActiveAndStandbyTaskAssignment(out);
+                    encodeActiveAndStandbyHostPartitions(out);
+                    out.writeInt(errCode);
+                    out.writeLong(nextRebalanceMs);
+                    out.writeInt(0);

Review comment:
Yep, not actually necessary. I got confused between AssignmentInfo and SubscriptionInfo because they use the same protocol version but different schemas, I think.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {

Review comment:
I've changed it to a `bool` for now. I think it fits this use case better, and if we want to expand to other codes it would be relatively low effort to make it an `int`. What do you think would be better?

Edit: I was going to do that, but checkstyle won't let me import AtomicBoolean, so I've rolled the shutdown into the assignorError all the way, which is probably better than the halfway method I was using.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
That makes sense. When you say independent flag, would you recommend adding it to the assignorConfiguration like the assignmentErrorCode?
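
For readers following the int32-versus-bool discussion, here is a minimal sketch of a version-gated field: an integer code is written only when the protocol version is 8 or higher, and older versions decode to a default of 0. This is not the generated SubscriptionInfoData code or anything from the PR. The class name `VersionedFieldSketch`, the two-field wire layout, and the code value 1 are all assumptions made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; not Kafka's actual SubscriptionInfo encoding.
final class VersionedFieldSketch {
    // Assumed minimum version that carries the extra int32 field ("versions": "8+").
    public static final int FIELD_MIN_VERSION = 8;

    // Writes the version, then the code only if the version supports it.
    public static ByteBuffer encode(final int version, final int code) {
        final boolean hasField = version >= FIELD_MIN_VERSION;
        final ByteBuffer buf = ByteBuffer.allocate(hasField ? 8 : 4);
        buf.putInt(version);
        if (hasField) {
            buf.putInt(code);
        }
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reads the code, falling back to 0 for versions that never wrote it.
    public static int decodeCode(final ByteBuffer buf) {
        final int version = buf.getInt();
        return version >= FIELD_MIN_VERSION ? buf.getInt() : 0;
    }

    public static void main(final String[] args) {
        System.out.println(decodeCode(encode(8, 1))); // prints 1
        System.out.println(decodeCode(encode(7, 1))); // prints 0
    }
}
```

An int field leaves room for more than two states later, which is the trade-off raised in the first comment. A primitive `int` parameter also cannot be null, so it would avoid the mock-related null check discussed above.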