showuon commented on code in PR #11983:
URL: https://github.com/apache/kafka/pull/11983#discussion_r863399174


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java:
##########
@@ -148,18 +148,18 @@ public class IncrementalCooperativeConnectProtocol {
      *   Current Assignment => [Byte]
      * </pre>
      */
-    public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState, boolean sessioned) {
+    public static ByteBuffer serializeMetadata(ExtendedWorkerState 
workerState) {

Review Comment:
   Sorry, @C0urante , I was wrong. `serializeMetadata` will be called in 
`metadataRequest` method. And the `metadataRequest` is trying to create a 
`JoinGroupRequestProtocolCollection` containing all supported protocols. So if 
it's `sessioned`, we should return a collection containing `sessioned, 
compatible, eagar`. But after our change, we will return `sessioned, sessioned, 
eagar`, which is wrong. I think we should keep `sessioned` argument here.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -336,13 +332,16 @@ ClusterAssignment performTaskAssignment(
         log.debug("Incremental connector assignments: {}", 
incrementalConnectorAssignments);
         log.debug("Incremental task assignments: {}", 
incrementalTaskAssignments);
 
+        Map<String, Collection<String>> revokedConnectors = 
transformValues(toRevoke, ConnectorsAndTasks::connectors);
+        Map<String, Collection<ConnectorTaskId>> revokedTasks = 
transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
         return new ClusterAssignment(
                 incrementalConnectorAssignments,
                 incrementalTaskAssignments,
-                transformValues(toRevoke, ConnectorsAndTasks::connectors),
-                transformValues(toRevoke, ConnectorsAndTasks::tasks),
-                connectorAssignments,
-                taskAssignments
+                revokedConnectors,
+                revokedTasks,
+                diff(connectorAssignments, revokedConnectors),
+                diff(taskAssignments, revokedTasks)

Review Comment:
   Yes, you're correct! Thanks for the explanation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to