xyuanlu commented on code in PR #2515:
URL: https://github.com/apache/helix/pull/2515#discussion_r1232782943


##########
meta-client/src/main/java/org/apache/helix/metaclient/recipes/lock/DistributedSemaphore.java:
##########
@@ -22,26 +22,56 @@
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.datamodel.DataRecord;
+import org.apache.helix.metaclient.exception.MetaClientException;
 import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedSemaphore {
+  private final MetaClientInterface<DataRecord> _metaClient;
+  private String _path;
+  private static final String INITIAL_CAPACITY_NAME = "INITIAL_CAPACITY";
+  private static final String REMAINING_CAPACITY_NAME = "REMAINING_CAPACITY";
+  private static final long DEFAULT_REMAINING_CAPACITY = -1;
+  private static final Logger LOG = 
LoggerFactory.getLogger(DistributedSemaphore.class);
 
   /**
    * Create a distributed semaphore client with the given configuration.
    * @param config configuration of the client
    */
   public DistributedSemaphore(MetaClientConfig config) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (config == null) {
+      throw new MetaClientException("Configuration cannot be null");
+    }
+    if (MetaClientConfig.StoreType.ZOOKEEPER.equals(config.getStoreType())) {
+      ZkMetaClientConfig zkMetaClientConfig = new 
ZkMetaClientConfig.ZkMetaClientConfigBuilder()
+          .setConnectionAddress(config.getConnectionAddress())
+          .setZkSerializer(new DataRecordSerializer()) // Currently only 
support ZNRecordSerializer.
+          // Setting DataRecordSerializer as DataRecord extends ZNRecord.
+          .build();
+      _metaClient = new 
ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+      _metaClient.connect();
+    } else {
+      throw new MetaClientException("Unsupported store type: " + 
config.getStoreType());
+    }
   }
 
   /**
    * Connect to an existing distributed semaphore client.
    * @param client client to connect to
    */
   public DistributedSemaphore(MetaClientInterface<DataRecord> client) {
-    throw new NotImplementedException("Not implemented yet.");
+    if (client == null) {
+      throw new MetaClientException("Client cannot be null");
+    }
+    _metaClient = client;
+    _metaClient.connect();

Review Comment:
   What is the client is already connected?  Right now `connect()` in 
zkMetaClient is not re-entrant 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to