pkuwm commented on a change in pull request #1066:
URL: https://github.com/apache/helix/pull/1066#discussion_r441963031
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1738,29 +1705,36 @@ public Stat writeDataGetStat(final String path, Object
datat, final int expected
return writeDataReturnStat(path, datat, expectedVersion);
}
- public void asyncCreate(final String path, Object datat, final CreateMode
mode,
Review comment:
I changed the name because the API changed. I don't think `datat`
stands for data type. Here it is not a data type like `T`; it is the data
being written to ZK.
Renaming `datat` to `data` is out of scope for this PR. If necessary, I
would prefer to rename all of them in a separate PR.
##########
File path:
helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
##########
@@ -78,7 +78,17 @@ protected void processEvent(ClusterEvent event,
MessageOutput messageOutput) thr
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap,
liveInstanceMap,
manager.getProperties());
- List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
+ String expectedSession =
event.getAttribute(AttributeName.EVENT_SESSION.name());
+ // An early check for expected leader session. If the sessions don't
match, it means the
+ // controller lost leadership, then messages should not be sent and the
pipeline is stopped.
+ // This potentially avoid double masters for a single partition.
+ if (expectedSession != null &&
!manager.getSessionId().equals(expectedSession)) {
Review comment:
That's because `expectedSession != null` was not checked in the first
version of the code :) I added it intentionally to avoid an NPE.
Actually, `manager.getSessionId()` won't return null, because
`getSessionId()` checks the session connection internally. The only case in
which the session ID is null is when the zkclient has been constructed but the
session is not yet established. Because of `checkConnected()`, the session ID
might be expired but won't be null.
Anyway, it is a good habit to do what you suggest, especially when we are
not sure whether `manager.getSessionId()` could return null.
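A minimal sketch of the null-safe comparison discussed here, assuming the suggestion was to call `equals()` on the value that has already been null-checked. `LeaderSessionCheck` and `sessionMismatch` are hypothetical names for illustration, not part of Helix:

```java
// Hypothetical helper, not part of Helix: isolates the null-safe session comparison.
final class LeaderSessionCheck {
  private LeaderSessionCheck() {
  }

  /**
   * Returns true when an expected session was recorded and it differs from
   * the actual session, i.e. when messages should not be sent.
   */
  static boolean sessionMismatch(String expectedSession, String actualSession) {
    // equals() is called on the value already known to be non-null, so a null
    // actualSession counts as a mismatch instead of throwing an NPE.
    return expectedSession != null && !expectedSession.equals(actualSession);
  }
}
```

With this ordering, a null `expectedSession` skips the check entirely, which matches the behavior of the condition in the diff.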
##########
File path: helix-core/src/main/java/org/apache/helix/HelixManager.java
##########
@@ -419,6 +420,17 @@ void
addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l
*/
Long getSessionStartTime();
+ /**
+ * Checks whether the cluster manager is leader and returns the session ID
associated to the
+ * connection of cluster data store, if and only if it is leader.
+ *
+ * @return {@code Optional<String>} session ID is present inside the {@code
Optional} object
+ * if the cluster manager is leader. Otherwise, returns an empty {@code
Optional} object.
+ */
+ default Optional<String> getSessionIdIfLead() {
Review comment:
I thought about that. I remember you said `HelixManager` is not supposed
to be a public interface, and the recent customized view changes were also
added to it, so it should be fine not to add this default implementation. It
is just my habit to consider backward compatibility in a public interface.
But Helix has 3 other internal implementations of `HelixManager`:
`MockZKHelixManager`, `DummyClusterManager`, and `MockManager`. I just did
not want to add empty implementations to these 3 classes. I am fine with
adding them if we decide not to keep this default implementation.
As for `getSessionId()`, maybe we could add more javadoc to make the
difference clear: `getSessionId()` returns the session ID directly, while
`getSessionIdIfLead()` checks leadership by reading the cluster data store and
returns the session ID only if the manager is the leader.
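To illustrate the trade-off, here is a rough sketch of how a default method keeps existing implementations compiling. `SessionSource`, `MockSessionSource`, and `LeaderAwareSessionSource` are hypothetical, trimmed-down stand-ins, not the real `HelixManager` or its mocks, and the `leader` flag only stands in for reading the cluster data store:

```java
import java.util.Optional;

// Hypothetical stand-in for HelixManager; only the two methods under discussion.
interface SessionSource {
  /** Returns the current session ID directly, without any leadership check. */
  String getSessionId();

  /** The default lets existing implementations compile without changes. */
  default Optional<String> getSessionIdIfLead() {
    return Optional.empty();
  }
}

// Mock-style implementation that relies on the default.
final class MockSessionSource implements SessionSource {
  @Override
  public String getSessionId() {
    return "mock-session";
  }
}

// Implementation that checks leadership first. The boolean field is an
// assumption standing in for a read of the leader node in the data store.
final class LeaderAwareSessionSource implements SessionSource {
  private final String sessionId;
  private final boolean leader;

  LeaderAwareSessionSource(String sessionId, boolean leader) {
    this.sessionId = sessionId;
    this.leader = leader;
  }

  @Override
  public String getSessionId() {
    return sessionId;
  }

  @Override
  public Optional<String> getSessionIdIfLead() {
    return leader ? Optional.of(sessionId) : Optional.empty();
  }
}
```

Without the `default` body, `MockSessionSource` would need its own empty implementation, which is the extra work in the three mock classes mentioned above.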
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -2114,13 +2088,43 @@ private String getHexSessionId() {
}
/*
- * Session aware operation needs below requirements:
- * 1. the session id is NOT null or empty
- * 2. create mode is EPHEMERAL or EPHEMERAL_SEQUENTIAL
+ * Gets the zookeeper instance that ensures its session ID matches the
expected session ID.
+ * It is used for write operations that suppose the znode to be created by
the expected session.
*/
- private boolean isSessionAwareOperation(String expectedSessionId, CreateMode
mode) {
- return expectedSessionId != null && !expectedSessionId.isEmpty() && (
- mode == CreateMode.EPHEMERAL || mode ==
CreateMode.EPHEMERAL_SEQUENTIAL);
+ private ZooKeeper getExpectedZookeeper(final String expectedSessionId) {
+ ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+
+ if (expectedSessionId == null || expectedSessionId.isEmpty()) {
+ return zk;
+ }
+
+ /*
+ * 1. If operation is session aware, we have to check whether or not the
+ * passed-in(expected) session id matches actual session's id.
+ * If not, znode creation is failed. This validation is
+ * critical to guarantee the znode is created by the expected ZK session.
+ *
+ * 2. Otherwise, the operation is NOT session aware.
+ * In this case, we will use the actual zookeeper session to create the
node.
+ */
+ acquireEventLock();
+ try {
+ final String actualSessionId = Long.toHexString(zk.getSessionId());
+ if (!actualSessionId.equals(expectedSessionId)) {
+ throw new ZkSessionMismatchedException(
+ "Failed to get expected zookeeper instance! There is a session id
mismatch. Expected: "
+ + expectedSessionId + ". Actual: " + actualSessionId);
+ }
+
+ /*
+ * Cache the zookeeper reference and make sure later zooKeeper.create()
is being run
+ * under this zookeeper connection. This is to avoid locking
zooKeeper.create() which
+ * may cause potential performance issue.
+ */
+ return ((ZkConnection) getConnection()).getZookeeper();
Review comment:
Rethinking this code block, I just realized that we may not need
acquireEventLock: we already have the zk object, so we check this zk's
session and use the same object to execute operations. Even if the zkclient's
session changes, which means this zk object has expired, the operation will
throw SessionExpiredException and retry. I think acquireEventLock is redundant
here.
The reason we considered acquireEventLock was to prevent session changes
after we check the zkclient's session. But now we are using the zk object. If
the client's session changes, this zk object expires; we then retry and get
another zk object.
```
private ZooKeeper getExpectedZookeeper(final String expectedSessionId) {
  ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
  // Not session aware: use whatever session the client currently has.
  if (expectedSessionId == null || expectedSessionId.isEmpty()) {
    return zk;
  }
  // Check the session of this specific zk object; no event lock is taken.
  final String actualSessionId = Long.toHexString(zk.getSessionId());
  if (!actualSessionId.equals(expectedSessionId)) {
    throw new ZkSessionMismatchedException(
        "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: "
            + expectedSessionId + ". Actual: " + actualSessionId);
  }
  return zk;
}
```
What do you think?
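To make the argument concrete, here is a small self-contained sketch with no ZooKeeper dependency. `FakeHandle` and `ExpectedSessionSketch` are hypothetical stand-ins for the `ZooKeeper` object and for `getExpectedZookeeper`, and `IllegalStateException` stands in for both SessionExpiredException and ZkSessionMismatchedException:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a ZooKeeper handle: one object per session.
final class FakeHandle {
  final long sessionId;
  boolean expired;

  FakeHandle(long sessionId) {
    this.sessionId = sessionId;
  }

  // An expired handle rejects every operation, as an expired session would.
  void create(String path) {
    if (expired) {
      throw new IllegalStateException(
          "session expired: " + Long.toHexString(sessionId) + ", path: " + path);
    }
  }
}

final class ExpectedSessionSketch {
  private ExpectedSessionSketch() {
  }

  // Captures the handle once and validates that same object, without a lock.
  static FakeHandle getExpectedHandle(Supplier<FakeHandle> connection,
      String expectedSessionId) {
    FakeHandle handle = connection.get();
    if (expectedSessionId == null || expectedSessionId.isEmpty()) {
      return handle;
    }
    String actualSessionId = Long.toHexString(handle.sessionId);
    if (!actualSessionId.equals(expectedSessionId)) {
      throw new IllegalStateException("session id mismatch. Expected: "
          + expectedSessionId + ". Actual: " + actualSessionId);
    }
    return handle;
  }
}
```

If the session changes after the check, the captured handle is the expired one, so `create` fails instead of writing under a new session. A retry then fetches the new handle and fails the session check.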
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -401,6 +402,8 @@ void checkConnected(long timeout) {
throw new HelixException(
"HelixManager is not connected within retry timeout for cluster " +
_clusterName);
}
+
+ _sessionId = ZKUtil.toHexSessionId(_zkclient.getSessionId());
Review comment:
I remember why I added this: I was using the `getSessionId()` API when
checking leadership. Now that we have `getSessionIdIfLead()`, you are right,
we don't need this.
##########
File path: helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
##########
@@ -95,6 +95,24 @@
*/
boolean[] createChildren(List<String> paths, List<T> records, int options);
+ /**
+ * Use it when creating children under a parent node with an expected ZK
session.
+ * <p>
+ * This will use async api for better performance. If the children already
exist it will return
+ * false.
+ *
+ * @param paths the paths to the children ZNodes
+ * @param records List of data to write to each of the path
+ * @param options Set the type of ZNode see the valid values in {@link
AccessOption}
+ * @param expectedSession The expected ZK session to create children
+ * @return For each child: true if creation succeeded, false otherwise (e.g.
if the child exists)
+ */
+ default boolean[] createChildren(List<String> paths, List<T> records, int
options,
Review comment:
I understand your point. With a new interface, we could add javadoc
explaining that users should not use it if they don't care about the session
(it may be for Helix internal use only). They can keep using BaseDataAccessor
without being confused by the extra session-aware API. This is the benefit.
Another benefit is that we could add more session-aware APIs to the new
interface in the future if needed.
The downsides of having a new interface are:
1. We have to maintain the extra interfaces: SessionAwareBaseDataAccessor,
SessionAwareHelixDataAccessor, (maybe a session-aware zkclient?).
2. It needs more code changes in this PR to implement the new interfaces
(SessionAwareBaseDataAccessor).
Let's discuss further.
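For the discussion, a rough sketch of what the separate-interface option could look like. `PlainAccessor`, `SessionAwareAccessor`, and `InMemoryAccessor` are hypothetical, simplified stand-ins, not the real BaseDataAccessor. Returning all `false` on a session mismatch is an assumption made only for this illustration:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, trimmed-down stand-in for BaseDataAccessor.
interface PlainAccessor<T> {
  boolean[] createChildren(List<String> paths, List<T> records, int options);
}

// Hypothetical separate interface that carries the session-aware overload,
// so users of PlainAccessor never see it.
interface SessionAwareAccessor<T> extends PlainAccessor<T> {
  boolean[] createChildren(List<String> paths, List<T> records, int options,
      String expectedSession);
}

// In-memory implementation for illustration only; no ZooKeeper involved.
final class InMemoryAccessor<T> implements SessionAwareAccessor<T> {
  private final Set<String> existing = new HashSet<>();
  private final String currentSession;

  InMemoryAccessor(String currentSession) {
    this.currentSession = currentSession;
  }

  @Override
  public boolean[] createChildren(List<String> paths, List<T> records, int options) {
    boolean[] created = new boolean[paths.size()];
    for (int i = 0; i < paths.size(); i++) {
      // Set.add returns false when the path already exists.
      created[i] = existing.add(paths.get(i));
    }
    return created;
  }

  @Override
  public boolean[] createChildren(List<String> paths, List<T> records, int options,
      String expectedSession) {
    // Assumption: on a session mismatch nothing is created.
    if (!currentSession.equals(expectedSession)) {
      return new boolean[paths.size()];
    }
    return createChildren(paths, records, options);
  }
}
```

The cost shows up here as well: every accessor that should support the session-aware call has to implement the second interface.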
##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -788,14 +788,22 @@ protected void setupInstances(String clusterName, int[]
instances) {
}
}
- protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+ protected void runPipeline(ClusterEvent event, Pipeline pipeline) throws
Exception {
Review comment:
OK, then let's just keep this:
`protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException) throws Exception;`
##########
File path:
zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
##########
@@ -372,6 +372,12 @@ public void asyncCreate(String path, Object datat,
CreateMode mode,
_rawZkClient.asyncCreate(path, datat, mode, cb);
}
+ @Override
+ public void asyncCreate(String path, Object datat, CreateMode mode,
Review comment:
Yes, only DedicatedZkClient supports this operation. The others
(FederatedZkClient and SharedZkClient) don't support it.
Or did you mean adding a final check using `isManagingConnection()`? Maybe
that is unnecessary?