pkuwm commented on a change in pull request #1066:
URL: https://github.com/apache/helix/pull/1066#discussion_r441963031



##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1738,29 +1705,36 @@ public Stat writeDataGetStat(final String path, Object datat, final int expected
     return writeDataReturnStat(path, datat, expectedVersion);
   }
 
-  public void asyncCreate(final String path, Object datat, final CreateMode mode,

Review comment:
   Because the API changed, I changed the name. I don't think `datat` stands for "data type": it is not a type parameter like `T`, but the data being written to ZK.
   Renaming `datat` -> `data` is out of the scope of this PR. If necessary, I would prefer to rename all of them in another PR.

##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
##########
@@ -78,7 +78,17 @@ protected void processEvent(ClusterEvent event, MessageOutput messageOutput) thr
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
             manager.getProperties());
 
-    List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
+    String expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
+    // An early check for expected leader session. If the sessions don't match, it means the
+    // controller lost leadership, then messages should not be sent and the pipeline is stopped.
+    // This potentially avoids double masters for a single partition.
+    if (expectedSession != null && !manager.getSessionId().equals(expectedSession)) {

Review comment:
   That's because the first version of the code did not check `expectedSession != null` :) I added the check intentionally to avoid an NPE.

   Actually, `manager.getSessionId()` won't return null because getSessionId() checks the session connection. The only chance of sessionId being null is when the zkclient has been constructed but the session is not yet established. Because of checkConnected(), the session ID might be expired but won't be null.

   Anyway, it is a good habit to do what you suggest, especially when we are not sure whether manager.getSessionId() could return null.
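The reviewer's exact suggestion is not quoted in this thread. Assuming it was to guard the comparison against a null value on either side, a minimal null-safe form of the check could look like this; `SessionCheckSketch` and `sessionMismatch` are hypothetical names, not part of Helix.

```java
class SessionCheckSketch {
  // Hypothetical helper (not part of Helix): returns true when the pipeline should stop
  // because the controller's current session differs from the session the event was
  // created under. A null expectedSession means the event carries no session constraint.
  static boolean sessionMismatch(String expectedSession, String currentSessionId) {
    if (expectedSession == null) {
      return false;
    }
    // equals() is called on the value already known to be non-null, so a null
    // currentSessionId yields "mismatch" instead of a NullPointerException.
    return !expectedSession.equals(currentSessionId);
  }

  public static void main(String[] args) {
    System.out.println(sessionMismatch(null, "abc"));  // false: no constraint
    System.out.println(sessionMismatch("abc", "abc")); // false: same session
    System.out.println(sessionMismatch("abc", "def")); // true: session changed
    System.out.println(sessionMismatch("abc", null));  // true: no NPE
  }
}
```

Treating a null current session as a mismatch errs on the side of not sending messages, which matches the stated goal of avoiding double masters.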

##########
File path: helix-core/src/main/java/org/apache/helix/HelixManager.java
##########
@@ -419,6 +420,17 @@ void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l
    */
   Long getSessionStartTime();
 
+  /**
+   * Checks whether the cluster manager is the leader and, if and only if it is, returns the
+   * session ID associated with the connection to the cluster data store.
+   *
+   * @return an {@code Optional<String>} containing the session ID if the cluster manager is the
+   * leader; otherwise, an empty {@code Optional}.
+   */
+  default Optional<String> getSessionIdIfLead() {

Review comment:
   I thought about that. I remember you said `HelixManager` is not supposed to be a public interface, and the recent customized view changes were also added to it. So it should be fine not to add this default implementation; it is just my habit to consider backward compatibility in a public interface.

   But Helix has 3 other internal implementations of `HelixManager`: `MockZKHelixManager`, `DummyClusterManager`, and `MockManager`. I just did not want to add empty implementations to these 3 classes. I am fine with adding them if we decide not to keep this default implementation.

   As for `getSessionId()`, maybe we could add more javadoc to make the difference clear: `getSessionId()` returns the session ID directly, while `getSessionIdIfLead()` checks leadership by reading the cluster data store and returns the session ID only if this manager is the leader.
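A minimal sketch of the trade-off discussed above, assuming a default that throws `UnsupportedOperationException` (the PR's actual default body is not shown in this thread): a default method lets existing implementations such as test mocks compile unchanged, while the production implementation overrides it. `ManagerSketch`, `LegacyMockManager`, `LeaderAwareManager` and the `leaderSessionId` field are hypothetical.

```java
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for a manager interface (not Helix's HelixManager).
interface ManagerSketch {
  String getSessionId();

  // Added later as a default method, so implementations written before it existed
  // (e.g. test mocks) keep compiling without an empty override.
  // Assumption: the default throws; the PR's real default body is not shown here.
  default Optional<String> getSessionIdIfLead() {
    throw new UnsupportedOperationException("getSessionIdIfLead() is not supported");
  }
}

// Written before getSessionIdIfLead() existed: compiles unchanged.
class LegacyMockManager implements ManagerSketch {
  @Override
  public String getSessionId() {
    return "mock-session";
  }
}

// A production-style implementation overrides the default.
class LeaderAwareManager implements ManagerSketch {
  private final String sessionId;
  // Hypothetical: the session ID recorded on the leader node in the cluster data store.
  private final String leaderSessionId;

  LeaderAwareManager(String sessionId, String leaderSessionId) {
    this.sessionId = sessionId;
    this.leaderSessionId = leaderSessionId;
  }

  @Override
  public String getSessionId() {
    return sessionId;
  }

  @Override
  public Optional<String> getSessionIdIfLead() {
    // Present only if this manager's session is the one recorded as leader.
    return sessionId.equals(leaderSessionId) ? Optional.of(sessionId) : Optional.empty();
  }
}
```

Throwing rather than returning `Optional.empty()` makes an accidental call on a mock fail loudly instead of silently reporting "not leader"; which behavior is preferable is a judgment call.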

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -2114,13 +2088,43 @@ private String getHexSessionId() {
   }
 
   /*
-   * Session aware operation needs below requirements:
-   * 1. the session id is NOT null or empty
-   * 2. create mode is EPHEMERAL or EPHEMERAL_SEQUENTIAL
+   * Gets the zookeeper instance that ensures its session ID matches the expected session ID.
+   * It is used for write operations that suppose the znode to be created by the expected session.
    */
-  private boolean isSessionAwareOperation(String expectedSessionId, CreateMode mode) {
-    return expectedSessionId != null && !expectedSessionId.isEmpty() && (
-        mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL);
+  private ZooKeeper getExpectedZookeeper(final String expectedSessionId) {
+    ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+
+    if (expectedSessionId == null || expectedSessionId.isEmpty()) {
+      return zk;
+    }
+
+    /*
+     * 1. If operation is session aware, we have to check whether or not the
+     * passed-in(expected) session id matches actual session's id.
+     * If not, znode creation is failed. This validation is
+     * critical to guarantee the znode is created by the expected ZK session.
+     *
+     * 2. Otherwise, the operation is NOT session aware.
+     * In this case, we will use the actual zookeeper session to create the node.
+     */
+    acquireEventLock();
+    try {
+      final String actualSessionId = Long.toHexString(zk.getSessionId());
+      if (!actualSessionId.equals(expectedSessionId)) {
+        throw new ZkSessionMismatchedException(
+            "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: "
+                + expectedSessionId + ". Actual: " + actualSessionId);
+      }
+
+      /*
+       * Cache the zookeeper reference and make sure later zooKeeper.create() is being run
+       * under this zookeeper connection. This is to avoid locking zooKeeper.create() which
+       * may cause potential performance issue.
+       */
+      return ((ZkConnection) getConnection()).getZookeeper();

Review comment:
   Rethinking this code block, I just realized that we may not need acquireEventLock(): we already have the `zk` object, so we check this zk's session and use the same object to execute operations. Even if the zkclient's session changes, which means this zk object has expired, the operation will throw SessionExpiredException and be retried. I think acquireEventLock() is redundant here.

   The reason we considered acquireEventLock() was to prevent the session from changing after we check the zkclient's session. But now we are using the zk object itself: if the client's session changes, this zk object expires, and we retry with a new zk object.
   
   ```
   private ZooKeeper getExpectedZookeeper(final String expectedSessionId) {
     ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();

     if (expectedSessionId == null || expectedSessionId.isEmpty()) {
       return zk;
     }

     // No event lock: the session check and the caller's operation both use this same zk handle.
     final String actualSessionId = Long.toHexString(zk.getSessionId());
     if (!actualSessionId.equals(expectedSessionId)) {
       throw new ZkSessionMismatchedException(
           "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: "
               + expectedSessionId + ". Actual: " + actualSessionId);
     }

     return zk;
   }
   ```
   
   What do you think?
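The argument above rests on checking and then using the same captured handle. The following self-contained sketch models that with hypothetical `SessionHandle` and `ClientSketch` classes (stand-ins for `ZooKeeper` and `ZkClient`, not their real APIs); `IllegalStateException` stands in for both `SessionExpiredException` and `ZkSessionMismatchedException`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a ZooKeeper handle: each handle is bound to exactly one session.
final class SessionHandle {
  final long sessionId;
  volatile boolean expired;

  SessionHandle(long sessionId) {
    this.sessionId = sessionId;
  }

  // An operation on an expired handle fails (think SessionExpiredException)
  // instead of silently running under some newer session.
  String create(String path) {
    if (expired) {
      throw new IllegalStateException("Session expired: " + Long.toHexString(sessionId));
    }
    return path + " created by session " + Long.toHexString(sessionId);
  }
}

// Hypothetical stand-in for the client, which swaps handles when the session is re-established.
final class ClientSketch {
  private final AtomicReference<SessionHandle> current = new AtomicReference<>();

  // Mirrors the proposed getExpectedZookeeper(): read the handle once, validate that same
  // handle, and return it. No lock is needed because the check and the later operation
  // target the same captured object.
  SessionHandle getExpectedHandle(String expectedSessionId) {
    SessionHandle handle = current.get();
    if (expectedSessionId == null || expectedSessionId.isEmpty()) {
      return handle;
    }
    String actual = Long.toHexString(handle.sessionId);
    if (!actual.equals(expectedSessionId)) {
      throw new IllegalStateException(
          "Session mismatch. Expected: " + expectedSessionId + ". Actual: " + actual);
    }
    return handle;
  }

  // Simulates expiry of the current session followed by reconnection with a new one.
  void renewSession(long newSessionId) {
    SessionHandle old = current.get();
    if (old != null) {
      old.expired = true;
    }
    current.set(new SessionHandle(newSessionId));
  }
}
```

After a renewal, the captured handle fails on use, and a retry with the old expected session fails the check instead of running under the new session, which is the guarantee the event lock was meant to provide.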

##########
File path: helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -401,6 +402,8 @@ void checkConnected(long timeout) {
       throw new HelixException(
           "HelixManager is not connected within retry timeout for cluster " + _clusterName);
     }
+
+    _sessionId = ZKUtil.toHexSessionId(_zkclient.getSessionId());

Review comment:
   I remember why I added this: I was using getSessionId() when checking leadership. Since we now have `getSessionIdIfLead()`, you are right, we don't need this.

##########
File path: helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java
##########
@@ -95,6 +95,24 @@
    */
   boolean[] createChildren(List<String> paths, List<T> records, int options);
 
+  /**
+   * Use it when creating children under a parent node with an expected ZK session.
+   * <p>
+   * This will use the async API for better performance. If a child already exists, false is
+   * returned for that child.
+   *
+   * @param paths the paths to the children ZNodes
+   * @param records List of data to write to each of the paths
+   * @param options Set the type of ZNode; see the valid values in {@link AccessOption}
+   * @param expectedSession The expected ZK session to create children
+   * @return For each child: true if creation succeeded, false otherwise (e.g. if the child exists)
+   */
+  default boolean[] createChildren(List<String> paths, List<T> records, int options,

Review comment:
   I understand your point. With a new interface, we could add javadoc telling users not to use it if they don't care about sessions (maybe it is for Helix internal use only). They can keep using BaseDataAccessor without being confused by the extra session-aware API. This is the benefit.

   Another benefit is that we may add more session-aware APIs to the new interface in the future if needed.

   The downsides of having a new interface are:
   1. We have to maintain the extra interfaces: SessionAwareBaseDataAccessor, SessionAwareHelixDataAccessor (and maybe a session-aware zkclient?).
   2. This PR needs more code changes to implement the new interfaces (SessionAwareBaseDataAccessor).

   Let's discuss further.
   

##########
File path: helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
##########
@@ -788,14 +788,22 @@ protected void setupInstances(String clusterName, int[] 
instances) {
     }
   }
 
-  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline) throws Exception {

Review comment:
   OK, then let's just keep this signature: `protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException) throws Exception;`
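The thread only settles the signature. Assuming `shouldThrowException` means "rethrow a stage failure to the test instead of swallowing it" (an assumption, not confirmed here), a minimal sketch with hypothetical `StageSketch` and `PipelineRunnerSketch` types in place of Helix's `Pipeline` and `ClusterEvent` could look like this.

```java
import java.util.List;

// Hypothetical stand-in for a pipeline stage; not Helix's actual Stage interface.
interface StageSketch {
  void process(List<String> eventLog) throws Exception;
}

class PipelineRunnerSketch {
  // Assumed semantics: when shouldThrowException is true, a failing stage's exception
  // propagates to the test; when false, it is recorded and the pipeline stops quietly.
  static void runPipeline(List<String> eventLog, List<StageSketch> stages,
      boolean shouldThrowException) throws Exception {
    for (StageSketch stage : stages) {
      try {
        stage.process(eventLog);
      } catch (Exception e) {
        if (shouldThrowException) {
          throw e;
        }
        eventLog.add("swallowed: " + e.getMessage());
        return; // stop the pipeline at the failing stage
      }
    }
  }

  // Convenience overload so existing two-argument call sites need no throws clause.
  static void runPipeline(List<String> eventLog, List<StageSketch> stages) {
    try {
      runPipeline(eventLog, stages, false);
    } catch (Exception e) {
      // Not expected: with shouldThrowException == false, stage exceptions are swallowed.
      throw new IllegalStateException(e);
    }
  }
}
```

Keeping the flag explicit lets new tests opt in to seeing failures without changing the behavior of older tests that rely on the pipeline failing silently.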

##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/impl/client/DedicatedZkClient.java
##########
@@ -372,6 +372,12 @@ public void asyncCreate(String path, Object datat, CreateMode mode,
     _rawZkClient.asyncCreate(path, datat, mode, cb);
   }
 
+  @Override
+  public void asyncCreate(String path, Object datat, CreateMode mode,

Review comment:
   Yes, only DedicatedZkClient supports this operation. The others (FederatedZkClient and SharedZkClient) don't support it.

   Or did you mean adding a final check using `isManagingConnection()`? Maybe that is unnecessary?
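A minimal sketch of the split described above, assuming that clients which share a ZK connection reject the session-aware call with `UnsupportedOperationException` (the real `SharedZkClient` and `FederatedZkClient` code is not shown in this thread; all class and method names here are hypothetical).

```java
// Hypothetical, simplified client hierarchy; not the real Helix ZkClient classes.
interface ZkClientSketch {
  // Session-aware create: only meaningful for a client that owns its own ZK session.
  String createWithSession(String path, String expectedSessionId);
}

// Owns a dedicated connection, so it can compare the caller's expected session with its own.
class DedicatedClientSketch implements ZkClientSketch {
  private final String sessionId;

  DedicatedClientSketch(String sessionId) {
    this.sessionId = sessionId;
  }

  @Override
  public String createWithSession(String path, String expectedSessionId) {
    if (!sessionId.equals(expectedSessionId)) {
      throw new IllegalStateException(
          "Session mismatch: expected " + expectedSessionId + ", actual " + sessionId);
    }
    return path;
  }
}

// Shares one connection among many users, so no single caller owns the session:
// the session-aware operation is rejected outright.
class SharedClientSketch implements ZkClientSketch {
  @Override
  public String createWithSession(String path, String expectedSessionId) {
    throw new UnsupportedOperationException(
        "Session-aware create is not supported by a shared client");
  }
}
```

Rejecting the call up front in the shared implementation is a simple alternative to an extra runtime guard in the dedicated one.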




----------------------------------------------------------------
This is an automated message from the Apache Git Service.