vvcephei commented on a change in pull request #9273:
URL: https://github.com/apache/kafka/pull/9273#discussion_r486589243



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -242,7 +242,8 @@ public ByteBuffer subscriptionUserData(final Set<String> 
topics) {
             LATEST_SUPPORTED_VERSION,
             taskManager.processId(),
             userEndPoint,
-            taskManager.getTaskOffsetSums())
+            taskManager.getTaskOffsetSums(),
+            taskManager.isShutdownRequested())

Review comment:
       This works for the POC, but for the real change, I'd recommend using an 
independent flag, such as an AtomicBoolean.
   
   In the past, we used to piggy-back all kinds of inappropriate flags through 
the TaskManager just because it happened to be shared between these components. 
But it ultimately made TaskManager pretty hard to maintain. Better to just keep 
orthogonal things independent.
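
   To make the suggestion concrete, here is a minimal sketch of sharing one `AtomicBoolean` between the component that requests the shutdown and the component that reads it, without routing it through TaskManager. The names `ShutdownFlagSketch`, `Requester`, and `Reader` are hypothetical stand-ins, not the real Streams classes, and the actual wiring in the PR may well look different.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for the thread and the assignor; not the real Kafka classes.
final class ShutdownFlagSketch {

    /** Plays the role of the component that asks for a shutdown. */
    static final class Requester {
        private final AtomicBoolean shutdownRequested;

        Requester(final AtomicBoolean shutdownRequested) {
            this.shutdownRequested = shutdownRequested;
        }

        void requestShutdown() {
            shutdownRequested.set(true);
        }
    }

    /** Plays the role of the component that reads the flag when building its subscription. */
    static final class Reader {
        private final AtomicBoolean shutdownRequested;

        Reader(final AtomicBoolean shutdownRequested) {
            this.shutdownRequested = shutdownRequested;
        }

        boolean shutdownRequestedForSubscription() {
            return shutdownRequested.get();
        }
    }

    public static void main(final String[] args) {
        // One flag instance is handed to both sides; neither needs a shared manager object.
        final AtomicBoolean flag = new AtomicBoolean(false);
        final Requester requester = new Requester(flag);
        final Reader reader = new Reader(flag);

        System.out.println(reader.shutdownRequestedForSubscription()); // prints "false"
        requester.requestShutdown();
        System.out.println(reader.shutdownRequestedForSubscription()); // prints "true"
    }
}
```

   The flag is passed in through the constructors, so each side depends only on the one piece of state it actually needs.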

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -96,6 +107,14 @@ public SubscriptionInfo(final int version,
             data.setLatestSupportedVersion(latestSupportedVersion);
         }
 
+        if (version >= 8) {
+            if (shutdownRequested == null) { //check is required for testing 
because the mock sets it to null

Review comment:
       We should instead mock realistic values. Having extra branches in the 
code just to handle the default behavior of EasyMock creates maintenance 
problems.
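
   As a rough illustration, the sketch below uses a hand-written stub in place of EasyMock, purely so that it is self-contained. `ShutdownSource` and `encodeShutdownRequested` are hypothetical names, not part of the PR. The point is only that the test double returns a realistic value, so the code under test needs no null branch.

```java
// Hypothetical illustration; a hand-rolled stub is swapped in for EasyMock here.
final class RealisticStubSketch {

    /** Hypothetical collaborator; in the PR this role is played by the mocked object. */
    interface ShutdownSource {
        Integer shutdownRequested();
    }

    /** Code under test: uses the value directly, with no branch for a null default. */
    static int encodeShutdownRequested(final ShutdownSource source) {
        return source.shutdownRequested();
    }

    public static void main(final String[] args) {
        // The stub answers with a realistic value (0 meaning "no shutdown requested")
        // instead of the null that an unconfigured mock would hand back.
        final ShutdownSource stub = () -> 0;
        System.out.println(encodeShutdownRequested(stub)); // prints "0"
    }
}
```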

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##########
@@ -82,11 +83,21 @@ public SubscriptionInfo(final int version,
                             final UUID processId,
                             final String userEndPoint,
                             final Map<TaskId, Long> taskOffsetSums) {
+        this(version, latestSupportedVersion, processId, userEndPoint, 
taskOffsetSums, new AtomicInteger(0));
+    }
+
+    public SubscriptionInfo(final int version,
+                            final int latestSupportedVersion,
+                            final UUID processId,
+                            final String userEndPoint,
+                            final Map<TaskId, Long> taskOffsetSums,
+                            final AtomicInteger shutdownRequested) {

Review comment:
       It seems like we can look at this field two ways:
   1. if we're really just flagging "shutdownRequested or not", then a 
`boolean` is fine.
   2. if we're creating a general facility for members to send error codes to 
the assignor, then we should call this field "errorCode" instead, and an `int` 
is fine.

##########
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##########
@@ -48,6 +48,11 @@
       "versions": "2+",
       "type": "bytes"
     },
+    {
+      "name": "shutdownRequested",
+      "versions": "8+",
+      "type": "int32"

Review comment:
       32 bits is a lot of possible error codes. Note that these numbers encode 
enum values. According to the Java spec, you can't have more than 2^16 members 
in a class. It looks like some people actually experimented with how many enum 
values they could add before the compiler fails, and it tops out at 2746 values 
(https://stackoverflow.com/questions/4468388/maximum-number-of-enum-elements-in-java).
   
   Either way, an int16 (`short`) should be more than enough codes.
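
   For illustration, a sketch of what an enum-backed error code carried as a `short` could look like. `MemberErrorCodeSketch` and its two constants are hypothetical, and the `ByteBuffer` round trip only stands in for whatever the generated message classes do with an int16 field.

```java
import java.nio.ByteBuffer;

// Hypothetical error-code enum; the names and values are assumptions for illustration.
enum MemberErrorCodeSketch {
    NONE((short) 0),
    SHUTDOWN_REQUESTED((short) 1);

    private final short code;

    MemberErrorCodeSketch(final short code) {
        this.code = code;
    }

    short code() {
        return code;
    }

    /** Maps a wire value back to its enum constant, rejecting unknown codes. */
    static MemberErrorCodeSketch fromCode(final short code) {
        for (final MemberErrorCodeSketch candidate : values()) {
            if (candidate.code == code) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown error code: " + code);
    }

    /** Writes the code as two bytes, the size of an int16 field. */
    static ByteBuffer encode(final MemberErrorCodeSketch errorCode) {
        final ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
        buffer.putShort(errorCode.code());
        buffer.flip();
        return buffer;
    }

    /** Reads two bytes and resolves them to an enum constant. */
    static MemberErrorCodeSketch decode(final ByteBuffer buffer) {
        return fromCode(buffer.getShort());
    }

    public static void main(final String[] args) {
        final ByteBuffer encoded = encode(SHUTDOWN_REQUESTED);
        System.out.println(encoded.remaining()); // prints "2"
        System.out.println(decode(encoded));     // prints "SHUTDOWN_REQUESTED"
    }
}
```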

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1436,6 +1447,16 @@ public void onAssignment(final Assignment assignment, 
final ConsumerGroupMetadat
                 topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
                 encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
                 break;
+            case 8:
+                validateActiveTaskEncoding(partitions, info, logPrefix);
+
+                activeTasks = getActiveTasks(partitions, info);
+                partitionsByHost = info.partitionsByHost();
+                standbyPartitionsByHost = info.standbyPartitionByHost();
+                topicToPartitionInfo = getTopicPartitionInfo(partitionsByHost);
+                encodedNextScheduledRebalanceMs = info.nextRebalanceMs();
+                //recive the shutdown then call the request close

Review comment:
       If it's identical, you can just do:
   ```
   case 7:
   case 8:
       validateActive....
       ...
       break;
   ```
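
   Spelled out as a compilable sketch, with a placeholder string standing in for the real handling (`VersionSwitchSketch` and `handle` are hypothetical names), stacking the two labels makes both versions run the same body:

```java
// Hypothetical illustration of stacked case labels sharing one body.
final class VersionSwitchSketch {

    static String handle(final int version) {
        final String result;
        switch (version) {
            case 7:
            case 8:
                // Versions 7 and 8 are handled identically, so the labels share this block.
                result = "handled as v7/v8";
                break;
            default:
                throw new IllegalStateException("Unknown version: " + version);
        }
        return result;
    }

    public static void main(final String[] args) {
        System.out.println(handle(7)); // prints "handled as v7/v8"
        System.out.println(handle(8)); // prints "handled as v7/v8"
    }
}
```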

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -566,10 +567,22 @@ void runLoop() {
                 }
             } catch (final TaskMigratedException e) {
                 handleTaskMigrated(e);
+            } catch (final ShutdownRequestedException e){
+                handleShutdownRequest(e);

Review comment:
       Interesting. This looks like it's probably just a consequence of the way 
the test is set up. Some debugging should clear it up.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
##########
@@ -186,6 +186,15 @@ public ByteBuffer encode() {
                     out.writeInt(errCode);
                     out.writeLong(nextRebalanceMs);
                     break;
+                case 8:
+                    out.writeInt(usedVersion);
+                    out.writeInt(commonlySupportedVersion);
+                    encodeActiveAndStandbyTaskAssignment(out);
+                    encodeActiveAndStandbyHostPartitions(out);
+                    out.writeInt(errCode);
+                    out.writeLong(nextRebalanceMs);
+                    out.writeInt(0);

Review comment:
       What is this for? It looks like the actual "shutdown" error code is just 
encoded in the `errCode` field.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

