GrantPSpencer commented on code in PR #3043:
URL: https://github.com/apache/helix/pull/3043#discussion_r2099297852


##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -290,6 +315,113 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
     } // end of for-each-partition
   }
 
+  /**
+   * Check if a state is a top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a top state, false otherwise
+   */
+  private boolean isTopState(String state, StateModelDefinition stateModelDef) 
{
+    return stateModelDef.getTopState().contains(state);
+  }
+
+  /**
+   * Check if a state is a second top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a second top state, false otherwise
+   */
+  private boolean isSecondTopState(String state, StateModelDefinition 
stateModelDef) {
+    return stateModelDef.getSecondTopStates().contains(state);
+  }
+
+  /**
+   * Check if a state transition is already in the pending messages
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param instanceName The instance name
+   * @param fromState The from state
+   * @param toState The to state
+   * @param pendingMessages The list of pending messages
+   * @return True if the state transition is already in pending messages, 
false otherwise
+   */
+  private boolean isInPendingMessages(String resourceName, Partition 
partition, String instanceName,
+      String fromState, String toState, List<Message> pendingMessages) {
+
+    for (Message message : pendingMessages) {
+      if (message.getResourceName().equals(resourceName)
+          && message.getPartitionName().equals(partition.getPartitionName())
+          && message.getTgtName().equals(instanceName) && 
message.getFromState().equals(fromState)
+          && message.getToState().equals(toState)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if a state transition is upward
+   * @param fromState The from state
+   * @param toState The to state
+   * @param stateModelDef The state model definition
+   * @return True if it's an upward state transition, false otherwise
+   */
+  private boolean isUpwardStateTransition(String fromState, String toState,
+      StateModelDefinition stateModelDef) {
+
+    if (fromState == null || toState == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+
+    Integer fromStateWeight = statePriorityMap.get(fromState);
+    Integer toStateWeight = statePriorityMap.get(toState);
+
+    if (fromStateWeight == null || toStateWeight == null) {
+      return false;
+    }
+
+    return toStateWeight < fromStateWeight;
+  }
+
+  /**

Review Comment:
   The method name does not feel congruent with the description. May want to 
modify method name to clarify that we are not getting all pending upward state 
transition messages, just those that are upward and not from second-top state



##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -290,6 +315,113 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
     } // end of for-each-partition
   }
 
+  /**
+   * Check if a state is a top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a top state, false otherwise
+   */
+  private boolean isTopState(String state, StateModelDefinition stateModelDef) 
{
+    return stateModelDef.getTopState().contains(state);
+  }
+
+  /**
+   * Check if a state is a second top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a second top state, false otherwise
+   */
+  private boolean isSecondTopState(String state, StateModelDefinition 
stateModelDef) {
+    return stateModelDef.getSecondTopStates().contains(state);
+  }
+
+  /**
+   * Check if a state transition is already in the pending messages
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param instanceName The instance name
+   * @param fromState The from state
+   * @param toState The to state
+   * @param pendingMessages The list of pending messages
+   * @return True if the state transition is already in pending messages, 
false otherwise
+   */
+  private boolean isInPendingMessages(String resourceName, Partition 
partition, String instanceName,
+      String fromState, String toState, List<Message> pendingMessages) {
+
+    for (Message message : pendingMessages) {
+      if (message.getResourceName().equals(resourceName)
+          && message.getPartitionName().equals(partition.getPartitionName())
+          && message.getTgtName().equals(instanceName) && 
message.getFromState().equals(fromState)
+          && message.getToState().equals(toState)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if a state transition is upward
+   * @param fromState The from state
+   * @param toState The to state
+   * @param stateModelDef The state model definition
+   * @return True if it's an upward state transition, false otherwise
+   */
+  private boolean isUpwardStateTransition(String fromState, String toState,
+      StateModelDefinition stateModelDef) {
+
+    if (fromState == null || toState == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+
+    Integer fromStateWeight = statePriorityMap.get(fromState);
+    Integer toStateWeight = statePriorityMap.get(toState);
+
+    if (fromStateWeight == null || toStateWeight == null) {
+      return false;
+    }
+
+    return toStateWeight < fromStateWeight;
+  }
+
+  /**
+   * Get pending upward state transition messages from non-second top state to 
second top or top
+   * state
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param currentStateOutput The current state output
+   * @param stateModelDef The state model definition
+   * @return List of pending messages for upward state transitions
+   */
+  private List<Message> getPendingUpwardStateTransitionMessages(String 
resourceName,
+      Partition partition, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef) {
+    List<Message> pendingUpwardSTMessages = new ArrayList<>();
+
+    // Instance -> PendingMessage
+    Map<String, Message> pendingMessages =
+        currentStateOutput.getPendingMessageMap(resourceName, partition);
+
+    if (pendingMessages != null && !pendingMessages.isEmpty()) {
+      for (Message message : pendingMessages.values()) {
+        String fromState = message.getFromState();
+        String toState = message.getToState();
+
+        // Check if it's an upward state transition from non-second top state 
to second top or top
+        // state
+        if (isUpwardStateTransition(fromState, toState, stateModelDef)
+            && !isSecondTopState(fromState, stateModelDef)
+            && (isSecondTopState(toState, stateModelDef) || 
isTopState(toState, stateModelDef))) {

Review Comment:
   Can pending messages  have a _toState_ that is more than 1 hop from their 
_fromState_? From that, will  `isTopState(toState, stateModelDef)` ever 
evaluate to true when the !isSecondTopState(fromState, stateModelDef) statement 
is also true (from state is not second top state). In other words, will the to 
state ever not be the top state if the from state is the second top state for 
an upward state transition? 



##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -250,17 +260,32 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
                     pendingMessage, manager, resource, partition, 
sessionIdMap, instanceName,
                     stateModelDef, cancellationMessage, isCancellationEnabled);
           } else {
+            // Default currentReplicaNumber is -1 (no prioritization)
+            int currentReplicaNumber = -1;
+            // Check if this is an upward state transition from non-second top 
state to second top
+            // or top state
+            if (isUpwardStateTransition(currentState, nextState, stateModelDef)
+                && !stateModelDef.getSecondTopStates().contains(currentState)
+                && (isSecondTopState(nextState, stateModelDef)
+                    || isTopState(nextState, stateModelDef))
+                && !isInPendingMessages(resourceName, partition, instanceName, 
currentState,
+                    nextState, pendingUpwardStateTransitionMessages)) {

Review Comment:
   is the isInPendingMessages check necessary if we're in else statement that 
is executed when pending message is null?
   pendingMessage =
               currentStateOutput.getPendingMessage(resourceName, partition, 
instanceName);



##########
helix-core/src/main/java/org/apache/helix/model/Message.java:
##########
@@ -935,6 +936,22 @@ public void setSrcClusterName(String clusterName) {
     _record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
   }
 
+  /**
+   * Set the currentReplicaNumber for transition-related messages

Review Comment:
   Could you clarify how currentReplicaNumber can be used for prioritization? 
Seems this needs to be implemented on participant side somehow? 



##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -290,6 +315,113 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
     } // end of for-each-partition
   }
 
+  /**
+   * Check if a state is a top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a top state, false otherwise
+   */
+  private boolean isTopState(String state, StateModelDefinition stateModelDef) 
{
+    return stateModelDef.getTopState().contains(state);
+  }
+
+  /**
+   * Check if a state is a second top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a second top state, false otherwise
+   */
+  private boolean isSecondTopState(String state, StateModelDefinition 
stateModelDef) {
+    return stateModelDef.getSecondTopStates().contains(state);
+  }
+
+  /**
+   * Check if a state transition is already in the pending messages
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param instanceName The instance name
+   * @param fromState The from state
+   * @param toState The to state
+   * @param pendingMessages The list of pending messages
+   * @return True if the state transition is already in pending messages, 
false otherwise
+   */
+  private boolean isInPendingMessages(String resourceName, Partition 
partition, String instanceName,
+      String fromState, String toState, List<Message> pendingMessages) {
+
+    for (Message message : pendingMessages) {
+      if (message.getResourceName().equals(resourceName)
+          && message.getPartitionName().equals(partition.getPartitionName())
+          && message.getTgtName().equals(instanceName) && 
message.getFromState().equals(fromState)
+          && message.getToState().equals(toState)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if a state transition is upward
+   * @param fromState The from state
+   * @param toState The to state
+   * @param stateModelDef The state model definition
+   * @return True if it's an upward state transition, false otherwise
+   */
+  private boolean isUpwardStateTransition(String fromState, String toState,
+      StateModelDefinition stateModelDef) {
+
+    if (fromState == null || toState == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+
+    Integer fromStateWeight = statePriorityMap.get(fromState);
+    Integer toStateWeight = statePriorityMap.get(toState);
+
+    if (fromStateWeight == null || toStateWeight == null) {
+      return false;
+    }
+
+    return toStateWeight < fromStateWeight;
+  }
+
+  /**
+   * Get pending upward state transition messages from non-second top state to 
second top or top
+   * state
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param currentStateOutput The current state output
+   * @param stateModelDef The state model definition
+   * @return List of pending messages for upward state transitions
+   */
+  private List<Message> getPendingUpwardStateTransitionMessages(String 
resourceName,
+      Partition partition, CurrentStateOutput currentStateOutput,
+      StateModelDefinition stateModelDef) {
+    List<Message> pendingUpwardSTMessages = new ArrayList<>();
+
+    // Instance -> PendingMessage
+    Map<String, Message> pendingMessages =
+        currentStateOutput.getPendingMessageMap(resourceName, partition);
+
+    if (pendingMessages != null && !pendingMessages.isEmpty()) {
+      for (Message message : pendingMessages.values()) {
+        String fromState = message.getFromState();
+        String toState = message.getToState();
+
+        // Check if it's an upward state transition from non-second top state 
to second top or top
+        // state
+        if (isUpwardStateTransition(fromState, toState, stateModelDef)
+            && !isSecondTopState(fromState, stateModelDef)
+            && (isSecondTopState(toState, stateModelDef) || 
isTopState(toState, stateModelDef))) {

Review Comment:
   likewise an upward to second state could not be from second state. Curious 
if we can change the logic here to just:
   message is an upward state transition AND the from state is not the 2nd top 
state



##########
helix-core/src/main/java/org/apache/helix/util/MessageUtil.java:
##########
@@ -136,4 +136,32 @@ private static Message 
createStateTransitionMessage(Message.MessageType messageT
 
     return message;
   }
+
+  /**
+   * Create a state transition message with currentReplicaNumber for 
prioritization
+   * @param msgSender message sender
+   * @param sessionId session id
+   * @param resource resource
+   * @param partitionName partition name
+   * @param instanceName instance name
+   * @param fromState from state
+   * @param toState to state
+   * @param sessionIdForInstance session id for instance
+   * @param stateModelDefName state model def name
+   * @param currentReplicaNumber the current replica number (for 
prioritization)
+   * @return message
+   */
+  public static Message createStateTransitionMessage(String msgSender, String 
sessionId,

Review Comment:
   should we have this new method be the default and call all the 
message.set___ and have the previous be overloaded to just call this new method 
with -1 as currentReplicas? 



##########
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java:
##########
@@ -290,6 +315,113 @@ private void generateMessage(final Resource resource, 
final BaseControllerDataPr
     } // end of for-each-partition
   }
 
+  /**
+   * Check if a state is a top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a top state, false otherwise
+   */
+  private boolean isTopState(String state, StateModelDefinition stateModelDef) 
{
+    return stateModelDef.getTopState().contains(state);
+  }
+
+  /**
+   * Check if a state is a second top state
+   * @param state The state to check
+   * @param stateModelDef The state model definition
+   * @return True if it's a second top state, false otherwise
+   */
+  private boolean isSecondTopState(String state, StateModelDefinition 
stateModelDef) {
+    return stateModelDef.getSecondTopStates().contains(state);
+  }
+
+  /**
+   * Check if a state transition is already in the pending messages
+   * @param resourceName The resource name
+   * @param partition The partition
+   * @param instanceName The instance name
+   * @param fromState The from state
+   * @param toState The to state
+   * @param pendingMessages The list of pending messages
+   * @return True if the state transition is already in pending messages, 
false otherwise
+   */
+  private boolean isInPendingMessages(String resourceName, Partition 
partition, String instanceName,
+      String fromState, String toState, List<Message> pendingMessages) {
+
+    for (Message message : pendingMessages) {
+      if (message.getResourceName().equals(resourceName)
+          && message.getPartitionName().equals(partition.getPartitionName())
+          && message.getTgtName().equals(instanceName) && 
message.getFromState().equals(fromState)
+          && message.getToState().equals(toState)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check if a state transition is upward
+   * @param fromState The from state
+   * @param toState The to state
+   * @param stateModelDef The state model definition
+   * @return True if it's an upward state transition, false otherwise
+   */
+  private boolean isUpwardStateTransition(String fromState, String toState,

Review Comment:
   could consider having this be method of StateModelDefinition class if other 
utility functions exist in there already



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to