jiajunwang commented on a change in pull request #1124:
URL: https://github.com/apache/helix/pull/1124#discussion_r458303927
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -102,15 +114,22 @@ public void process(ClusterEvent event) throws Exception {
// update all pending messages to CurrentStateOutput.
private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
- CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
- Map<String, Resource> resourceMap) {
+ Collection<Message> pendingRelayMessages,
+ Map<String, Map<String, Message>> existingStaleMessages,
+ CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
// update all pending messages
for (Message message : pendingMessages) {
+ // ignore existing stale messages
+ if (existingStaleMessages.getOrDefault(instanceName, Collections.emptyMap())
Review comment:
We can call currentStateOutput.getStaleMessagesByInstance() to get the
message set as the input, right?
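A rough sketch of that idea, under stated assumptions: PendingMessageFilter and skipStale are hypothetical names, and plain strings stand in for Helix Message objects. The point is only that the caller fetches the per-instance stale messages once and passes them in, instead of handing the method the nested map for all instances.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; this is not Helix code.
class PendingMessageFilter {
  // Returns the pending message IDs that are not already recorded as stale
  // for this instance. staleForInstance maps message ID -> message.
  static List<String> skipStale(Collection<String> pendingMsgIds,
      Map<String, String> staleForInstance) {
    List<String> result = new ArrayList<>();
    for (String msgId : pendingMsgIds) {
      if (!staleForInstance.containsKey(msgId)) {
        result.add(msgId);
      }
    }
    return result;
  }
}
```

With this shape the method no longer needs the instance name to look up its own stale messages.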
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -129,7 +148,14 @@ private void updatePendingMessages(LiveInstance instance, Collection<Message> pe
String partitionName = message.getPartitionName();
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
- setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
+ String currentState = currentStateOutput.getCurrentState(resourceName, partition,
+ instanceName);
+ if (_isTaskFrameworkPipeline || !isStaleMessage(message, currentState)) {
+ setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
+ } else {
+ existingStaleMessages.putIfAbsent(instanceName, new HashMap<>());
Review comment:
As mentioned above, I would prefer an explicit set method here, e.g.
addStaleMessage().
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
##########
@@ -544,6 +544,19 @@ public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
return _instanceMessagesCache.getMessages(instanceName);
}
+ public Map<String, Map<String, Message>> getStaleMessages() {
Review comment:
So this one returns the internal map so that callers can modify it. I still
don't like exposing an implicit map/list reference for modification.
Having checked the current usage, I think you can replace it with 2 explicit
modifying methods:
1. addStaleMessage(instance, message)
2. removeStaleMessage(instance, message) -- or maybe always clean up when
the cache is refreshed, so you don't even need this method?
To support the tests, we can make it protected so that it is only visible to
tests, or keep this method but always return a shallow copy of the map, for
the tests only.
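To make the suggestion concrete, here is a minimal sketch of what I have in mind. It is not the actual Helix code: StaleMessageCache is a hypothetical stand-in class, the message type is a generic parameter, and the message ID is passed separately as an assumption (the real method could derive it from the message itself). The internal map is never handed out; readers only get an unmodifiable shallow copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for illustration only; this is not Helix code.
class StaleMessageCache<M> {
  // instance name -> (message ID -> message); never exposed directly.
  private final Map<String, Map<String, M>> staleMessages = new HashMap<>();

  // Explicit mutator: record a stale message for an instance.
  public synchronized void addStaleMessage(String instanceName, String msgId, M message) {
    staleMessages.computeIfAbsent(instanceName, k -> new HashMap<>()).put(msgId, message);
  }

  // Explicit mutator: drop a stale message; empty per-instance maps are removed.
  public synchronized void removeStaleMessage(String instanceName, String msgId) {
    Map<String, M> perInstance = staleMessages.get(instanceName);
    if (perInstance == null) {
      return;
    }
    perInstance.remove(msgId);
    if (perInstance.isEmpty()) {
      staleMessages.remove(instanceName);
    }
  }

  // Read-only view: an unmodifiable shallow copy, so callers cannot
  // change the cache through the returned map.
  public synchronized Map<String, M> getStaleMessagesByInstance(String instanceName) {
    Map<String, M> perInstance = staleMessages.get(instanceName);
    if (perInstance == null) {
      return Collections.emptyMap();
    }
    return Collections.unmodifiableMap(new HashMap<>(perInstance));
  }
}
```

If the stale messages are always cleared when the cache is refreshed, removeStaleMessage could be dropped and replaced by a single clear on refresh.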
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -188,14 +196,21 @@ private void generateMessage(final Resource resource, final BaseControllerDataPr
if (pendingMessage != null && shouldCleanUpPendingMessage(pendingMessage, currentState,
currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
- LogUtil.logInfo(logger, _eventId, String.format(
- "Adding pending message %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s",
- pendingMessage.getMsgId(), instanceName, pendingMessage.getFromState(),
- pendingMessage.getToState(), resourceName, partition, currentState));
- if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
- pendingMessagesToCleanUp.put(instanceName, new HashMap<String, Message>());
+ logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName, resourceName,
+ partition, currentState, PENDING_MESSAGE);
+ }
+
+ for (Message staleMessage : staleMessages) {
+ if (staleMessage == null) {
+ logger.warn("Should not contain a stale message as null");
Review comment:
Is it really possible that we have a null element in the return set?