jiajunwang commented on a change in pull request #1124:
URL: https://github.com/apache/helix/pull/1124#discussion_r457697703
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
##########
@@ -335,15 +363,21 @@ private void addGeneratedMessageToMap(final Message
message,
*/
private void schedulePendingMessageCleanUp(
final Map<String, Map<String, Message>> pendingMessagesToPurge,
ExecutorService workerPool,
- final HelixDataAccessor accessor) {
+ final HelixDataAccessor accessor, Map<String, Map<String, Message>>
staleMessageMap) {
workerPool.submit(new Callable<Object>() {
- @Override public Object call() {
+ @Override
+ public Object call() {
for (Map.Entry<String, Map<String, Message>> entry :
pendingMessagesToPurge.entrySet()) {
String instanceName = entry.getKey();
for (Message msg : entry.getValue().values()) {
if (accessor.removeProperty(msg.getKey(accessor.keyBuilder(),
instanceName))) {
LogUtil.logInfo(logger, _eventId, String
.format("Deleted message %s from instance %s",
msg.getMsgId(), instanceName));
+ staleMessageMap.getOrDefault(msg.getTgtName(),
Collections.emptyMap())
Review comment:
I now understand your change better, and I don't think this cleanup is
necessary.
The stale message map in the current state output object is recalculated
from the cache every time. As long as we clean the cache properly, we don't
need to update the output map here. Please correct me if I missed anything.
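To illustrate the point, here is a minimal sketch, assuming the output map is rebuilt as a fresh copy of the cache each time. The class and method names (StaleMessageCacheSketch, cacheStaleMessage, removeStaleMessage, snapshot) are hypothetical stand-ins, not the real Helix classes, and messages are reduced to plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the instance message cache; not the real Helix class.
class StaleMessageCacheSketch {
  // instance name -> (message id -> message payload)
  private final Map<String, Map<String, String>> staleMessages = new HashMap<>();

  // Record a stale message for an instance.
  void cacheStaleMessage(String instanceName, String msgId, String payload) {
    staleMessages.computeIfAbsent(instanceName, k -> new HashMap<>()).put(msgId, payload);
  }

  // Drop a stale message from the cache, e.g. after it was deleted from the store.
  void removeStaleMessage(String instanceName, String msgId) {
    Map<String, String> perInstance = staleMessages.get(instanceName);
    if (perInstance == null) {
      return;
    }
    perInstance.remove(msgId);
    if (perInstance.isEmpty()) {
      staleMessages.remove(instanceName);
    }
  }

  // Assumption: the output map is recomputed from the cache as a fresh copy,
  // so a removal from the cache is reflected the next time this is called.
  Map<String, Map<String, String>> snapshot() {
    Map<String, Map<String, String>> copy = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : staleMessages.entrySet()) {
      copy.put(entry.getKey(), new HashMap<>(entry.getValue()));
    }
    return copy;
  }
}
```

If the real output is built this way, removing the message from the cache is enough, and the next recomputed output no longer contains it.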
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
##########
@@ -544,6 +544,14 @@ public synchronized void
setLiveInstances(List<LiveInstance> liveInstances) {
return _instanceMessagesCache.getMessages(instanceName);
}
+ public Map<String, Map<String, Message>> getStaleMessages() {
+ return _instanceMessagesCache.getStaleMessageCache();
+ }
+
+ public void setStaleMessages(Map<String, Map<String, Message>>
staleMessageMap) {
Review comment:
nit: could this be named cacheStaleMessages, so that all the methods that
update the message cache are named consistently?
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
##########
@@ -77,15 +87,20 @@ public void process(ClusterEvent event) throws Exception {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
- // update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages =
cache.getRelayMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput,
relayMessages.values(), resourceMap);
-
// update current states.
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName,
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(),
currentStateOutput, resourceMap);
+
+ Map<String, Map<String, Message>> existingStaleMessages =
Review comment:
nit: can we follow the same style as the other two maps here?
That is, pass existingStaleMessages to updatePendingMessages as well,
instead of setting it on the currentStateOutput here.
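A rough sketch of the suggested shape, under heavy assumptions: OutputSketch stands in for the current state output object, messages are reduced to plain strings, and the updatePendingMessages signature shown here is hypothetical rather than the one in CurrentStateComputationStage.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the current state output; only what the sketch needs.
class OutputSketch {
  final Map<String, List<String>> pending = new HashMap<>();
  final Map<String, List<String>> relay = new HashMap<>();
  final Map<String, List<String>> stale = new HashMap<>();
}

class PendingMessageUpdateSketch {
  // Assumed signature: the stale messages are passed in next to the other two
  // collections, so all three are written to the output in one place.
  static void updatePendingMessages(String instanceName, Collection<String> messages,
      Collection<String> relayMessages, Collection<String> staleMessages,
      OutputSketch output) {
    output.pending.put(instanceName, new ArrayList<>(messages));
    output.relay.put(instanceName, new ArrayList<>(relayMessages));
    output.stale.put(instanceName, new ArrayList<>(staleMessages));
  }
}
```

The caller then only fetches the three collections from the cache and hands them over, without touching the output object directly.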