desaikomal commented on code in PR #2558:
URL: https://github.com/apache/helix/pull/2558#discussion_r1266045347


##########
meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java:
##########
@@ -35,4 +36,8 @@ public DataRecord(String znodeId) {
   public DataRecord(ZNRecord record) {
     super(record);
   }
+
+  public DataRecord(DataRecord record, String id) {

Review Comment:
   nit: Looks like we are extending directly from ZkRecord. we will need to 
keep them loosely coupled.



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -42,7 +59,17 @@
  * When the client is used by a leader election service, one client is created 
for each participant.
  *
  */
-public class LeaderElectionClient {
+public class LeaderElectionClient implements AutoCloseable {
+
+  private final MetaClientInterface<LeaderInfo> _metaClient;
+  private final String _participant;
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionClient.class);
+
+  // A list of leader election group that this client joins.
+  private Set<String> leaderGroups = new HashSet<>();
+
+  private static String LEADER_ENTRY_KEY = "/LEADER";

Review Comment:
   nit: do we use final as well



##########
meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java:
##########
@@ -83,17 +83,15 @@ public class ZkMetaClient<T> implements 
MetaClientInterface<T>, AutoCloseable {
   // Lock all activities related to ZkClient connection
   private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
 
-
   public ZkMetaClient(ZkMetaClientConfig config) {
     _initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
     _reconnectTimeout = 
config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
     // TODO: Right new ZkClient reconnect using exp backoff with fixed max 
backoff interval. We should
     // Allow user to config reconnect policy
-    _zkClient = new ZkClient(
-        new ZkConnection(config.getConnectionAddress(), (int) 
config.getSessionTimeoutInMillis()),
+    _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(), 
(int) config.getSessionTimeoutInMillis()),

Review Comment:
   nit: is this formatting ok?



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -53,36 +80,49 @@ public class LeaderElectionClient {
    * @param metaClientConfig The config used to create an metaclient.
    */
   public LeaderElectionClient(MetaClientConfig metaClientConfig, String 
participant) {
-
+    this._participant = participant;
+    if (metaClientConfig == null) {
+      throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+    }
+    LOG.info("Creating MetaClient for LeaderElectionClient");
+    if 
(MetaClientConfig.StoreType.ZOOKEEPER.equals(metaClientConfig.getStoreType())) {

Review Comment:
   nit: not sure if we have instance of ??



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -91,10 +131,46 @@ public boolean joinLeaderElectionParticipantPool(String 
leaderPath) {
    *
    * @param leaderPath The path for leader election.
    * @param userInfo Any additional information to associate with this 
participant.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation is not succeeded.
    */
-  public boolean joinLeaderElectionParticipantPool(String leaderPath, Object 
userInfo) {
-    return false;
+  public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo 
userInfo) {
+    // TODO: create participant entry with info
+    subscribeAndTryCreateLeaderEntry(leaderPath);
+  }
+
+  private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
+    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, 
_reElectListener, false);
+    LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
+    leaderInfo.setLeaderName(_participant);
+
+    try {
+      // try to create leader entry, assuming leader election group node is 
already there
+      _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, 
MetaClientInterface.EntryMode.EPHEMERAL);

Review Comment:
   just a question: do you think, using transaction here will help?



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -42,7 +59,17 @@
  * When the client is used by a leader election service, one client is created 
for each participant.
  *
  */
-public class LeaderElectionClient {
+public class LeaderElectionClient implements AutoCloseable {
+
+  private final MetaClientInterface<LeaderInfo> _metaClient;
+  private final String _participant;
+  private static final Logger LOG = 
LoggerFactory.getLogger(LeaderElectionClient.class);
+
+  // A list of leader election group that this client joins.
+  private Set<String> leaderGroups = new HashSet<>();

Review Comment:
   your class members can follow the guideline of using _leaderGroups



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -91,10 +131,46 @@ public boolean joinLeaderElectionParticipantPool(String 
leaderPath) {
    *
    * @param leaderPath The path for leader election.
    * @param userInfo Any additional information to associate with this 
participant.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation is not succeeded.
    */
-  public boolean joinLeaderElectionParticipantPool(String leaderPath, Object 
userInfo) {
-    return false;
+  public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo 
userInfo) {
+    // TODO: create participant entry with info
+    subscribeAndTryCreateLeaderEntry(leaderPath);
+  }
+
+  private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
+    _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, 
_reElectListener, false);
+    LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
+    leaderInfo.setLeaderName(_participant);
+
+    try {
+      // try to create leader entry, assuming leader election group node is 
already there
+      _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, 
MetaClientInterface.EntryMode.EPHEMERAL);
+    } catch (MetaClientNodeExistsException ex) {
+      LOG.info("Already a leader for group {}" , leaderPath);
+    } catch (MetaClientNoNodeException ex) {
+      try {
+        // try to create leader path root entry
+        _metaClient.create(leaderPath, null);

Review Comment:
   can you please explain what is the first create and this second create?



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -106,23 +182,65 @@ public boolean joinLeaderElectionParticipantPool(String 
leaderPath, Object userI
    * Throws exception if the participant is not in the pool.
    *
    * @param leaderPath The path for leader election.
-   * @return boolean indicating if the operation is succeeded.
+   * @throws RuntimeException if the operation is not succeeded.
    *
-   * @throws RuntimeException If the participant did not join participant pool 
via this client. // TODO: define exp type
+   * @throws RuntimeException If the participant did not join participant pool 
via this client.
    */
-  public boolean exitLeaderElectionParticipantPool(String leaderPath) {
-    return false;
+  public void exitLeaderElectionParticipantPool(String leaderPath) {
+    _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, 
_reElectListener);
+    // TODO: remove from pool folder
+    relinquishLeaderHelper(leaderPath, true);
   }
 
   /**
-   * Releases leadership for participant.
+   * Releases leadership for participant. Still stays in the participant pool.
    *
    * @param leaderPath The path for leader election.
    *
    * @throws RuntimeException if the leadership is not owned by this 
participant, or if the
-   *                          participant did not join participant pool via 
this client. // TODO: define exp type
+   *                          participant did not join participant pool via 
this client.
    */
   public void relinquishLeader(String leaderPath) {
+    relinquishLeaderHelper(leaderPath, false);
+  }
+
+  /**
+   * relinquishLeaderHelper and LeaderElectionParticipantPool if configured
+   * @param leaderPath
+   * @param exitLeaderElectionParticipantPool
+   */
+  private void relinquishLeaderHelper(String leaderPath, Boolean 
exitLeaderElectionParticipantPool) {
+    String key = leaderPath + LEADER_ENTRY_KEY;
+    // if current client is in the group
+    if (leaderGroups.contains(key)) {
+      // remove leader path from leaderGroups after check if exiting the pool.
+      // to prevent a race condition in In Zk implementation:
+      // If there are delays in ZkClient event queue, it is possible the 
leader election client received leader
+      // deleted event after unsubscribeDataChange. We will need to remove it 
from in memory `leaderGroups` map before
+      // deleting ZNode. So that handler in ReElectListener won't recreate the 
leader node.
+      if (exitLeaderElectionParticipantPool) {
+        leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+      }
+      // check if current participant is the leader
+      // read data and stats, check, and multi check + delete
+      ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = 
_metaClient.getDataAndStat(key);
+      if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+        int expectedVersion = tup.right.getVersion();
+        List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), 
Op.delete(key, expectedVersion));
+        //Execute transactional support on operations
+        List<OpResult> opResults = _metaClient.transactionOP(ops);
+        if (opResults.get(0).getType() == ERRORRESULT) {
+          if (isLeader(leaderPath)) {
+            // Participant re-elected as leader.
+            throw new ConcurrentModificationException("Concurrent operation, 
please retry");
+          } else {
+            LOG.info("Someone else is already leader");
+          }
+        }
+      }
+    } else {

Review Comment:
   it may be better to have reverse condition, so you exit cleanly first.. ie. 
reverse the if (condition), so that this code exits first.



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -133,6 +251,11 @@ public void relinquishLeader(String leaderPath) {
    * @throws RuntimeException when leader path does not exist. // TODO: define 
exp type
    */
   public String getLeader(String leaderPath) {
+    LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY);
+    return leaderInfo == null ? null : leaderInfo.getLeaderName();
+  }
+
+  public LeaderInfo getParticipantInfo(String leaderPath) {

Review Comment:
   add todo..



##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java:
##########
@@ -53,36 +80,49 @@ public class LeaderElectionClient {
    * @param metaClientConfig The config used to create an metaclient.
    */
   public LeaderElectionClient(MetaClientConfig metaClientConfig, String 
participant) {
-
+    this._participant = participant;

Review Comment:
   nit: this is not required



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to