jiajunwang commented on a change in pull request #1124:
URL: https://github.com/apache/helix/pull/1124#discussion_r456005033
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
##########
@@ -165,6 +168,19 @@ public void setPendingRelayMessage(String resourceName, Partition partition, Str
setStateMessage(resourceName, partition, instanceName, message, _pendingRelayMessageMap);
}
+ public Map<String, Map<String, Message>> getStaleMessageMap() {
+ return _staleMessageMap;
+ }
+
+ public void setStaleMessageMap(Map<String, Map<String, Message>> staleMessageMap) {
+ _staleMessageMap = staleMessageMap;
Review comment:
I would prefer "_staleMessageMap = new HashMap<>(staleMessageMap)" to avoid keeping an implicit reference to the caller's map.
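A minimal sketch of the suggested defensive copy, for illustration only. `StaleMessageHolder` is a hypothetical stand-in for `CurrentStateOutput`, and `String` stands in for Helix's `Message`. One caveat: `new HashMap<>(staleMessageMap)` is a shallow copy, so the per-instance inner maps would still be shared with the caller. The sketch therefore copies the inner maps as well.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CurrentStateOutput; String stands in for Helix's Message.
class StaleMessageHolder {
  // instance name -> (message id -> message)
  private Map<String, Map<String, String>> _staleMessageMap = new HashMap<>();

  public Map<String, Map<String, String>> getStaleMessageMap() {
    return _staleMessageMap;
  }

  public void setStaleMessageMap(Map<String, Map<String, String>> staleMessageMap) {
    // new HashMap<>(staleMessageMap) alone would copy only the outer map.
    // Copying each inner map too keeps later edits to the caller's maps
    // from leaking into this object.
    Map<String, Map<String, String>> copy = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : staleMessageMap.entrySet()) {
      copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
    }
    _staleMessageMap = copy;
  }
}
```

With this setter, adding a message or an instance to the caller's map after the call does not change what the holder sees.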
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -103,14 +114,20 @@ public void process(ClusterEvent event) throws Exception {
// update all pending messages to CurrentStateOutput.
private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
- Map<String, Resource> resourceMap) {
+ Map<String, Resource> resourceMap, Map<String, Map<String, Message>> existingStaleMessages) {
Review comment:
Nit: could you please move existingStaleMessages so it is adjacent to pendingMessages? That keeps the parameters better organized.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -103,14 +114,20 @@ public void process(ClusterEvent event) throws Exception {
// update all pending messages to CurrentStateOutput.
private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
- Map<String, Resource> resourceMap) {
+ Map<String, Resource> resourceMap, Map<String, Map<String, Message>> existingStaleMessages) {
Review comment:
Maybe move pendingRelayMessages next to them as well :)
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
##########
@@ -194,6 +210,8 @@ public String getCurrentState(String resourceName, Partition partition, String i
return null;
}
+
Review comment:
Remove this blank line?
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
##########
@@ -238,6 +256,16 @@ public Message getPendingMessage(String resourceName, Partition partition, Strin
return getStateMessage(resourceName, partition, instanceName, _pendingMessageMap);
}
+ public Set<Message> getStaleMessages(String instanceName) {
Review comment:
Nit: maybe call it getStaleMessagesByInstance, which would help differentiate it from getStaleMessageMap.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -56,11 +57,17 @@
*/
public class CurrentStateComputationStage extends AbstractBaseStage {
private static Logger LOG = LoggerFactory.getLogger(CurrentStateComputationStage.class);
+ private boolean _isTaskFrameworkPipeline = false;
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+
+ if (cache instanceof WorkflowControllerDataProvider) {
Review comment:
This may increase our future work to split the pipeline. We need a better, more centralized way to determine the pipeline type. Otherwise, once we need more types (for example, an aggregation pipeline), it will get very messy.
Please add a TODO here noting that we should avoid this hacky style.
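One possible shape for a more centralized check, sketched under assumptions: the `PipelineType` enum, the `pipelineType` attribute key, and the `EventStub` and `PipelineTypes` classes are hypothetical and are not existing Helix APIs. The idea is that whoever creates the event records the pipeline type once, and stages read that tag instead of probing the data provider's concrete class with `instanceof`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: one constant per controller pipeline. Supporting a new pipeline,
// such as an aggregation pipeline, means adding a constant here rather than
// another instanceof check on the data provider.
enum PipelineType {
  DEFAULT, TASK, AGGREGATION
}

// Hypothetical minimal event with named attributes, loosely modeled on ClusterEvent.
class EventStub {
  static final String PIPELINE_TYPE = "pipelineType";
  private final Map<String, Object> _attributes = new HashMap<>();

  void addAttribute(String name, Object value) {
    _attributes.put(name, value);
  }

  @SuppressWarnings("unchecked")
  <T> T getAttribute(String name) {
    return (T) _attributes.get(name);
  }
}

final class PipelineTypes {
  private PipelineTypes() {}

  // Reads the type recorded when the event was created;
  // falls back to DEFAULT when no type was recorded.
  static PipelineType of(EventStub event) {
    PipelineType type = event.getAttribute(EventStub.PIPELINE_TYPE);
    return type == null ? PipelineType.DEFAULT : type;
  }
}
```

A stage could then set `_isTaskFrameworkPipeline = PipelineTypes.of(event) == PipelineType.TASK;` rather than checking `cache instanceof WorkflowControllerDataProvider`.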
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -236,6 +267,11 @@ private void updateCurrentStates(LiveInstance instance, Collection<CurrentState>
}
}
+ private void setStaleMessage(CurrentStateOutput currentStateOutput, String instanceName,
Review comment:
Could you add a comment explaining what this method does?
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -77,15 +84,19 @@ public void process(ClusterEvent event) throws Exception {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
- // update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap);
-
// update current states.
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName,
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(),
currentStateOutput, resourceMap);
+
+ Map<String, Map<String, Message>> existingStaleMessages = cache.getStaleMessages();
+ currentStateOutput.setStaleMessageMap(existingStaleMessages);
+ // update pending messages
+ Map<String, Message> messages = cache.getMessages(instanceName);
+ Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
+ updatePendingMessages(instance, messages.values(), currentStateOutput,
+ relayMessages.values(), resourceMap, existingStaleMessages);
+ cache.setStaleMessages(currentStateOutput.getStaleMessageMap());
Review comment:
What if we always calculate the stale messages from the current states and the pending message lists, so we don't update the cache, or don't even have the cache? Would that still work?
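To make the question concrete, here is a minimal sketch of recomputing stale messages from scratch on every pipeline run, with no cache. It assumes a message counts as stale when its from-state no longer matches the partition's current state on the target instance. That rule, and the `Msg` and `StaleMessageCalculator` types, are hypothetical simplifications, not the PR's actual logic.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified message: id, target instance, partition, and from-state.
final class Msg {
  final String id;
  final String instance;
  final String partition;
  final String fromState;

  Msg(String id, String instance, String partition, String fromState) {
    this.id = id;
    this.instance = instance;
    this.partition = partition;
    this.fromState = fromState;
  }
}

final class StaleMessageCalculator {
  private StaleMessageCalculator() {}

  /**
   * Rebuilds the stale-message map from the current states and pending messages alone.
   *
   * @param currentStates instance name -> (partition name -> current state)
   * @param pendingMessages all pending messages read in this pipeline run
   * @return instance name -> (message id -> message) for every stale message
   */
  static Map<String, Map<String, Msg>> compute(
      Map<String, Map<String, String>> currentStates, Collection<Msg> pendingMessages) {
    Map<String, Map<String, Msg>> stale = new HashMap<>();
    for (Msg msg : pendingMessages) {
      String currentState = currentStates
          .getOrDefault(msg.instance, Collections.emptyMap())
          .get(msg.partition);
      // Assumed staleness rule: the replica has already left the message's from-state.
      if (currentState != null && !currentState.equalsIgnoreCase(msg.fromState)) {
        stale.computeIfAbsent(msg.instance, k -> new HashMap<>()).put(msg.id, msg);
      }
    }
    return stale;
  }
}
```

Because the result depends only on the inputs of the current run, nothing in this sketch needs to be written back to a cache between runs.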
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -103,14 +114,20 @@ public void process(ClusterEvent event) throws Exception {
// update all pending messages to CurrentStateOutput.
private void updatePendingMessages(LiveInstance instance, Collection<Message> pendingMessages,
CurrentStateOutput currentStateOutput, Collection<Message> pendingRelayMessages,
- Map<String, Resource> resourceMap) {
+ Map<String, Resource> resourceMap, Map<String, Map<String, Message>> existingStaleMessages) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
// update all pending messages
for (Message message : pendingMessages) {
+ // ignore existing stale messages
+ if (existingStaleMessages.containsKey(instanceName) && existingStaleMessages.get(instanceName)
Review comment:
For simplicity:
existingStaleMessages.getOrDefault(instanceName, Collections.emptyMap()).containsKey(message.getMsgId())
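A self-contained version of the suggested one-liner, assuming `String` in place of `Message` and a hypothetical helper name `isExistingStaleMessage`. `java.util.Collections.emptyMap()` supplies a typed empty map, so an instance with no stale messages needs no separate `containsKey` check.

```java
import java.util.Collections;
import java.util.Map;

final class StaleMessageCheck {
  private StaleMessageCheck() {}

  /**
   * Returns true if msgId is already recorded as stale for instanceName.
   * A single getOrDefault chain replaces the containsKey(instanceName) + get(instanceName) pair.
   */
  static boolean isExistingStaleMessage(
      Map<String, Map<String, String>> existingStaleMessages, String instanceName, String msgId) {
    return existingStaleMessages
        .getOrDefault(instanceName, Collections.emptyMap())
        .containsKey(msgId);
  }
}
```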
##########
File path: helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
##########
@@ -55,6 +55,8 @@
// <instance -> {<MessageId, Message>}>
private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+ private Map<String, Map<String, Message>> _staleMessageCache = Maps.newHashMap();
Review comment:
Please add a comment explaining what the two String keys in the map represent.
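For example, the new field could carry a comment in the same style as the existing `_messageCache` one. The layout shown (instance name, then message id) is inferred from the lookups elsewhere in this PR, such as `existingStaleMessages.get(instanceName)` followed by `containsKey(message.getMsgId())`. `InstanceMessagesCacheSketch` and its two helper methods are hypothetical; `String` stands in for `Message`, and a plain `HashMap` replaces Guava's `Maps.newHashMap()` to keep the sketch dependency-free.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical excerpt of InstanceMessagesCache; String stands in for Message.
class InstanceMessagesCacheSketch {
  // Same layout as _messageCache:
  // <instance name -> {<message id, stale Message>}>
  private final Map<String, Map<String, String>> _staleMessageCache = new HashMap<>();

  // Records a message as stale for the given instance.
  void addStaleMessage(String instanceName, String msgId, String message) {
    _staleMessageCache.computeIfAbsent(instanceName, k -> new HashMap<>()).put(msgId, message);
  }

  // Returns one instance's stale messages keyed by message id, or an empty map.
  Map<String, String> getStaleMessages(String instanceName) {
    return _staleMessageCache.getOrDefault(instanceName, Collections.emptyMap());
  }
}
```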
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -77,15 +84,19 @@ public void process(ClusterEvent event) throws Exception {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
- // update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap);
-
// update current states.
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName,
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(),
currentStateOutput, resourceMap);
+
+ Map<String, Map<String, Message>> existingStaleMessages = cache.getStaleMessages();
+ currentStateOutput.setStaleMessageMap(existingStaleMessages);
+ // update pending messages
+ Map<String, Message> messages = cache.getMessages(instanceName);
+ Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
+ updatePendingMessages(instance, messages.values(), currentStateOutput,
+ relayMessages.values(), resourceMap, existingStaleMessages);
Review comment:
Since currentStateOutput already has existingStaleMessages, we don't
need existingStaleMessages in the parameter list, right?
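A rough sketch of the signature without the extra parameter, under stated assumptions: `OutputSketch` is a hypothetical stand-in for `CurrentStateOutput`, messages are plain `String`s keyed by message id, and relay messages are omitted for brevity. The stale-message lookup reads from the output object, so the stale map has a single source of truth.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CurrentStateOutput; String stands in for Message.
class OutputSketch {
  // instance name -> (message id -> stale message)
  private final Map<String, Map<String, String>> _staleMessageMap = new HashMap<>();
  // message id -> pending message (single instance, for brevity)
  private final Map<String, String> _pendingMessages = new HashMap<>();

  Map<String, Map<String, String>> getStaleMessageMap() {
    return _staleMessageMap;
  }

  Map<String, String> getPendingMessages() {
    return _pendingMessages;
  }

  void addPendingMessage(String msgId, String message) {
    _pendingMessages.put(msgId, message);
  }
}

final class PendingMessageUpdaterSketch {
  private PendingMessageUpdaterSketch() {}

  // No existingStaleMessages parameter: the stale map is read from the
  // output object that already holds it.
  static void updatePendingMessages(String instanceName, Map<String, String> pendingMessages,
      OutputSketch currentStateOutput) {
    Map<String, String> staleForInstance = currentStateOutput.getStaleMessageMap()
        .getOrDefault(instanceName, Collections.emptyMap());
    for (Map.Entry<String, String> entry : pendingMessages.entrySet()) {
      // Ignore messages already recorded as stale for this instance.
      if (staleForInstance.containsKey(entry.getKey())) {
        continue;
      }
      currentStateOutput.addPendingMessage(entry.getKey(), entry.getValue());
    }
  }
}
```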
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -77,15 +84,19 @@ public void process(ClusterEvent event) throws Exception {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
- // update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput, relayMessages.values(), resourceMap);
-
// update current states.
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName,
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(),
currentStateOutput, resourceMap);
+
+ Map<String, Map<String, Message>> existingStaleMessages = cache.getStaleMessages();
+ currentStateOutput.setStaleMessageMap(existingStaleMessages);
+ // update pending messages
+ Map<String, Message> messages = cache.getMessages(instanceName);
+ Map<String, Message> relayMessages = cache.getRelayMessages(instanceName);
+ updatePendingMessages(instance, messages.values(), currentStateOutput,
+ relayMessages.values(), resourceMap, existingStaleMessages);
+ cache.setStaleMessages(currentStateOutput.getStaleMessageMap());
Review comment:
Is this cache designed only to address performance concerns?
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
##########
@@ -238,6 +256,16 @@ public Message getPendingMessage(String resourceName, Partition partition, Strin
return getStateMessage(resourceName, partition, instanceName, _pendingMessageMap);
}
+ public Set<Message> getStaleMessages(String instanceName) {
+ if (_staleMessageMap.containsKey(instanceName)) {
Review comment:
Is this check necessary? You check later whether the map is null anyway, right?
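A minimal sketch of the accessor without the extra lookup, using the name getStaleMessagesByInstance suggested in the earlier review comment. `StaleMessageLookupSketch` is hypothetical and `String` stands in for `Message`. A single `get()` followed by a null check covers both a missing key and a key mapped to `null`, so a preceding `containsKey()` adds nothing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the CurrentStateOutput accessors; String stands in for Message.
class StaleMessageLookupSketch {
  // instance name -> (message id -> message)
  private final Map<String, Map<String, String>> _staleMessageMap = new HashMap<>();

  Map<String, Map<String, String>> getStaleMessageMap() {
    return _staleMessageMap;
  }

  // One get() plus a null check; no separate containsKey() is needed.
  Set<String> getStaleMessagesByInstance(String instanceName) {
    Map<String, String> messages = _staleMessageMap.get(instanceName);
    return messages == null ? Collections.emptySet() : new HashSet<>(messages.values());
  }
}
```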
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -371,6 +404,11 @@ private boolean shouldCleanUpPendingMessage(Message pendingMsg, String currentSt
}
}
+ private boolean shouldCleanUpStaleMessage(Long currentStateTransitionEndTime) {
Review comment:
Nit: I think we can just move this logic into the caller, since there is only one caller and the logic is simple. The additional method layer will increase latency a little bit, you know.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -77,8 +81,9 @@ protected void processEvent(ClusterEvent event, ResourcesStateMap resourcesState
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
- Map<String, Map<String, Message>> pendingMessagesToCleanUp = new HashMap<>();
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
+
+ Map<String, Map<String, Message>> pendingMessagesToCleanUp = new HashMap<>();
Review comment:
It took me a while to figure this out. So we should call it messagesToCleanup now, right?
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -335,15 +362,21 @@ private void addGeneratedMessageToMap(final Message message,
*/
private void schedulePendingMessageCleanUp(
final Map<String, Map<String, Message>> pendingMessagesToPurge,
ExecutorService workerPool,
- final HelixDataAccessor accessor) {
+ final HelixDataAccessor accessor, Map<String, Map<String, Message>> staleMessageMap) {
workerPool.submit(new Callable<Object>() {
- @Override public Object call() {
+ @Override
+ public Object call() {
for (Map.Entry<String, Map<String, Message>> entry :
pendingMessagesToPurge.entrySet()) {
String instanceName = entry.getKey();
for (Message msg : entry.getValue().values()) {
if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(),
instanceName))) {
LogUtil.logInfo(logger, _eventId, String
.format("Deleted message %s from instance %s",
msg.getMsgId(), instanceName));
+ staleMessageMap.getOrDefault(msg.getTgtName(), Collections.emptyMap())
Review comment:
Based on my earlier comment, if we calculate from scratch in every pipeline run, we won't need to maintain this map here, right? Or is there some usage outside this stage that requires updating the content of the map at runtime?
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -77,8 +81,9 @@ protected void processEvent(ClusterEvent event, ResourcesStateMap resourcesState
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
- Map<String, Map<String, Message>> pendingMessagesToCleanUp = new HashMap<>();
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
+
+ Map<String, Map<String, Message>> pendingMessagesToCleanUp = new HashMap<>();
Review comment:
Same for the names of the other parameters that are pointing to this
same map.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.