pkuwm commented on a change in pull request #1066:
URL: https://github.com/apache/helix/pull/1066#discussion_r498745773
##########
File path: helix-core/src/main/java/org/apache/helix/HelixManager.java
##########
@@ -419,6 +420,17 @@ void
addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l
*/
Long getSessionStartTime();
+ /**
+ * Checks whether the cluster manager is leader and returns the session ID associated to the
+ * connection of cluster data store, if and only if it is leader.
+ *
+ * @return {@code Optional<String>} session ID is present inside the {@code Optional} object
+ * if the cluster manager is leader. Otherwise, returns an empty {@code Optional} object.
+ */
+ default Optional<String> getSessionIdIfLead() {
Review comment:
I thought about the private interface. My concern is that it adds work: we
would have to identify every place that needs a SessionAwareHelixManager and
construct it there. And it doesn't converge. E.g., if there is only one place
that uses the SessionAwareHelixManager and only one place that checks the
session, it's fine.
But if usage is mixed, we may need `instanceof` checks, which doesn't look
nice.
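To make the trade-off concrete, here is a minimal sketch. All names in it (`Manager`, `SessionAwareManager`, `viaInstanceof`, `viaDefaultMethod`) are hypothetical stand-ins and not Helix API, and the empty default body is an assumption, since the real method body is not shown in this hunk.

```java
import java.util.Optional;

final class SessionIdSketch {
  // Hypothetical stand-in for HelixManager with a default method like the one in this PR.
  // Assumption: a manager that does not override it reports "not leader".
  interface Manager {
    default Optional<String> getSessionIdIfLead() {
      return Optional.empty();
    }
  }

  // Hypothetical separate interface, as in the alternative design under discussion.
  interface SessionAwareManager extends Manager {
    String leaderSessionId();
  }

  // With a separate interface, callers that receive a plain Manager need a type check.
  static Optional<String> viaInstanceof(Manager m) {
    if (m instanceof SessionAwareManager) {
      return Optional.ofNullable(((SessionAwareManager) m).leaderSessionId());
    }
    return Optional.empty();
  }

  // With the default method, every caller makes the same call on any Manager.
  static Optional<String> viaDefaultMethod(Manager m) {
    return m.getSessionIdIfLead();
  }

  public static void main(String[] args) {
    Manager plain = new Manager() {};
    Manager leader = new Manager() {
      @Override
      public Optional<String> getSessionIdIfLead() {
        return Optional.of("session-1");
      }
    };
    System.out.println(viaDefaultMethod(plain).isPresent());
    System.out.println(viaDefaultMethod(leader).orElse("<none>"));
  }
}
```

In the second variant the caller never needs to know which concrete manager it holds, which is the convergence argument made above.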
##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -788,14 +788,18 @@ protected void setupInstances(String clusterName, int[]
instances) {
}
}
- protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+ protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException)
Review comment:
Actually, in the first version of the change, the existing method was
kept and the boolean defaulted to false. Then we synced and agreed that, since
it is a method in our tests, we can change it directly and keep a single
method. So let's keep the change as it is now.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
##########
@@ -78,7 +79,17 @@ protected void processEvent(ClusterEvent event,
MessageOutput messageOutput) thr
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap,
liveInstanceMap,
manager.getProperties());
+ // An early check for expected leader session. If the sessions don't match, it means the
+ // controller's session changes, then messages should not be sent and pipeline should stop.
+ Optional<String> expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
+ if (!expectedSession.isPresent() || !expectedSession.get().equals(manager.getSessionId())) {
+ throw new StageException(String.format(
Review comment:
If we continue, it may cause problems or inconsistency. If the
controller loses leadership (its session changes), there is no need to keep
running the remaining stages. So to be safe, throwing an exception stops the
pipeline. It also saves CPU and possibly further ZK writes. I guess the session
change case doesn't happen frequently.
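As a rough sketch of the check described above: the class and method names here are hypothetical, and `IllegalStateException` only stands in for Helix's `StageException` so the example compiles on its own.

```java
import java.util.Optional;

final class EarlySessionCheckSketch {
  // True only when the event carries a session ID and it equals the manager's current one.
  static boolean sessionMatches(Optional<String> expectedSession, String currentSessionId) {
    return expectedSession.isPresent() && expectedSession.get().equals(currentSessionId);
  }

  // Throws to stop the remaining stages when the event was created under another session.
  // IllegalStateException is a stand-in for StageException (assumption for this sketch).
  static void checkSession(Optional<String> expectedSession, String currentSessionId) {
    if (!sessionMatches(expectedSession, currentSessionId)) {
      throw new IllegalStateException(String.format(
          "Event session %s does not match current session %s; stopping pipeline",
          expectedSession.orElse("<none>"), currentSessionId));
    }
  }

  public static void main(String[] args) {
    checkSession(Optional.of("session-1"), "session-1"); // same session: continues
    try {
      checkSession(Optional.of("session-1"), "session-2"); // session changed: aborts
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

An empty `Optional` is treated the same as a mismatch, so an event created while the controller was not leader also stops the pipeline before any message is sent.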
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1967,18 +1937,19 @@ public void asyncSetData(final String path, Object
datat, final int version,
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncSetData(path, data, version, startT, cb);
+ doAsyncSetData(path, data, version, startT, cb,
parseExpectedSessionId(datat));
}
private void doAsyncSetData(final String path, byte[] data, final int
version, final long startT,
Review comment:
We may do it later if we need the expected session. I'd like to make
this diff minimal.
##########
File path: helix-core/src/main/java/org/apache/helix/model/Message.java
##########
@@ -937,6 +946,13 @@ private boolean isNullOrEmpty(String data) {
return data == null || data.length() == 0 || data.trim().length() == 0;
}
+ private void initStat() {
Review comment:
Actually the existing code is already messy :(
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
##########
@@ -703,9 +705,15 @@ private void handleEvent(ClusterEvent event,
BaseControllerDataProvider dataProv
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
_rebalancerRef.getRebalancer(manager));
- if (!manager.isLeader()) {
- logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader for " + manager
- .getClusterName() + ". Pipeline will not be invoked");
+ // If manager session changes, no need to run pipeline for the stale event.
Review comment:
This follows the code comment: `!eventSessionId.isPresent()` means the
manager is not the leader.
##########
File path: helix-core/src/main/java/org/apache/helix/HelixProperty.java
##########
@@ -148,7 +154,12 @@ public String toString() {
}
}
- private Stat _stat;
+ protected Stat _stat;
+
+ public HelixProperty() {
+ _record = DEFAULT_ZNRECORD;
+ _stat = DEFAULT_STAT;
+ }
Review comment:
I thought about this one before, and I think it is fine. The existing
constructors are already kind of messy. We still cannot converge the other
constructors to this one.