narendly commented on a change in pull request #727: Add MetadataStoreRoutingDataWriter with DistributedLeaderElection
URL: https://github.com/apache/helix/pull/727#discussion_r377378006
 
 

 ##########
 File path: helix-rest/src/main/java/org/apache/helix/rest/metadatastore/concurrency/ZkDistributedLeaderElection.java
 ##########
 @@ -0,0 +1,142 @@
+package org.apache.helix.rest.metadatastore.concurrency;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.List;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ZkDistributedLeaderElection implements IZkDataListener, 
IZkStateListener {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkDistributedLeaderElection.class);
+  private static final String PREFIX = "MSDS_SERVER_";
+
+  private final HelixZkClient _zkClient;
+  private final String _basePath;
+  private final ZNRecord _participantInfo;
+  private ZNRecord _currentLeaderInfo;
+
+  private String _myEphemeralSequentialPath;
+  private volatile boolean _isLeader;
+
+  public ZkDistributedLeaderElection(HelixZkClient zkClient, String basePath,
+      ZNRecord participantInfo) {
+    synchronized (this) {
+      if (zkClient == null || zkClient.isClosed()) {
+        throw new IllegalArgumentException("ZkClient cannot be null or 
closed!");
+      }
+      _zkClient = zkClient;
+      _zkClient.setZkSerializer(new ZNRecordSerializer());
+      if (basePath == null || basePath.isEmpty()) {
+        throw new IllegalArgumentException("lockBasePath cannot be null or 
empty!");
+      }
+      _basePath = basePath;
+      _participantInfo = participantInfo;
+      _isLeader = false;
+    }
+    init();
+  }
+
+  /**
+   * Create the base path if it doesn't exist and create an ephemeral 
sequential ZNode.
+   */
+  private void init() {
+    try {
+      _zkClient.createPersistent(_basePath, true);
+    } catch (ZkNodeExistsException e) {
+      // Okay if it exists already
+    }
+
+    // Create my ephemeral sequential node with my information
+    _myEphemeralSequentialPath = _zkClient
+        .create(_basePath + "/" + PREFIX, _participantInfo, 
CreateMode.EPHEMERAL_SEQUENTIAL);
+    if (_myEphemeralSequentialPath == null) {
+      throw new IllegalStateException(
+          "Unable to create ephemeral sequential node at path: " + _basePath);
+    }
+    tryAcquiringLeadership();
+  }
+
+  private void tryAcquiringLeadership() {
+    List<String> children = _zkClient.getChildren(_basePath);
+    Collections.sort(children);
+    String leaderName = children.get(0);
+    ZNRecord leaderInfo = _zkClient.readData(_basePath + "/" + leaderName, 
true);
+
+    String[] myNameArray = _myEphemeralSequentialPath.split("/");
+    String myName = myNameArray[myNameArray.length - 1];
+
+    if (leaderName.equals(myName)) {
+      // My turn for leadership
+      _isLeader = true;
+      _currentLeaderInfo = leaderInfo;
+      LOG.info("{} acquired leadership! Info: {}", myName, leaderInfo);
+    } else {
+      // Watch the ephemeral ZNode before me for a deletion event
+      String beforeMe = children.get(children.indexOf(myName) - 1);
+      _zkClient.subscribeDataChanges(_basePath + "/" + beforeMe, this);
+    }
+  }
+
+  public synchronized boolean isLeader() {
+    return _isLeader;
+  }
+
+  public synchronized ZNRecord getCurrentLeaderInfo() {
+    return _currentLeaderInfo;
+  }
+
+  @Override
+  public synchronized void handleStateChanged(Watcher.Event.KeeperState state) 
{
+    if (state == Watcher.Event.KeeperState.SyncConnected) {
+      init();
+    }
+  }
+
+  @Override
+  public void handleNewSession(String sessionId) {
+    return;
+  }
+
+  @Override
+  public void handleSessionEstablishmentError(Throwable error) {
+    return;
+  }
+
+  @Override
+  public void handleDataChange(String s, Object o) {
+    return;
+  }
+
+  @Override
+  public void handleDataDeleted(String s) {
 
 Review comment:
   I'm not sure what you're asking about exactly...
   
   >   private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners,
   >       final OptionalLong notificationTime) {
   >     try {
   >       final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path);
   >       // Trigger listener callbacks
   >       for (final IZkDataListenerEntry listener : listeners) {
   >         _eventThread.send(new ZkEvent("Data of " + path + " changed sent to "
   >             + listener.getDataListener() + " prefetch data: " + listener.isPrefetchData()) {
   >           @Override
   >           public void run() throws Exception {
   >             // Reinstall watch before listener callbacks to check the znode status
   >             if (!pathStatRecord.pathChecked()) {
   >               pathStatRecord.recordPathStat(getStat(path, true), notificationTime);
   >             }
   >             if (!pathStatRecord.pathExists()) {
   >               // no znode found at the path, trigger data deleted handler.
   >               listener.getDataListener().handleDataDeleted(path);
   >             } else {
   >               Object data = null;
   >               if (listener.isPrefetchData()) {
   >                 if (LOG.isDebugEnabled()) {
   >                   LOG.debug("Prefetch data for path: {}", path);
   >                 }
   >                 try {
   >                   data = readData(path, null, true);
   >                 } catch (ZkNoNodeException e) {
   >                   LOG.warn("Prefetch data for path: {} failed.", path, e);
   >                   listener.getDataListener().handleDataDeleted(path);
   >                   return;
   >                 }
   >               }
   >               listener.getDataListener().handleDataChange(path, data);
   >             }
   >           }
   >         });
   >       }
   
   Does this help?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@helix.apache.org
For additional commands, e-mail: reviews-h...@helix.apache.org

Reply via email to