wcarlson5 commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r486644110



##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -48,6 +48,11 @@
       "versions": "2+",
       "type": "bytes"
     },
+    {
+      "name": "shutdownRequested",
+      "versions": "8+",
+      "type": "int32"

Review comment:
       if we make it bool then we only have 2 options

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> 
topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
       tests are failing because the Mock of test Manager is setting the 
shutdownRequested flag to null. not sure If I should fix the Mock or add a 
redundant null check and fix

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -96,6 +107,14 @@ public SubscriptionInfo(final int version,
             data.setLatestSupportedVersion(latestSupportedVersion);
         }
 
+        if (version >= 8) {
+            if (shutdownRequested == null) { //check is required for testing 
because the mock sets it to null

Review comment:
       Agree. I don't want to leave it there but until I fix easy mock I want 
it to pass

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, 
final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
       good call

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                     out.writeInt(errCode);
                     out.writeLong(nextRebalanceMs);
                     break;
+                case 8:
+                    out.writeInt(usedVersion);
+                    out.writeInt(commonlySupportedVersion);
+                    encodeActiveAndStandbyTaskAssignment(out);
+                    encodeActiveAndStandbyHostPartitions(out);
+                    out.writeInt(errCode);
+                    out.writeLong(nextRebalanceMs);
+                    out.writeInt(0);

Review comment:
       yep not actually necessary, I got confused between AssignmentInfo and 
Subscription info because they use the same protocol version but use different 
schemas... i think 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, 
taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {

Review comment:
       I've changed it to a `bool` for now. I think it fits this use case 
better and if want to expand to use other codes it would be relatively low 
effort to make it an `int`. What fo you think would be better?
   
   Edit: I was going to do that but checkstyle won't let me import 
AtomicBoolean, so Ive rolled the shutdown the assignorError all the way, which 
is probably better than the half way method I was using

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> 
topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
       that makes sense, when you mean independent flag would you recommend 
adding it to the assignorConfiguration like the assignmentErrorCode?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to