abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1944
Change subject: Un-Singleton ClusterStateManager
......................................................................
Un-Singleton ClusterStateManager
Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
R
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
36 files changed, 350 insertions(+), 163 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/44/1944/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 7f35d08..4550ba6 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -260,10 +260,10 @@
keyAccessExpression = null;
keyAccessScalarFunctionCallExpression = null;
}
- FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId,
targetDataset, feedOutputType, metaType,
- pkTypes, partitioningKeys,
keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
- FeedRuntimeType.valueOf(subscriptionLocation),
locations.split(","), context.getComputationNodeDomain(),
- feedConnection);
+ FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider)
context.getMetadataProvider(), sourceFeed,
+ aqlId, targetDataset, feedOutputType, metaType, pkTypes,
keyAccessScalarFunctionCallExpression,
+ sourceFeed.getFeedId(),
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+ context.getComputationNodeDomain(), feedConnection);
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY,
feedPolicy);
return feedDataSource;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 1d47095..c3f01e8 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@ -38,7 +38,6 @@
import org.apache.asterix.metadata.dataset.hints.DatasetHints;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.utils.MetadataConstants;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -53,7 +52,7 @@
public void validateOperation(ICcApplicationContext appCtx, Dataverse
defaultDataverse, Statement stmt)
throws AsterixException {
- final IClusterStateManager clusterStateManager =
ClusterStateManager.INSTANCE;
+ final IClusterStateManager clusterStateManager =
appCtx.getClusterStateManager();
final IGlobalRecoveryManager globalRecoveryManager =
appCtx.getGlobalRecoveryManager();
if (!(clusterStateManager.getState().equals(ClusterState.ACTIVE)
&& globalRecoveryManager.isRecoveryCompleted())) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 2799765..71c67f4 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -32,10 +32,10 @@
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.hyracks.bootstrap.CCApplication;
import org.apache.asterix.hyracks.bootstrap.NCApplication;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.application.ICCApplication;
import org.apache.hyracks.api.application.INCApplication;
@@ -116,7 +116,7 @@
thread.join();
}
// Wait until cluster becomes active
- ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+ ((ICcApplicationContext)
cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE);
hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(),
cc.getConfig().getClientListenPort());
this.ncs = nodeControllers.toArray(new
NodeControllerService[nodeControllers.size()]);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index 82e8f7a..218a3ea 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -28,8 +28,8 @@
import java.util.logging.Logger;
import java.util.regex.Pattern;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.config.Section;
import org.apache.hyracks.control.common.config.ConfigUtils;
@@ -60,9 +60,11 @@
protected static final String VERSION_URI_KEY = "versionUri";
protected static final String DIAGNOSTICS_URI_KEY = "diagnosticsUri";
private final ObjectMapper om = new ObjectMapper();
+ protected final ICcApplicationContext appCtx;
- public ClusterApiServlet(ConcurrentMap<String, Object> ctx, String...
paths) {
+ public ClusterApiServlet(ICcApplicationContext appCtx,
ConcurrentMap<String, Object> ctx, String... paths) {
super(ctx, paths);
+ this.appCtx = appCtx;
}
@Override
@@ -94,11 +96,11 @@
}
protected ObjectNode getClusterStateSummaryJSON() {
- return ClusterStateManager.INSTANCE.getClusterStateSummary();
+ return appCtx.getClusterStateManager().getClusterStateSummary();
}
protected ObjectNode getClusterStateJSON(IServletRequest request, String
pathToNode) {
- ObjectNode json =
ClusterStateManager.INSTANCE.getClusterStateDescription();
+ ObjectNode json =
appCtx.getClusterStateManager().getClusterStateDescription();
CcApplicationContext appConfig = (CcApplicationContext)
ctx.get(ASTERIX_APP_CONTEXT_INFO_ATTR);
json.putPOJO("config",
ConfigUtils.getSectionOptionsForJSON(appConfig.getServiceContext().getAppConfig(),
Section.COMMON, getConfigSelector()));
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 52d4d67..848e1f1 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -26,6 +26,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -41,8 +42,9 @@
private static final Logger LOGGER =
Logger.getLogger(ClusterControllerDetailsApiServlet.class.getName());
private final ObjectMapper om = new ObjectMapper();
- public ClusterControllerDetailsApiServlet(ConcurrentMap<String, Object>
ctx, String... paths) {
- super(ctx, paths);
+ public ClusterControllerDetailsApiServlet(ICcApplicationContext appCtx,
ConcurrentMap<String, Object> ctx,
+ String... paths) {
+ super(appCtx, ctx, paths);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index dcd0e70..6e62c95 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -49,11 +49,9 @@
public class DiagnosticsApiServlet extends NodeControllerDetailsApiServlet {
private static final Logger LOGGER =
Logger.getLogger(DiagnosticsApiServlet.class.getName());
- private final ICcApplicationContext appCtx;
public DiagnosticsApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
- super(ctx, paths);
- this.appCtx = appCtx;
+ super(appCtx, ctx, paths);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index d9757c7..d08204c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -29,7 +29,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
+import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -48,8 +50,9 @@
private static final Logger LOGGER =
Logger.getLogger(NodeControllerDetailsApiServlet.class.getName());
private final ObjectMapper om = new ObjectMapper();
- public NodeControllerDetailsApiServlet(ConcurrentMap<String, Object> ctx,
String... paths) {
- super(ctx, paths);
+ public NodeControllerDetailsApiServlet(ICcApplicationContext appCtx,
ConcurrentMap<String, Object> ctx,
+ String... paths) {
+ super(appCtx, ctx, paths);
om.enable(SerializationFeature.INDENT_OUTPUT);
}
@@ -204,8 +207,9 @@
String dump = hcc.getThreadDump(node);
if (dump == null) {
// check to see if this is a node that is simply down
- throw ClusterStateManager.INSTANCE.getNodePartitions(node) != null
? new IllegalStateException()
- : new IllegalArgumentException();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ ClusterPartition[] cp = csm.getNodePartitions(node);
+ throw cp != null ? new IllegalStateException() : new
IllegalArgumentException();
}
return (ObjectNode) om.readTree(dump);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1cec616..08f520a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -39,7 +39,6 @@
import org.apache.asterix.lang.common.base.IParser;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
@@ -62,6 +61,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+
import io.netty.handler.codec.http.HttpResponseStatus;
public class QueryServiceServlet extends AbstractQueryApiServlet {
@@ -445,7 +445,7 @@
protected void executeStatement(String statementsText, SessionOutput
sessionOutput, ResultDelivery delivery,
IStatementExecutor.Stats stats, RequestParameters param, String
handleUrl, long[] outExecStartEnd)
throws Exception {
- IClusterManagementWork.ClusterState clusterState =
ClusterStateManager.INSTANCE.getState();
+ IClusterManagementWork.ClusterState clusterState =
appCtx.getClusterStateManager().getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
// using a plain IllegalStateException here to get into the right
catch clause for a 500
throw new IllegalStateException("Cannot execute request, cluster
is " + clusterState);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index fdd106d..f940145 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -27,8 +27,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -48,14 +49,17 @@
public static final String NCSERVICE_PID = "ncservice_pid";
public static final String INI = "ini";
public static final String PID = "pid";
+ private final IApplicationContext appCtx;
- public ShutdownApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths) {
+ public ShutdownApiServlet(IApplicationContext appCtx,
ConcurrentMap<String, Object> ctx, String[] paths) {
super(ctx, paths);
+ this.appCtx = appCtx;
}
@Override
protected void post(IServletRequest request, IServletResponse response) {
IHyracksClientConnection hcc = (IHyracksClientConnection)
ctx.get(HYRACKS_CONNECTION_ATTR);
+ IClusterStateManager csm = appCtx.getClusterStateManager();
boolean terminateNCServices =
"true".equalsIgnoreCase(request.getParameter("all"));
Thread t = new Thread(() -> {
try {
@@ -78,7 +82,7 @@
try {
jsonObject.put("status", "SHUTTING_DOWN");
jsonObject.put("date", new Date().toString());
- ObjectNode clusterState =
ClusterStateManager.INSTANCE.getClusterStateDescription();
+ ObjectNode clusterState = csm.getClusterStateDescription();
ArrayNode ncs = (ArrayNode) clusterState.get("ncs");
for (int i = 0; i < ncs.size(); i++) {
ObjectNode nc = (ObjectNode) ncs.get(i);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..90b9f46 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -28,7 +28,9 @@
import org.apache.asterix.algebra.base.ILangExtension;
import org.apache.asterix.api.http.server.ResultUtil;
import org.apache.asterix.app.cc.CCExtensionManager;
+import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -40,7 +42,6 @@
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.messaging.CCMessageBroker;
import org.apache.asterix.metadata.MetadataManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
@@ -149,7 +150,9 @@
if (ccSrv.getNodeManager().getNodeControllerState(requestNodeId) ==
null) {
return "Node is not registerted with the CC";
}
- final IClusterManagementWork.ClusterState clusterState =
ClusterStateManager.INSTANCE.getState();
+ IApplicationContext appCtx = (IApplicationContext)
ccSrv.getApplicationContext();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ final IClusterManagementWork.ClusterState clusterState =
csm.getState();
if (clusterState != IClusterManagementWork.ClusterState.ACTIVE) {
return "Cannot execute request, cluster is " + clusterState;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 7647881..d52ce84 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -34,6 +34,7 @@
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.AsterixExtension;
import org.apache.asterix.common.config.BuildProperties;
@@ -73,6 +74,7 @@
import org.apache.asterix.replication.recovery.RemoteRecoveryManager;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepositoryFactory;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -136,6 +138,8 @@
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
+ private final ClusterStateManager clusterStateManager;
+
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
List<AsterixExtension> extensions)
throws AsterixException, InstantiationException,
IllegalAccessException, ClassNotFoundException,
IOException {
@@ -160,6 +164,7 @@
ncExtensionManager = new NCExtensionManager(allExtensions);
componentProvider = new StorageComponentProvider();
resourceIdFactory = new
GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
+ clusterStateManager = new ClusterStateManager();
}
@Override
@@ -482,4 +487,9 @@
public INCServiceContext getServiceContext() {
return ncServiceContext;
}
+
+ @Override
+ public IClusterStateManager getClusterStateManager() {
+ return clusterStateManager;
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b97c014..e966d3d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -56,6 +56,7 @@
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -152,7 +153,6 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
@@ -706,7 +706,8 @@
protected static String configureNodegroupForDataset(ICcApplicationContext
appCtx, Map<String, String> hints,
String dataverseName, String datasetName, MetadataProvider
metadataProvider) throws Exception {
- Set<String> allNodes =
ClusterStateManager.INSTANCE.getParticipantNodes(true);
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ Set<String> allNodes = csm.getParticipantNodes(true);
Set<String> selectedNodes = new LinkedHashSet<>();
String hintValue = hints.get(DatasetNodegroupCardinalityHint.NAME);
if (hintValue == null) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..c26156d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -53,7 +53,6 @@
import org.apache.asterix.api.http.servlet.ServletConstants;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.cc.CCExtensionManager;
-import org.apache.asterix.app.cc.ResourceIdManager;
import org.apache.asterix.app.external.ExternalLibraryUtils;
import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
import org.apache.asterix.common.api.AsterixThreadFactory;
@@ -78,7 +77,6 @@
import org.apache.asterix.metadata.lock.MetadataLockManager;
import org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -125,7 +123,7 @@
ccServiceCtx.setThreadFactory(
new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new
LifeCycleComponentManager()));
ILibraryManager libraryManager = new ExternalLibraryManager();
- ResourceIdManager resourceIdManager = new ResourceIdManager();
+
IReplicationStrategy repStrategy =
ClusterProperties.INSTANCE.getReplicationStrategy();
IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
.create(ClusterProperties.INSTANCE.getCluster(), repStrategy,
ccServiceCtx);
@@ -133,10 +131,9 @@
componentProvider = new StorageComponentProvider();
GlobalRecoveryManager globalRecoveryManager =
createGlobalRecoveryManager();
statementExecutorCtx = new StatementExecutorContext();
- appCtx = new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, resourceIdManager,
- () -> MetadataManager.INSTANCE, globalRecoveryManager,
ftStrategy, new ActiveNotificationHandler(),
- componentProvider, new MetadataLockManager());
- ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
+ appCtx = new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, () -> MetadataManager.INSTANCE,
+ globalRecoveryManager, ftStrategy, new
ActiveNotificationHandler(), componentProvider,
+ new MetadataLockManager());
ccExtensionManager = new CCExtensionManager(getExtensions());
appCtx.setExtensionManager(ccExtensionManager);
final CCConfig ccConfig = controllerService.getCCConfig();
@@ -302,15 +299,15 @@
case Servlets.REBALANCE:
return new RebalanceApiServlet(ctx, paths, appCtx);
case Servlets.SHUTDOWN:
- return new ShutdownApiServlet(ctx, paths);
+ return new ShutdownApiServlet(appCtx, ctx, paths);
case Servlets.VERSION:
return new VersionApiServlet(ctx, paths);
case Servlets.CLUSTER_STATE:
- return new ClusterApiServlet(ctx, paths);
+ return new ClusterApiServlet(appCtx, ctx, paths);
case Servlets.CLUSTER_STATE_NODE_DETAIL:
- return new NodeControllerDetailsApiServlet(ctx, paths);
+ return new NodeControllerDetailsApiServlet(appCtx, ctx, paths);
case Servlets.CLUSTER_STATE_CC_DETAIL:
- return new ClusterControllerDetailsApiServlet(ctx, paths);
+ return new ClusterControllerDetailsApiServlet(appCtx, ctx,
paths);
case Servlets.DIAGNOSTICS:
return new DiagnosticsApiServlet(ctx, paths, appCtx);
case Servlets.ACTIVE_STATS:
@@ -331,7 +328,7 @@
@Override
public void startupCompleted() throws Exception {
ccServiceCtx.getControllerService().getExecutor().submit(() -> {
- ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE);
+ appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE);
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
return null;
});
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 66f76c5..0583508 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -32,6 +32,7 @@
import org.apache.asterix.common.api.IClusterManagementWork;
import org.apache.asterix.common.api.IClusterManagementWorkResponse;
import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.event.schema.cluster.Node;
@@ -42,7 +43,6 @@
import org.apache.asterix.metadata.cluster.RemoveNodeWork;
import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
import org.apache.asterix.runtime.utils.CcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.exceptions.HyracksException;
@@ -70,10 +70,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + nodeId + " joined");
}
- ClusterStateManager.INSTANCE.addNCConfiguration(nodeId,
ncConfiguration);
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ csm.addNCConfiguration(nodeId, ncConfiguration);
//if metadata node rejoining, we need to rebind the proxy connection
when it is active again.
- if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+ if (!csm.isMetadataNodeActive()) {
MetadataManager.INSTANCE.rebindMetadataNode();
}
@@ -99,10 +100,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("NC: " + deadNode + " left");
}
- ClusterStateManager.INSTANCE.removeNCConfiguration(deadNode);
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ csm.removeNCConfiguration(deadNode);
//if metadata node failed, we need to rebind the proxy connection
when it is active again
- if (!ClusterStateManager.INSTANCE.isMetadataNodeActive()) {
+ if (!csm.isMetadataNodeActive()) {
MetadataManager.INSTANCE.rebindMetadataNode();
}
}
@@ -171,8 +173,9 @@
List<String> addedNodes = new ArrayList<>();
String asterixInstanceName =
ClusterProperties.INSTANCE.getCluster().getInstanceName();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
for (int i = 0; i < nodesToAdd; i++) {
- Node node =
ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+ Node node = csm.getAvailableSubstitutionNode();
if (node != null) {
try {
ClusterManagerProvider.getClusterManager().addNode(appCtx,
node);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 46968b4..2977a58 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -25,13 +25,13 @@
import java.util.logging.Logger;
import org.apache.asterix.common.api.IClusterManagementWork;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.metadata.cluster.AddNodeWork;
import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
import org.apache.asterix.metadata.cluster.RemoveNodeWork;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
public class ClusterWorkExecutor implements Runnable {
@@ -69,9 +69,10 @@
}
}
+ IClusterStateManager csm = appCtx.getClusterStateManager();
Set<Node> addedNodes = new HashSet<>();
for (int i = 0; i < nodesToAdd; i++) {
- Node node =
ClusterStateManager.INSTANCE.getAvailableSubstitutionNode();
+ Node node = csm.getAvailableSubstitutionNode();
if (node != null) {
try {
ClusterManagerProvider.getClusterManager().addNode(appCtx, node);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 9fc9940..8dc1b3e 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -33,6 +33,7 @@
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
@@ -66,7 +67,6 @@
import org.apache.asterix.metadata.feeds.LocationConstraint;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import
org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
@@ -134,8 +134,10 @@
public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider
metadataProvider, Feed feed)
throws AsterixException {
- JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- AlgebricksAbsolutePartitionConstraint allCluster =
ClusterStateManager.INSTANCE.getClusterLocations();
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
+ JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ AlgebricksAbsolutePartitionConstraint allCluster =
csm.getClusterLocations();
Set<String> nodes = new TreeSet<>();
for (String node : allCluster.getLocations()) {
nodes.add(node);
@@ -143,7 +145,7 @@
AlgebricksAbsolutePartitionConstraint locations =
new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new
String[nodes.size()]));
FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(feed.getDataverseName(),
feed.getFeedName(), locations);
+ FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(),
feed.getFeedName(), locations);
org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider,
AlgebricksPartitionConstraint> spC =
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
FileRemoveOperatorDescriptor frod = new
FileRemoveOperatorDescriptor(spec, spC.first, true);
@@ -273,9 +275,8 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId,
- Pair<Pair<IOperatorDescriptor, Integer>,
Pair<IOperatorDescriptor, Integer>>> entry
- : subJob.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+ .getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index c1421c5..53a4f23 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -405,7 +405,10 @@
IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators,
keyFieldTypes, false, false, true,
MetadataUtil.PENDING_NO_OP);
List<String> nodes =
Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
- FileSplit[] splits =
SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes);
+ FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(
+ ((ICcApplicationContext)
ExecutionTestUtil.integrationUtil.cc.getApplicationContext())
+ .getClusterStateManager(),
+ dataset, index.getIndexName(), nodes);
fileSplitProvider = new
ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
index 0aea84d..90dab96 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.api;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ActiveProperties;
import org.apache.asterix.common.config.BuildProperties;
import org.apache.asterix.common.config.CompilerProperties;
@@ -60,4 +61,9 @@
IServiceContext getServiceContext();
+ /**
+ * @return the cluster state manager
+ */
+ IClusterStateManager getClusterStateManager();
+
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 30675cd..b368c3b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -19,12 +19,19 @@
package org.apache.asterix.common.cluster;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.event.schema.cluster.Node;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.config.IOption;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
public interface IClusterStateManager {
@@ -49,6 +56,7 @@
/**
* Updates all partitions of {@code nodeId} based on the {@code active}
flag.
+ *
* @param nodeId
* @param active
* @throws HyracksDataException
@@ -93,6 +101,7 @@
/**
* Blocks until the cluster state becomes {@code state}, or timeout is
exhausted.
+ *
* @return true if the desired state was reached before timeout occurred
*/
boolean waitForState(ClusterState waitForState, long timeout, TimeUnit
unit)
@@ -116,4 +125,106 @@
* @throws HyracksDataException
*/
void deregisterNodePartitions(String nodeId) throws HyracksDataException;
+
+ /**
+ * @return true if cluster is active, false otherwise
+ */
+ boolean isClusterActive();
+
+ /**
+ * @return the set of participant nodes
+ */
+ Set<String> getParticipantNodes();
+
+ /**
+ * Returns the IO devices configured for a Node Controller
+ *
+ * @param nodeId
+ * unique identifier of the Node Controller
+ * @return a list of IO devices.
+ */
+ String[] getIODevices(String nodeId);
+
+ /**
+ * @return the constraint representing all the partitions of the cluster
+ */
+ AlgebricksAbsolutePartitionConstraint getClusterLocations();
+
+ /**
+ * @param excludePendingRemoval
+ * true, if the desired set shouldn't have pending removal nodes
+ * @return the set of participant nodes
+ */
+ Set<String> getParticipantNodes(boolean excludePendingRemoval);
+
+ /**
+ * @param node
+ * the node id
+ * @return the number of partitions on that node
+ */
+ int getNodePartitionsCount(String node);
+
+ /**
+ * @return a json object representing the cluster state summary
+ */
+ ObjectNode getClusterStateSummary();
+
+ /**
+ * @return a json object representing the cluster state description
+ */
+ ObjectNode getClusterStateDescription();
+
+ /**
+ * Set the cc application context
+ *
+ * @param appCtx
+ */
+ void setCcAppCtx(ICcApplicationContext appCtx);
+
+ /**
+ * @return the number of cluster nodes
+ */
+ int getNumberOfNodes();
+
+ /**
+ * Add node configuration
+ *
+ * @param nodeId
+ * @param ncConfiguration
+ * @throws HyracksException
+ */
+ void addNCConfiguration(String nodeId, Map<IOption, Object>
ncConfiguration) throws HyracksException;
+
+ /**
+ * @return true if metadata node is active, false otherwise
+ */
+ boolean isMetadataNodeActive();
+
+ /**
+ * Remove configuration of a dead node
+ *
+ * @param deadNode
+ * @throws HyracksException
+ */
+ void removeNCConfiguration(String deadNode) throws HyracksException;
+
+ /**
+ * @return a substitution node or null
+ */
+ Node getAvailableSubstitutionNode();
+
+ /**
+ * Add node to the list of nodes pending removal
+ *
+ * @param nodeId
+ */
+ void removePending(String nodeId);
+
+ /**
+ * Deregister intention to remove node id
+ *
+ * @param nodeId
+ * @return
+ */
+ boolean cancelRemovePending(String nodeId);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
index 0abb92f..b258a17 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ClusterProperties.java
@@ -43,6 +43,8 @@
private Cluster cluster;
private ClusterProperties() {
+ Exception creation = new Exception("ClusterProperties is getting
created");
+ creation.printStackTrace();
InputStream is =
this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
if (is != null) {
try {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 3eff214..64de250 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -20,9 +20,9 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.IMetadataLockManager;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.IGlobalRecoveryManager;
import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
@@ -99,7 +99,7 @@
IMetadataLockManager getMetadataLockManager();
/**
- * @return the cluster state manager
+ * @return the metadata bootstrap
*/
- IClusterStateManager getClusterStateManager();
+ IMetadataBootstrap getMetadataBootstrap();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2eb81d4..0a47788 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataFlowController;
@@ -92,8 +93,7 @@
public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext
ctx, int partition)
throws HyracksDataException {
INCServiceContext serviceCtx =
ctx.getJobletContext().getServiceContext();
- INcApplicationContext appCtx =
- (INcApplicationContext) serviceCtx.getApplicationContext();
+ INcApplicationContext appCtx = (INcApplicationContext)
serviceCtx.getApplicationContext();
try {
restoreExternalObjects(serviceCtx, appCtx.getLibraryManager());
} catch (Exception e) {
@@ -152,15 +152,16 @@
dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
ExternalDataCompatibilityUtils.validateCompatibility(dataSourceFactory,
dataParserFactory);
- configureFeedLogManager();
+ configureFeedLogManager(appCtx);
nullifyExternalObjects();
}
- private void configureFeedLogManager() throws HyracksDataException,
AlgebricksException {
+ private void configureFeedLogManager(IApplicationContext appCtx) throws
HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
if (isFeed) {
- feedLogFileSplits =
FeedUtils.splitsForAdapter(ExternalDataUtils.getDataverse(configuration),
- ExternalDataUtils.getFeedName(configuration),
dataSourceFactory.getPartitionConstraint());
+ feedLogFileSplits =
FeedUtils.splitsForAdapter((ICcApplicationContext) appCtx,
+ ExternalDataUtils.getDataverse(configuration),
ExternalDataUtils.getFeedName(configuration),
+ dataSourceFactory.getPartitionConstraint());
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index edda448..7cfbf51 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -25,8 +25,9 @@
import java.util.Set;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
@@ -87,6 +88,7 @@
public static AlgebricksAbsolutePartitionConstraint
getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint constraints, int count)
throws AlgebricksException {
if (constraints == null) {
+ IClusterStateManager clusterStateManager =
((ICcApplicationContext) appCtx).getClusterStateManager();
ArrayList<String> locs = new ArrayList<>();
Set<String> stores =
appCtx.getMetadataProperties().getStores().keySet();
if (stores.isEmpty()) {
@@ -97,7 +99,7 @@
Iterator<String> storeIt = stores.iterator();
while (storeIt.hasNext()) {
String node = storeIt.next();
- int numIODevices =
ClusterStateManager.INSTANCE.getIODevices(node).length;
+ int numIODevices =
clusterStateManager.getIODevices(node).length;
for (int k = 0; k < numIODevices; k++) {
locs.add(node);
i++;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
index 12be449..c7b8633 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/TwitterFirehoseStreamFactory.java
@@ -23,10 +23,10 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IInputStreamFactory;
import org.apache.asterix.external.input.stream.TwitterFirehoseInputStream;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -54,6 +54,7 @@
private static final String KEY_INGESTION_LOCATIONS = "ingestion-location";
private Map<String, String> configuration;
+ private transient IServiceContext serviceCtx;
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
@@ -67,10 +68,10 @@
if (ingestionCardinalityParam != null) {
count = Integer.parseInt(ingestionCardinalityParam);
}
-
+ ICcApplicationContext appCtx = (ICcApplicationContext)
serviceCtx.getApplicationContext();
List<String> chosenLocations = new ArrayList<>();
String[] availableLocations = locations != null ? locations
- :
ClusterStateManager.INSTANCE.getParticipantNodes().toArray(new String[] {});
+ :
appCtx.getClusterStateManager().getParticipantNodes().toArray(new String[] {});
for (int i = 0, k = 0; i < count; i++, k = (k + 1) %
availableLocations.length) {
chosenLocations.add(availableLocations[k]);
}
@@ -84,6 +85,7 @@
@Override
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration) {
+ this.serviceCtx = serviceCtx;
this.configuration = configuration;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index ec7de91..dad0d51 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -26,9 +26,9 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
@@ -63,9 +63,9 @@
}
public enum Mode {
- PROCESS, // There is memory
- SPILL, // Memory budget has been consumed. Now we're
writing to disk
- DISCARD // Memory and Disk space budgets have been
consumed. Now we're discarding
+ PROCESS, // There is memory
+ SPILL, // Memory budget has been consumed. Now we're writing to disk
+ DISCARD // Memory and Disk space budgets have been consumed. Now we're
discarding
}
private FeedUtils() {
@@ -87,7 +87,7 @@
return StoragePathUtil.getFileSplitForClusterPartition(partition,
f.getPath());
}
- public static FileSplit[] splitsForAdapter(String dataverseName, String
feedName,
+ public static FileSplit[] splitsForAdapter(ICcApplicationContext appCtx,
String dataverseName, String feedName,
AlgebricksPartitionConstraint partitionConstraints) throws
AsterixException {
if (partitionConstraints.getPartitionConstraintType() ==
PartitionConstraintType.COUNT) {
throw new AsterixException("Can't create file splits for adapter
with count partitioning constraints");
@@ -96,7 +96,7 @@
List<FileSplit> splits = new ArrayList<>();
for (String nd : locations) {
splits.add(splitsForAdapter(dataverseName, feedName, nd,
- ClusterStateManager.INSTANCE.getNodePartitions(nd)[0]));
+ appCtx.getClusterStateManager().getNodePartitions(nd)[0]));
}
return splits.toArray(new FileSplit[] {});
}
@@ -113,8 +113,8 @@
public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx,
FileSplit feedLogFileSplit)
throws HyracksDataException {
- return new
FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(),
- 0, ctx.getIoManager()).getFile());
+ return new FeedLogManager(
+ FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0,
ctx.getIoManager()).getFile());
}
public static void processFeedMessage(ByteBuffer input, VSizeFrame
message, FrameTupleAccessor fta)
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index b4353e7..bd50352 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -25,7 +25,9 @@
import java.util.Map;
import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -33,7 +35,6 @@
import org.apache.asterix.external.indexing.RecordId.RecordIdType;
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.hivecompat.io.RCFileInputFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -207,10 +208,12 @@
public static AlgebricksAbsolutePartitionConstraint
getPartitionConstraints(IApplicationContext appCtx,
AlgebricksAbsolutePartitionConstraint clusterLocations) {
if (clusterLocations == null) {
+ IClusterStateManager clusterStateManager =
((ICcApplicationContext) appCtx).getClusterStateManager();
ArrayList<String> locs = new ArrayList<>();
Map<String, String[]> stores =
appCtx.getMetadataProperties().getStores();
for (String node : stores.keySet()) {
- int numIODevices =
ClusterStateManager.INSTANCE.getIODevices(node).length;
+
+ int numIODevices =
clusterStateManager.getIODevices(node).length;
for (int k = 0; k < numIODevices; k++) {
locs.add(node);
}
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
index c45941d..c09b9eb 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/input/record/reader/kv/KVTestReaderFactory.java
@@ -23,9 +23,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -43,17 +43,23 @@
private int upsertCycle = 0;
private int numOfReaders;
private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
+ private transient IServiceContext serviceCtx;
private static final List<String> recordReaderNames =
Collections.unmodifiableList(Arrays.asList());
@Override
public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
- clusterLocations = ClusterStateManager.INSTANCE.getClusterLocations();
- numOfReaders = clusterLocations.getLocations().length;
+ if (clusterLocations == null) {
+ ICcApplicationContext appCtx = (ICcApplicationContext)
serviceCtx.getApplicationContext();
+ clusterLocations =
appCtx.getClusterStateManager().getClusterLocations();
+ numOfReaders = clusterLocations.getLocations().length;
+ }
return clusterLocations;
+
}
@Override
public void configure(IServiceContext serviceCtx, final Map<String,
String> configuration) {
+ this.serviceCtx = serviceCtx;
if (configuration.containsKey("num-of-records")) {
numOfRecords =
Integer.parseInt(configuration.get("num-of-records"));
}
@@ -83,7 +89,8 @@
return DCPRequest.class;
}
- @Override public List<String> getRecordReaderNames() {
+ @Override
+ public List<String> getRecordReaderNames() {
return recordReaderNames;
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
index 709f655..0f97194 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/dataset/hints/DatasetHints.java
@@ -21,8 +21,8 @@
import java.util.HashSet;
import java.util.Set;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.utils.Pair;
/**
@@ -113,7 +113,8 @@
if (intValue < 0) {
return new Pair<>(false, "Value must be >= 0");
}
- int numNodesInCluster =
ClusterStateManager.INSTANCE.getParticipantNodes(true).size();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ int numNodesInCluster = csm.getParticipantNodes(true).size();
if (numNodesInCluster < intValue) {
return new Pair<>(false,
"Value must be less than or equal to the available
number of nodes in cluster ("
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 26cec1e..0b6608c 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -22,7 +22,7 @@
import java.util.List;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.api.IFeed;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
@@ -34,7 +34,6 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -63,12 +62,12 @@
private final List<ScalarFunctionCallExpression> keyAccessExpression;
private final FeedConnection feedConnection;
- public FeedDataSource(Feed feed, DataSourceId id, String targetDataset,
IAType itemType, IAType metaType,
- List<IAType> pkTypes, List<List<String>> partitioningKeys,
- List<ScalarFunctionCallExpression> keyAccessExpression, EntityId
sourceFeedId,
- FeedRuntimeType location, String[] locations, INodeDomain domain,
FeedConnection feedConnection)
- throws AlgebricksException {
+ public FeedDataSource(MetadataProvider metadataProvider, Feed feed,
DataSourceId id, String targetDataset,
+ IAType itemType, IAType metaType, List<IAType> pkTypes,
+ List<ScalarFunctionCallExpression> keyAccessExpression, EntityId
sourceFeedId, FeedRuntimeType location,
+ String[] locations, INodeDomain domain, FeedConnection
feedConnection) throws AlgebricksException {
super(id, itemType, metaType, Type.FEED, domain);
+ ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
this.feed = feed;
this.targetDataset = targetDataset;
this.sourceFeedId = sourceFeedId;
@@ -76,7 +75,7 @@
this.locations = locations;
this.pkTypes = pkTypes;
this.keyAccessExpression = keyAccessExpression;
- this.computeCardinality =
ClusterStateManager.INSTANCE.getParticipantNodes().size();
+ this.computeCardinality =
appCtx.getClusterStateManager().getParticipantNodes().size();
this.feedConnection = feedConnection;
initFeedDataSource();
}
@@ -170,8 +169,8 @@
throws AlgebricksException {
try {
ARecordType feedOutputType = (ARecordType) itemType;
- ISerializerDeserializer payloadSerde =
NonTaggedDataFormat.INSTANCE.getSerdeProvider()
- .getSerializerDeserializer(feedOutputType);
+ ISerializerDeserializer payloadSerde =
+
NonTaggedDataFormat.INSTANCE.getSerdeProvider().getSerializerDeserializer(feedOutputType);
ArrayList<ISerializerDeserializer> serdes = new ArrayList<>();
serdes.add(payloadSerde);
if (metaItemType != null) {
@@ -182,16 +181,16 @@
serdes.add(SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(type));
}
}
- RecordDescriptor feedDesc = new RecordDescriptor(
- serdes.toArray(new
ISerializerDeserializer[serdes.size()]));
- FeedPolicyEntity feedPolicy = (FeedPolicyEntity) getProperties()
- .get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
+ RecordDescriptor feedDesc =
+ new RecordDescriptor(serdes.toArray(new
ISerializerDeserializer[serdes.size()]));
+ FeedPolicyEntity feedPolicy =
+ (FeedPolicyEntity)
getProperties().get(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY);
if (feedPolicy == null) {
throw new AlgebricksException("Feed not configured with a
policy");
}
feedPolicy.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY,
feedPolicy.getPolicyName());
- FeedConnectionId feedConnectionId = new
FeedConnectionId(getId().getDataverseName(),
- getId().getDatasourceName(), getTargetDataset());
+ FeedConnectionId feedConnectionId =
+ new FeedConnectionId(getId().getDataverseName(),
getId().getDatasourceName(), getTargetDataset());
FeedCollectOperatorDescriptor feedCollector = new
FeedCollectOperatorDescriptor(jobSpec, feedConnectionId,
feedOutputType, feedDesc, feedPolicy.getProperties(),
getLocation());
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
index c23755d..97c6ed2 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataManagerUtil.java
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
@@ -36,7 +37,6 @@
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
@@ -111,12 +111,12 @@
return dataset;
}
- public static INodeDomain findNodeDomain(MetadataTransactionContext
mdTxnCtx, String nodeGroupName)
- throws AlgebricksException {
+ public static INodeDomain findNodeDomain(IClusterStateManager
clusterStateManager,
+ MetadataTransactionContext mdTxnCtx, String nodeGroupName) throws
AlgebricksException {
NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx,
nodeGroupName);
List<String> partitions = new ArrayList<>();
for (String location : nodeGroup.getNodeNames()) {
- int numPartitions =
ClusterStateManager.INSTANCE.getNodePartitionsCount(location);
+ int numPartitions =
clusterStateManager.getNodePartitionsCount(location);
for (int i = 0; i < numPartitions; i++) {
partitions.add(location);
}
@@ -165,24 +165,24 @@
}
}
- public static DataSource findDataSource(MetadataTransactionContext
mdTxnCtx, DataSourceId id)
- throws AlgebricksException {
+ public static DataSource findDataSource(IClusterStateManager
clusterStateManager,
+ MetadataTransactionContext mdTxnCtx, DataSourceId id) throws
AlgebricksException {
try {
- return lookupSourceInMetadata(mdTxnCtx, id);
+ return lookupSourceInMetadata(clusterStateManager, mdTxnCtx, id);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
- public static DataSource lookupSourceInMetadata(MetadataTransactionContext
mdTxnCtx, DataSourceId aqlId)
- throws AlgebricksException {
+ public static DataSource lookupSourceInMetadata(IClusterStateManager
clusterStateManager,
+ MetadataTransactionContext mdTxnCtx, DataSourceId aqlId) throws
AlgebricksException {
Dataset dataset = findDataset(mdTxnCtx, aqlId.getDataverseName(),
aqlId.getDatasourceName());
if (dataset == null) {
throw new AlgebricksException("Datasource with id " + aqlId + "
was not found.");
}
IAType itemType = findType(mdTxnCtx,
dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
IAType metaItemType = findType(mdTxnCtx,
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
- INodeDomain domain = findNodeDomain(mdTxnCtx,
dataset.getNodeGroupName());
+ INodeDomain domain = findNodeDomain(clusterStateManager, mdTxnCtx,
dataset.getNodeGroupName());
byte datasourceType =
dataset.getDatasetType().equals(DatasetType.EXTERNAL) ?
DataSource.Type.EXTERNAL_DATASET
: DataSource.Type.INTERNAL_DATASET;
return new DatasetDataSource(aqlId, dataset, itemType, metaItemType,
datasourceType,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 8971a90..d6a3f21 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -84,7 +85,6 @@
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -295,7 +295,7 @@
}
public INodeDomain findNodeDomain(String nodeGroupName) throws
AlgebricksException {
- return MetadataManagerUtil.findNodeDomain(mdTxnCtx, nodeGroupName);
+ return
MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx,
nodeGroupName);
}
public List<String> findNodes(String nodeGroupName) throws
AlgebricksException {
@@ -329,11 +329,11 @@
@Override
public DataSource findDataSource(DataSourceId id) throws
AlgebricksException {
- return MetadataManagerUtil.findDataSource(mdTxnCtx, id);
+ return
MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx,
id);
}
public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws
AlgebricksException {
- return MetadataManagerUtil.lookupSourceInMetadata(mdTxnCtx, aqlId);
+ return
MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(),
mdTxnCtx, aqlId);
}
@Override
@@ -709,8 +709,9 @@
int numPartitions = 0;
List<String> nodeGroup =
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx,
dataset.getNodeGroupName()).getNodeNames();
+ IClusterStateManager csm = appCtx.getClusterStateManager();
for (String nd : nodeGroup) {
- numPartitions +=
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
+ numPartitions += csm.getNodePartitionsCount(nd);
}
return numElementsHint / numPartitions;
}
@@ -755,12 +756,13 @@
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitAndConstraints(String dataverse) {
- return
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
+ return
SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
+ dataverse);
}
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx,
Dataset dataset, String indexName)
throws AlgebricksException {
- return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName,
mdTxnCtx);
+ return
SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(),
dataset, indexName, mdTxnCtx);
}
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx,
String dataverseName, String adapterName)
@@ -777,7 +779,7 @@
}
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
- return ClusterStateManager.INSTANCE.getClusterLocations();
+ return appCtx.getClusterStateManager().getClusterLocations();
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildExternalDataLookupRuntime(
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 6825f10..5b7ea59 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -23,6 +23,7 @@
import java.util.List;
import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -30,7 +31,6 @@
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.NodeGroup;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -42,11 +42,11 @@
private SplitsAndConstraintsUtil() {
}
- private static FileSplit[] getDataverseSplits(String dataverseName) {
+ private static FileSplit[] getDataverseSplits(IClusterStateManager
clusterStateManager, String dataverseName) {
File relPathFile = new File(dataverseName);
List<FileSplit> splits = new ArrayList<>();
// get all partitions
- ClusterPartition[] clusterPartition =
ClusterStateManager.INSTANCE.getClusterPartitons();
+ ClusterPartition[] clusterPartition =
clusterStateManager.getClusterPartitons();
String storageDirName =
ClusterProperties.INSTANCE.getStorageDirectoryName();
for (int j = 0; j < clusterPartition.length; j++) {
File f = new File(
@@ -57,28 +57,29 @@
return splits.toArray(new FileSplit[] {});
}
- public static FileSplit[] getIndexSplits(Dataset dataset, String
indexName, MetadataTransactionContext mdTxnCtx)
- throws AlgebricksException {
+ public static FileSplit[] getIndexSplits(IClusterStateManager
clusterStateManager, Dataset dataset,
+ String indexName, MetadataTransactionContext mdTxnCtx) throws
AlgebricksException {
try {
NodeGroup nodeGroup =
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName());
if (nodeGroup == null) {
throw new AlgebricksException("Couldn't find node group " +
dataset.getNodeGroupName());
}
List<String> nodeList = nodeGroup.getNodeNames();
- return getIndexSplits(dataset, indexName, nodeList);
+ return getIndexSplits(clusterStateManager, dataset, indexName,
nodeList);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
- public static FileSplit[] getIndexSplits(Dataset dataset, String
indexName, List<String> nodes) {
+ public static FileSplit[] getIndexSplits(IClusterStateManager
clusterStateManager, Dataset dataset,
+ String indexName, List<String> nodes) {
File relPathFile = new
File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
dataset.getDatasetName(), indexName,
dataset.getRebalanceCount()));
String storageDirName =
ClusterProperties.INSTANCE.getStorageDirectoryName();
List<FileSplit> splits = new ArrayList<>();
for (String nd : nodes) {
- int numPartitions =
ClusterStateManager.INSTANCE.getNodePartitionsCount(nd);
- ClusterPartition[] nodePartitions =
ClusterStateManager.INSTANCE.getNodePartitions(nd);
+ int numPartitions = clusterStateManager.getNodePartitionsCount(nd);
+ ClusterPartition[] nodePartitions =
clusterStateManager.getNodePartitions(nd);
// currently this case is never executed since the metadata group
doesn't exists
if
(dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME)
== 0) {
numPartitions = 1;
@@ -97,8 +98,8 @@
}
public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
getDataverseSplitProviderAndConstraints(
- String dataverse) {
- FileSplit[] splits = getDataverseSplits(dataverse);
+ IClusterStateManager clusterStateManager, String dataverse) {
+ FileSplit[] splits = getDataverseSplits(clusterStateManager,
dataverse);
return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index 194fd59..82a1177 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -20,11 +20,11 @@
import java.util.Set;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ResourceIdRequestMessage implements ICcAddressedMessage {
@@ -40,7 +40,8 @@
try {
ICCMessageBroker broker = (ICCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
ResourceIdRequestResponseMessage reponse = new
ResourceIdRequestResponseMessage();
- if (!ClusterStateManager.INSTANCE.isClusterActive()) {
+ IClusterStateManager clusterStateManager =
appCtx.getClusterStateManager();
+ if (!clusterStateManager.isClusterActive()) {
reponse.setResourceId(-1);
reponse.setException(new Exception("Cannot generate global
resource id when cluster is not active."));
} else {
@@ -49,7 +50,7 @@
if (reponse.getResourceId() < 0) {
reponse.setException(new Exception("One or more nodes has
not reported max resource id."));
}
- requestMaxResourceID(resourceIdManager, broker);
+ requestMaxResourceID(clusterStateManager, resourceIdManager,
broker);
}
broker.sendApplicationMessageToNC(reponse, src);
} catch (Exception e) {
@@ -57,8 +58,9 @@
}
}
- private void requestMaxResourceID(IResourceIdManager resourceIdManager,
ICCMessageBroker broker) throws Exception {
- Set<String> getParticipantNodes =
ClusterStateManager.INSTANCE.getParticipantNodes();
+ private void requestMaxResourceID(IClusterStateManager
clusterStateManager, IResourceIdManager resourceIdManager,
+ ICCMessageBroker broker) throws Exception {
+ Set<String> getParticipantNodes =
clusterStateManager.getParticipantNodes();
ReportMaxResourceIdRequestMessage msg = new
ReportMaxResourceIdRequestMessage();
for (String nodeId : getParticipantNodes) {
if (!resourceIdManager.reported(nodeId)) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
similarity index 84%
rename from
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
rename to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 372404c..6a5ed08 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/ResourceIdManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -16,27 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.app.cc;
+package org.apache.asterix.runtime.transaction;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.transactions.IResourceIdManager;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
public class ResourceIdManager implements IResourceIdManager {
+ private final IClusterStateManager csm;
private final AtomicLong globalResourceId = new AtomicLong();
private volatile Set<String> reportedNodes = new HashSet<>();
private volatile boolean allReported = false;
+
+ public ResourceIdManager(IClusterStateManager csm) {
+ this.csm = csm;
+ }
@Override
public long createResourceId() {
if (!allReported) {
synchronized (this) {
if (!allReported) {
- if (reportedNodes.size() <
ClusterStateManager.INSTANCE.getNumberOfNodes()) {
+ if (reportedNodes.size() < csm.getNumberOfNodes()) {
return -1;
} else {
reportedNodes = null;
@@ -58,7 +63,7 @@
if (!allReported) {
globalResourceId.set(Math.max(maxResourceId,
globalResourceId.get()));
reportedNodes.add(nodeId);
- if (reportedNodes.size() ==
ClusterStateManager.INSTANCE.getNumberOfNodes()) {
+ if (reportedNodes.size() == csm.getNumberOfNodes()) {
reportedNodes = null;
allReported = true;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 28c480f..e4cc7f4 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -43,6 +43,7 @@
import org.apache.asterix.common.metadata.IMetadataBootstrap;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
import org.apache.asterix.common.transactions.IResourceIdManager;
+import org.apache.asterix.runtime.transaction.ResourceIdManager;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -77,17 +78,16 @@
private IFaultToleranceStrategy ftStrategy;
private IJobLifecycleListener activeLifeCycleListener;
private IMetadataLockManager mdLockManager;
+ private IClusterStateManager clusterStateManager;
public CcApplicationContext(ICCServiceContext ccServiceCtx,
IHyracksClientConnection hcc,
- ILibraryManager libraryManager, IResourceIdManager
resourceIdManager,
- Supplier<IMetadataBootstrap> metadataBootstrapSupplier,
IGlobalRecoveryManager globalRecoveryManager,
- IFaultToleranceStrategy ftStrategy, IJobLifecycleListener
activeLifeCycleListener,
- IStorageComponentProvider storageComponentProvider,
IMetadataLockManager mdLockManager)
- throws AsterixException, IOException {
+ ILibraryManager libraryManager, Supplier<IMetadataBootstrap>
metadataBootstrapSupplier,
+ IGlobalRecoveryManager globalRecoveryManager,
IFaultToleranceStrategy ftStrategy,
+ IJobLifecycleListener activeLifeCycleListener,
IStorageComponentProvider storageComponentProvider,
+ IMetadataLockManager mdLockManager) throws AsterixException,
IOException {
this.ccServiceCtx = ccServiceCtx;
this.hcc = hcc;
this.libraryManager = libraryManager;
- this.resourceIdManager = resourceIdManager;
this.activeLifeCycleListener = activeLifeCycleListener;
// Determine whether to use old-style asterix-configuration.xml or
new-style configuration.
// QQQ strip this out eventually
@@ -109,6 +109,9 @@
this.globalRecoveryManager = globalRecoveryManager;
this.storageComponentProvider = storageComponentProvider;
this.mdLockManager = mdLockManager;
+ clusterStateManager = new ClusterStateManager();
+ clusterStateManager.setCcAppCtx(this);
+ this.resourceIdManager = new ResourceIdManager(clusterStateManager);
}
@Override
@@ -204,6 +207,7 @@
return resourceIdManager;
}
+ @Override
public IMetadataBootstrap getMetadataBootstrap() {
return metadataBootstrapSupplier.get();
}
@@ -230,6 +234,6 @@
@Override
public IClusterStateManager getClusterStateManager() {
- return ClusterStateManager.INSTANCE;
+ return clusterStateManager;
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index cdb3112..2042c7c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -36,6 +36,7 @@
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.config.ClusterProperties;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.replication.IFaultToleranceStrategy;
@@ -66,7 +67,6 @@
*/
private static final Logger LOGGER =
Logger.getLogger(ClusterStateManager.class.getName());
- public static final ClusterStateManager INSTANCE = new
ClusterStateManager();
private final Map<String, Map<IOption, Object>> activeNcConfiguration =
new HashMap<>();
private Set<String> pendingRemoval = new HashSet<>();
private final Cluster cluster;
@@ -78,13 +78,16 @@
private boolean metadataNodeActive = false;
private Set<String> failedNodes = new HashSet<>();
private IFaultToleranceStrategy ftStrategy;
- private CcApplicationContext appCtx;
+ private ICcApplicationContext appCtx;
- private ClusterStateManager() {
+ public ClusterStateManager() {
+ Exception e = new Exception();
+ LOGGER.log(Level.WARNING, "Creating Cluster State manager", e);
cluster = ClusterProperties.INSTANCE.getCluster();
}
- public void setCcAppCtx(CcApplicationContext appCtx) {
+ @Override
+ public void setCcAppCtx(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
node2PartitionsMap =
appCtx.getMetadataProperties().getNodePartitions();
clusterPartitions =
appCtx.getMetadataProperties().getClusterPartitions();
@@ -93,6 +96,7 @@
ftStrategy.bindTo(this);
}
+ @Override
public synchronized void removeNCConfiguration(String nodeId) throws
HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Removing configuration parameters for node id " +
nodeId);
@@ -102,6 +106,7 @@
pendingRemoval.remove(nodeId);
}
+ @Override
public synchronized void addNCConfiguration(String nodeId, Map<IOption,
Object> configuration)
throws HyracksException {
if (LOGGER.isLoggable(Level.INFO)) {
@@ -209,13 +214,7 @@
return true;
}
- /**
- * Returns the IO devices configured for a Node Controller
- *
- * @param nodeId
- * unique identifier of the Node Controller
- * @return a list of IO devices.
- */
+ @Override
public synchronized String[] getIODevices(String nodeId) {
Map<IOption, Object> ncConfig = activeNcConfiguration.get(nodeId);
if (ncConfig == null) {
@@ -233,11 +232,13 @@
return state;
}
+ @Override
public synchronized Node getAvailableSubstitutionNode() {
List<Node> subNodes = cluster.getSubstituteNodes() == null ? null :
cluster.getSubstituteNodes().getNode();
return subNodes == null || subNodes.isEmpty() ? null : subNodes.get(0);
}
+ @Override
public synchronized Set<String> getParticipantNodes() {
Set<String> participantNodes = new HashSet<>();
for (String pNode : activeNcConfiguration.keySet()) {
@@ -246,6 +247,7 @@
return participantNodes;
}
+ @Override
public synchronized Set<String> getParticipantNodes(boolean
excludePendingRemoval) {
Set<String> participantNodes = getParticipantNodes();
if (excludePendingRemoval) {
@@ -254,6 +256,7 @@
return participantNodes;
}
+ @Override
public synchronized AlgebricksAbsolutePartitionConstraint
getClusterLocations() {
if (clusterPartitionConstraint == null) {
resetClusterPartitionConstraint();
@@ -272,6 +275,7 @@
new
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new
String[] {}));
}
+ @Override
public synchronized boolean isClusterActive() {
if (cluster == null) {
// this is a virtual cluster
@@ -280,6 +284,7 @@
return state == ClusterState.ACTIVE;
}
+ @Override
public int getNumberOfNodes() {
return appCtx.getMetadataProperties().getNodeNames().size();
}
@@ -289,6 +294,7 @@
return node2PartitionsMap.get(nodeId);
}
+ @Override
public synchronized int getNodePartitionsCount(String node) {
if (node2PartitionsMap.containsKey(node)) {
return node2PartitionsMap.get(node).length;
@@ -305,10 +311,12 @@
return partitons.toArray(new ClusterPartition[] {});
}
+ @Override
public synchronized boolean isMetadataNodeActive() {
return metadataNodeActive;
}
+ @Override
public synchronized ObjectNode getClusterStateDescription() {
ObjectMapper om = new ObjectMapper();
ObjectNode stateDescription = om.createObjectNode();
@@ -342,6 +350,7 @@
return stateDescription;
}
+ @Override
public synchronized ObjectNode getClusterStateSummary() {
ObjectMapper om = new ObjectMapper();
ObjectNode stateDescription = om.createObjectNode();
@@ -395,6 +404,7 @@
}
}
+ @Override
public synchronized void removePending(String nodeId) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering intention to remove node id " + nodeId);
@@ -406,6 +416,7 @@
}
}
+ @Override
public synchronized boolean cancelRemovePending(String nodeId) {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deregistering intention to remove node id " + nodeId);
@@ -423,8 +434,8 @@
}
private void updateNodeConfig(String nodeId, Map<IOption, Object>
configuration) {
- ConfigManager configManager = ((ConfigManagerApplicationConfig)
appCtx.getServiceContext().getAppConfig())
- .getConfigManager();
+ ConfigManager configManager =
+ ((ConfigManagerApplicationConfig)
appCtx.getServiceContext().getAppConfig()).getConfigManager();
for (Map.Entry<IOption, Object> entry : configuration.entrySet()) {
if (entry.getKey().section() == Section.NC) {
configManager.set(nodeId, entry.getKey(), entry.getValue());
--
To view, visit https://asterix-gerrit.ics.uci.edu/1944
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6532245033ac4c6f6aa9f193539944eecb832f7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>