vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r486589243
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
This works for the POC, but for the real change, I'd recommend using an independent flag, such as an AtomicBoolean. In the past, we used to piggy-back all kinds of inappropriate flags through the TaskManager just because it happened to be shared between these components. But it ultimately made TaskManager pretty hard to maintain. Better to just keep orthogonal things independent.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -96,6 +107,14 @@ public SubscriptionInfo(final int version,
             data.setLatestSupportedVersion(latestSupportedVersion);
         }
 
+        if (version >= 8) {
+            if (shutdownRequested == null) { //check is required for testing because the mock sets it to null

Review comment:
We should instead mock realistic values. Having extra branches in the code just to handle the default behavior of EasyMock creates maintenance problems.
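The "independent flag" suggestion for StreamsPartitionAssignor can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `ShutdownRequester` and `SubscriptionFlagSource` are hypothetical stand-ins for the component that detects the need to shut down and for the assignor that encodes the flag. The point is only that both receive the same `AtomicBoolean` directly, so TaskManager never has to carry it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical writer side (e.g. the thread that decides the application must shut down).
// It receives the flag directly instead of reaching through TaskManager.
final class ShutdownRequester {
    private final AtomicBoolean shutdownRequested;

    ShutdownRequester(final AtomicBoolean shutdownRequested) {
        this.shutdownRequested = shutdownRequested;
    }

    void requestShutdown() {
        shutdownRequested.set(true);
    }
}

// Hypothetical reader side (e.g. the assignor building its subscription user data).
// It shares the same AtomicBoolean instance and only reads it.
final class SubscriptionFlagSource {
    private final AtomicBoolean shutdownRequested;

    SubscriptionFlagSource(final AtomicBoolean shutdownRequested) {
        this.shutdownRequested = shutdownRequested;
    }

    boolean shutdownRequestedForSubscription() {
        return shutdownRequested.get();
    }
}
```

Whoever wires the components together creates one `new AtomicBoolean(false)` and hands it to both. Because `AtomicBoolean.get()` and `set()` have volatile semantics, the reader sees an update made on another thread without extra locking.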
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {

Review comment:
It seems like we can look at this field two ways:

1. If we're really just flagging "shutdownRequested or not", then a `boolean` is fine.
2. If we're creating a general facility for members to send error codes to the assignor, then we should call this field "errorCode" instead, and an `int` is fine.

##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -48,6 +48,11 @@
       "versions": "2+",
       "type": "bytes"
     },
+    {
+      "name": "shutdownRequested",
+      "versions": "8+",
+      "type": "int32"

Review comment:
32 bits is a lot of possible error codes. Note that these numbers encode enum values. According to the Java spec, you can't have more than 2^16 members in a class. It looks like some people actually experimented with how many enum values they could add before the compiler fails, and it tops out at 2746 values (https://stackoverflow.com/questions/4468388/maximum-number-of-enum-elements-in-java). Either way, an int16 (`short`) should be more than enough codes.
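If the field becomes a general "errorCode" carried as an int16, an enum-to-`short` mapping might look like the sketch below. Everything here is hypothetical: the `MemberErrorCode` names and values are illustrative, and the hand-written `ByteBuffer` round trip stands in for the serialization that Kafka's generated message classes would actually perform for an `int16` field (which they expose as a Java `short`).

```java
import java.nio.ByteBuffer;

// Hypothetical error codes a member could report to the assignor; the names and
// values are illustrative, not part of Kafka Streams.
enum MemberErrorCode {
    NONE((short) 0),
    SHUTDOWN_REQUESTED((short) 1);

    private final short code;

    MemberErrorCode(final short code) {
        this.code = code;
    }

    short code() {
        return code;
    }

    // Maps a decoded int16 back to its enum constant, rejecting unknown values.
    static MemberErrorCode fromCode(final short code) {
        for (final MemberErrorCode value : values()) {
            if (value.code == code) {
                return value;
            }
        }
        throw new IllegalArgumentException("Unknown error code: " + code);
    }
}

// Hand-written round trip for illustration only; it occupies two bytes per code.
final class ErrorCodeCodec {
    static ByteBuffer encode(final MemberErrorCode errorCode) {
        final ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
        buffer.putShort(errorCode.code());
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }

    static MemberErrorCode decode(final ByteBuffer buffer) {
        return MemberErrorCode.fromCode(buffer.getShort());
    }
}
```

A `short` leaves room for 65,536 distinct codes, which comfortably exceeds the enum sizes discussed in the comment while halving the field compared to `int32`.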
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
If it's identical to the `case 7` branch, you can just let both versions share it:
```
case 7:
case 8:
    validateActive....
    ...
    break;
```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -566,10 +567,22 @@ void runLoop() {
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final ShutdownRequestedException e){
+                handleShutdownRequest(e);

Review comment:
Interesting. This looks like it's probably just a consequence of the way the test is set up. Some debugging should clear it up.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                 out.writeInt(errCode);
                 out.writeLong(nextRebalanceMs);
                 break;
+            case 8:
+                out.writeInt(usedVersion);
+                out.writeInt(commonlySupportedVersion);
+                encodeActiveAndStandbyTaskAssignment(out);
+                encodeActiveAndStandbyHostPartitions(out);
+                out.writeInt(errCode);
+                out.writeLong(nextRebalanceMs);
+                out.writeInt(0);

Review comment:
What is this for? It looks like the actual "shutdown" error code is just encoded in the `errCode` field.
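The `case 7:`/`case 8:` fall-through idea, combined with the point that the shutdown signal already travels in `errCode`, can be sketched on a heavily simplified encoder. `SimplifiedAssignmentEncoder` is hypothetical: it omits the task-assignment and host-partition sections that the real `AssignmentInfo.encode()` writes, and it does not assume any particular `errCode` value for shutdown.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, heavily simplified encoder. The task-assignment and host-partition
// sections written by the real AssignmentInfo.encode() are omitted on purpose.
final class SimplifiedAssignmentEncoder {
    static byte[] encode(final int usedVersion,
                         final int commonlySupportedVersion,
                         final int errCode,
                         final long nextRebalanceMs) {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            switch (usedVersion) {
                case 7:
                case 8: // same layout as version 7, so the two labels share one body
                    out.writeInt(usedVersion);
                    out.writeInt(commonlySupportedVersion);
                    // A shutdown request is just another errCode value; no trailing field is needed.
                    out.writeInt(errCode);
                    out.writeLong(nextRebalanceMs);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported version: " + usedVersion);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Both versions produce the same 20-byte layout (three 4-byte ints and one 8-byte long), which is exactly why sharing the branch avoids duplicated code and why an extra `out.writeInt(0)` would only add an unused field.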
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org