vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416185907



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
       I agree with you on the principle. I think I'm just reaching a different 
conclusion.
   
   I think we can leave aside the practical thought about the likelihood of 
total failure when the end-offset API fails permanently. That's really more of 
a supporting point that maybe this isn't a terrible idea.
   
   The thing that makes me think that this is really preferable is that the 
task lags are an input to the TaskAssignor interface. It seems unreasonable for 
the StreamsPartitionAssignor to inspect the configured task assignor class 
and then decide just to pass in the ClientStates with one of the properties 
missing because it thinks it knows that particular assignor won't use it. 
Speaking of responsibility, it seems like it's not the responsibility of this 
class to reason about the implementation of each TaskAssignor. The purpose of 
an interface is that we don't have to worry about it, we just have to satisfy 
the interface. I think it would be just as strange, if not stranger, to imagine 
coming back to the code base and trying to figure out why it's ok to pass a 
broken ClientState object just into the StickyTaskAssignor.
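
To sketch what I mean: the types below are hypothetical and heavily simplified (`ClientStateSketch`, `TaskAssignorSketch`, and `AssignmentDriverSketch` are made-up names for illustration, not the real Kafka Streams classes). The caller checks only whether the interface's inputs are complete and never looks at which assignor class is configured. The fallback is modelled as a plain supplier of the previous assignment, which is an assumption about what a fallback could look like, not a description of what this PR does.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Supplier;

/** Hypothetical stand-in for ClientState: the lag is empty when the end-offset fetch failed. */
final class ClientStateSketch {
    final OptionalLong totalLag;

    ClientStateSketch(OptionalLong totalLag) {
        this.totalLag = totalLag;
    }
}

/** Hypothetical stand-in for TaskAssignor. Contract: every client state carries a lag. */
interface TaskAssignorSketch {
    String assign(Map<String, ClientStateSketch> clientStates);
}

final class AssignmentDriverSketch {
    /** Checks the interface's precondition only; it never inspects the assignor's class. */
    static boolean inputsComplete(Map<String, ClientStateSketch> clientStates) {
        return clientStates.values().stream().allMatch(s -> s.totalLag.isPresent());
    }

    /**
     * Calls the configured assignor only when its inputs are complete.
     * Otherwise it uses the (assumed) fallback, so no assignor implementation
     * is ever handed a client state with a missing lag.
     */
    static String run(TaskAssignorSketch configured,
                      Supplier<String> previousAssignment,
                      Map<String, ClientStateSketch> clientStates) {
        if (!inputsComplete(clientStates)) {
            return previousAssignment.get();
        }
        return configured.assign(clientStates);
    }
}
```

With this shape, the decision depends only on the data, so there is nothing implementation-specific about any particular assignor for the caller to reason about.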
   
   I actually can attest that that last point _is_ strange, having just spent a 
bunch of time in the integration tests, trying to figure out which combinations 
of ClientState arguments are actually preconditions for `TaskAssignor#assign`, 
and why it was ok to mock some, but not all, of them, some of the time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

