abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1325
Change subject: Extract IPCIs out of ClusterControllerService
......................................................................
Extract IPCIs out of ClusterControllerService
Moving the two IPCIs out of ClusterControllerService is a good
start to cleaning up the class. In addition, this change renames queue
to workQueue in NodeControllerService for consistency.
Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
---
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
A
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/HyracksClientInterfaceIPCI.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
4 files changed, 373 insertions(+), 348 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/25/1325/1
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
new file mode 100644
index 0000000..3dd21a0
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -0,0 +1,186 @@
+package org.apache.hyracks.control.cc;
+
+import java.util.Map;
+
+import org.apache.hyracks.api.client.NodeControllerInfo;
+import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
+import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
+import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
+import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
+import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
+import org.apache.hyracks.control.cc.work.RegisterNodeWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
+import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
+import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
+import org.apache.hyracks.control.cc.work.ReportProfilesWork;
+import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
+import
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
+import org.apache.hyracks.control.cc.work.TaskCompleteWork;
+import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions;
+import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
+import
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+
+class ClusterControllerIPCI implements IIPCI {
+ private final ClusterControllerService ccs;
+
+ ClusterControllerIPCI(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ }
+
+ @Override
+ public void deliverIncomingMessage(final IIPCHandle handle, long mid, long
rmid, Object payload,
+ Exception exception) {
+ CCNCFunctions.Function fn = (Function) payload;
+ switch (fn.getFunctionId()) {
+ case REGISTER_NODE: {
+ CCNCFunctions.RegisterNodeFunction rnf =
(CCNCFunctions.RegisterNodeFunction) fn;
+ ccs.getWorkQueue().schedule(new RegisterNodeWork(ccs,
rnf.getNodeRegistration()));
+ return;
+ }
+
+ case UNREGISTER_NODE: {
+ CCNCFunctions.UnregisterNodeFunction unf =
(CCNCFunctions.UnregisterNodeFunction) fn;
+ ccs.getWorkQueue().schedule(new UnregisterNodeWork(ccs,
unf.getNodeId()));
+ return;
+ }
+
+ case NODE_HEARTBEAT: {
+ CCNCFunctions.NodeHeartbeatFunction nhf =
(CCNCFunctions.NodeHeartbeatFunction) fn;
+ ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs,
nhf.getNodeId(),
+ nhf.getHeartbeatData()));
+ return;
+ }
+
+ case NOTIFY_JOBLET_CLEANUP: {
+ CCNCFunctions.NotifyJobletCleanupFunction njcf =
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
+ ccs.getWorkQueue().schedule(new
JobletCleanupNotificationWork(ccs, njcf.getJobId(),
+ njcf.getNodeId()));
+ return;
+ }
+
+ case NOTIFY_DEPLOY_BINARY: {
+ CCNCFunctions.NotifyDeployBinaryFunction ndbf =
(CCNCFunctions.NotifyDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyDeployBinaryWork(ccs,
ndbf.getDeploymentId(),
+ ndbf.getNodeId(), ndbf.getDeploymentStatus()));
+ return;
+ }
+
+ case REPORT_PROFILE: {
+ CCNCFunctions.ReportProfileFunction rpf =
(CCNCFunctions.ReportProfileFunction) fn;
+ ccs.getWorkQueue().schedule(new ReportProfilesWork(ccs,
rpf.getProfiles()));
+ return;
+ }
+
+ case NOTIFY_TASK_COMPLETE: {
+ CCNCFunctions.NotifyTaskCompleteFunction ntcf =
(CCNCFunctions.NotifyTaskCompleteFunction) fn;
+ ccs.getWorkQueue().schedule(new TaskCompleteWork(ccs,
ntcf.getJobId(),
+ ntcf.getTaskId(), ntcf.getNodeId(),
ntcf.getStatistics()));
+ return;
+ }
+ case NOTIFY_TASK_FAILURE: {
+ CCNCFunctions.NotifyTaskFailureFunction ntff =
(CCNCFunctions.NotifyTaskFailureFunction) fn;
+ ccs.getWorkQueue().schedule(new TaskFailureWork(ccs,
ntff.getJobId(),
+ ntff.getTaskId(), ntff.getNodeId(),
ntff.getExceptions()));
+ return;
+ }
+
+ case REGISTER_PARTITION_PROVIDER: {
+ CCNCFunctions.RegisterPartitionProviderFunction rppf =
+ (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+ ccs.getWorkQueue().schedule(new
RegisterPartitionAvailibilityWork(ccs,
+ rppf.getPartitionDescriptor()));
+ return;
+ }
+
+ case REGISTER_PARTITION_REQUEST: {
+ CCNCFunctions.RegisterPartitionRequestFunction rprf =
+ (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+ ccs.getWorkQueue().schedule(new
RegisterPartitionRequestWork(ccs,
+ rprf.getPartitionRequest()));
+ return;
+ }
+
+ case REGISTER_RESULT_PARTITION_LOCATION: {
+ CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ ccs.getWorkQueue().schedule(new
RegisterResultPartitionLocationWork(ccs,
+ rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
+ rrplf.getPartition(), rrplf.getNPartitions(),
rrplf.getNetworkAddress()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction
rrplf =
+
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ ccs.getWorkQueue().schedule(new
ReportResultPartitionWriteCompletionWork(ccs,
+ rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
+ return;
+ }
+
+ case REPORT_RESULT_PARTITION_FAILURE: {
+ CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
+ (CCNCFunctions.ReportResultPartitionFailureFunction)
fn;
+ ccs.getWorkQueue().schedule(new
ReportResultPartitionFailureWork(ccs,
+ rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
+ return;
+ }
+
+ case SEND_APPLICATION_MESSAGE: {
+ CCNCFunctions.SendApplicationMessageFunction rsf =
+ (CCNCFunctions.SendApplicationMessageFunction) fn;
+ ccs.getWorkQueue().schedule(new ApplicationMessageWork(ccs,
rsf.getMessage(),
+ rsf.getDeploymentId(), rsf.getNodeId()));
+ return;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+ new IResultCallback<Map<String, NodeControllerInfo>>()
{
+ @Override
+ public void setValue(Map<String,
NodeControllerInfo> result) {
+ new
IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
+ .setValue(new
CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
+ }
+
+ @Override
+ public void setException(Exception e) {
+ }
+ }));
+ return;
+ }
+
+ case STATE_DUMP_RESPONSE: {
+ CCNCFunctions.StateDumpResponseFunction dsrf =
(StateDumpResponseFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyStateDumpResponse(ccs,
dsrf.getNodeId(),
+ dsrf.getStateDumpId(), dsrf.getState()));
+ return;
+ }
+
+ case SHUTDOWN_RESPONSE: {
+ CCNCFunctions.ShutdownResponseFunction sdrf =
(ShutdownResponseFunction) fn;
+ ccs.getWorkQueue().schedule(new NotifyShutdownWork(ccs,
sdrf.getNodeId()));
+ return;
+ }
+
+ case THREAD_DUMP_RESPONSE: {
+ CCNCFunctions.ThreadDumpResponseFunction tdrf =
+ (CCNCFunctions.ThreadDumpResponseFunction)fn;
+ ccs.getWorkQueue().schedule(new NotifyThreadDumpResponse(ccs,
+ tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
+ return;
+
+ }
+ }
+ ClusterControllerService.LOGGER.warning("Unknown function: " +
fn.getFunctionId());
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ac38523..2291e69 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -39,17 +39,11 @@
import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
-import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.context.ICCContext;
-import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobIdFactory;
-import org.apache.hyracks.api.job.JobInfo;
-import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.api.topology.ClusterTopology;
import org.apache.hyracks.api.topology.TopologyDefinitionParser;
@@ -58,65 +52,28 @@
import org.apache.hyracks.control.cc.dataset.IDatasetDirectoryService;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.web.WebServer;
-import org.apache.hyracks.control.cc.work.ApplicationMessageWork;
-import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
-import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
-import org.apache.hyracks.control.cc.work.GetJobInfoWork;
-import org.apache.hyracks.control.cc.work.GetJobStatusWork;
-import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
-import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
-import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import org.apache.hyracks.control.cc.work.GetResultStatusWork;
-import org.apache.hyracks.control.cc.work.JobStartWork;
-import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
-import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
-import org.apache.hyracks.control.cc.work.NotifyDeployBinaryWork;
-import org.apache.hyracks.control.cc.work.NotifyShutdownWork;
-import org.apache.hyracks.control.cc.work.NotifyStateDumpResponse;
-import org.apache.hyracks.control.cc.work.NotifyThreadDumpResponse;
-import org.apache.hyracks.control.cc.work.RegisterNodeWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
-import org.apache.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import org.apache.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
import org.apache.hyracks.control.cc.work.RemoveDeadNodesWork;
-import org.apache.hyracks.control.cc.work.ReportProfilesWork;
-import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import
org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork;
-import org.apache.hyracks.control.cc.work.TaskCompleteWork;
-import org.apache.hyracks.control.cc.work.TaskFailureWork;
import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
-import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
import org.apache.hyracks.control.common.context.ServerContext;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.IniUtils;
import org.apache.hyracks.control.common.deployment.DeploymentRun;
import org.apache.hyracks.control.common.ipc.CCNCFunctions;
-import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
-import
org.apache.hyracks.control.common.ipc.CCNCFunctions.ShutdownResponseFunction;
-import
org.apache.hyracks.control.common.ipc.CCNCFunctions.StateDumpResponseFunction;
import org.apache.hyracks.control.common.logs.LogFile;
import org.apache.hyracks.control.common.shutdown.ShutdownRun;
-import org.apache.hyracks.control.common.work.IPCResponder;
-import org.apache.hyracks.control.common.work.IResultCallback;
import org.apache.hyracks.control.common.work.WorkQueue;
-import org.apache.hyracks.ipc.api.IIPCHandle;
import org.apache.hyracks.ipc.api.IIPCI;
-import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.hyracks.ipc.impl.IPCSystem;
import
org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
import org.ini4j.Ini;
import org.xml.sax.InputSource;
public class ClusterControllerService implements IControllerService {
- private static final Logger LOGGER =
Logger.getLogger(ClusterControllerService.class.getName());
+ static final Logger LOGGER =
Logger.getLogger(ClusterControllerService.class.getName());
private final CCConfig ccConfig;
@@ -156,8 +113,6 @@
private final IDatasetDirectoryService datasetDirectoryService;
- private final JobIdFactory jobIdFactory;
-
private final Map<DeploymentId, DeploymentRun> deploymentRunMap;
private final Map<String, StateDumpRun> stateDumpRunMap;
@@ -175,10 +130,10 @@
nodeRegistry = new LinkedHashMap<>();
ipAddressNodeNameMap = new HashMap<>();
serverCtx = new
ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new
File(ccConfig.ccRoot));
- IIPCI ccIPCI = new ClusterControllerIPCI();
+ IIPCI ccIPCI = new ClusterControllerIPCI(this);
clusterIPC = new IPCSystem(new
InetSocketAddress(ccConfig.clusterNetPort), ccIPCI,
new CCNCFunctions.SerializerDeserializer());
- IIPCI ciIPCI = new HyracksClientInterfaceIPCI();
+ IIPCI ciIPCI = new HyracksClientInterfaceIPCI(this);
clientIPC = new IPCSystem(new
InetSocketAddress(ccConfig.clientNetIpAddress, ccConfig.clientNetPort), ciIPCI,
new JavaSerializationBasedPayloadSerializerDeserializer());
webServer = new WebServer(this);
@@ -208,7 +163,6 @@
ccContext = new ClusterControllerContext(topology);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new
DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
- jobIdFactory = new JobIdFactory();
deploymentRunMap = new HashMap<>();
stateDumpRunMap = new HashMap<>();
@@ -440,291 +394,6 @@
public IDatasetDirectoryService getDatasetDirectoryService() {
return datasetDirectoryService;
- }
-
- private class HyracksClientInterfaceIPCI implements IIPCI {
-
- @Override
- public void deliverIncomingMessage(IIPCHandle handle, long mid, long
rmid, Object payload,
- Exception exception) {
- HyracksClientInterfaceFunctions.Function fn =
(HyracksClientInterfaceFunctions.Function) payload;
- switch (fn.getFunctionId()) {
- case GET_CLUSTER_CONTROLLER_INFO: {
- try {
- handle.send(mid, info, null);
- } catch (IPCException e) {
- e.printStackTrace();
- }
- return;
- }
-
- case CREATE_JOB:
- break;
- case GET_JOB_STATUS: {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
-
(HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
- workQueue.schedule(new
GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
- new IPCResponder<JobStatus>(handle, mid)));
- return;
- }
-
- case GET_JOB_INFO: {
- HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
-
(HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
- workQueue.schedule(new
GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
- new IPCResponder<JobInfo>(handle, mid)));
- return;
- }
-
- case START_JOB: {
- HyracksClientInterfaceFunctions.StartJobFunction sjf =
- (HyracksClientInterfaceFunctions.StartJobFunction)
fn;
- JobId jobId = jobIdFactory.create();
- workQueue.schedule(new
JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
- sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new
IPCResponder<JobId>(handle, mid)));
- return;
- }
-
- case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
- workQueue.schedule(new
GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
- new IPCResponder<NetworkAddress>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RESULT_STATUS: {
-
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
-
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
- workQueue.schedule(new
GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
- gdrlf.getResultSetId(), new
IPCResponder<Status>(handle, mid)));
- return;
- }
-
- case GET_DATASET_RECORD_DESCRIPTOR:
- break;
- case GET_DATASET_RESULT_LOCATIONS: {
-
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
- workQueue.schedule(new
GetResultPartitionLocationsWork(ClusterControllerService.this,
- gdrlf.getJobId(), gdrlf.getResultSetId(),
gdrlf.getKnownRecords(),
- new IPCResponder<>(handle, mid)));
- return;
- }
-
- case WAIT_FOR_COMPLETION: {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction
wfcf =
-
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
- workQueue.schedule(new
WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
- new IPCResponder<Object>(handle, mid)));
- return;
- }
-
- case GET_NODE_CONTROLLERS_INFO: {
- workQueue.schedule(new
GetNodeControllersInfoWork(ClusterControllerService.this,
- new IPCResponder<>(handle, mid)));
- return;
- }
-
- case GET_CLUSTER_TOPOLOGY: {
- try {
- handle.send(mid, ccContext.getClusterTopology(), null);
- } catch (IPCException e) {
- e.printStackTrace();
- }
- return;
- }
-
- case CLI_DEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction
dbf =
-
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
- workQueue.schedule(new
CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
- dbf.getDeploymentId(), new IPCResponder<>(handle,
mid)));
- return;
- }
-
- case CLI_UNDEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction
udbf =
-
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
- workQueue.schedule(new
CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
- new IPCResponder<>(handle, mid)));
- return;
- }
- case CLUSTER_SHUTDOWN: {
- HyracksClientInterfaceFunctions.ClusterShutdownFunction
csf =
-
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
- workQueue.schedule(new
ClusterShutdownWork(ClusterControllerService.this,
- csf.isTerminateNCService(), new
IPCResponder<>(handle, mid)));
- return;
- }
-
- case GET_NODE_DETAILS_JSON:
- HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction
gndjf =
-
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
- workQueue.schedule(new
GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(),
- gndjf.isIncludeStats(), gndjf.isIncludeConfig(),
new IPCResponder<>(handle, mid)));
- return;
-
- case THREAD_DUMP:
- HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
-
(HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
- workQueue.schedule(new
GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(),
- new IPCResponder<String>(handle, mid)));
- return;
- }
- try {
- handle.send(mid, null, new IllegalArgumentException("Unknown
function " + fn.getFunctionId()));
- } catch (IPCException e) {
- e.printStackTrace();
- }
- }
- }
-
- private class ClusterControllerIPCI implements IIPCI {
- @Override
- public void deliverIncomingMessage(final IIPCHandle handle, long mid,
long rmid, Object payload,
- Exception exception) {
- CCNCFunctions.Function fn = (Function) payload;
- switch (fn.getFunctionId()) {
- case REGISTER_NODE: {
- CCNCFunctions.RegisterNodeFunction rnf =
(CCNCFunctions.RegisterNodeFunction) fn;
- workQueue.schedule(new
RegisterNodeWork(ClusterControllerService.this, rnf.getNodeRegistration()));
- return;
- }
-
- case UNREGISTER_NODE: {
- CCNCFunctions.UnregisterNodeFunction unf =
(CCNCFunctions.UnregisterNodeFunction) fn;
- workQueue.schedule(new
UnregisterNodeWork(ClusterControllerService.this, unf.getNodeId()));
- return;
- }
-
- case NODE_HEARTBEAT: {
- CCNCFunctions.NodeHeartbeatFunction nhf =
(CCNCFunctions.NodeHeartbeatFunction) fn;
- workQueue.schedule(new
NodeHeartbeatWork(ClusterControllerService.this, nhf.getNodeId(),
- nhf.getHeartbeatData()));
- return;
- }
-
- case NOTIFY_JOBLET_CLEANUP: {
- CCNCFunctions.NotifyJobletCleanupFunction njcf =
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
- workQueue.schedule(new
JobletCleanupNotificationWork(ClusterControllerService.this, njcf.getJobId(),
- njcf.getNodeId()));
- return;
- }
-
- case NOTIFY_DEPLOY_BINARY: {
- CCNCFunctions.NotifyDeployBinaryFunction ndbf =
(CCNCFunctions.NotifyDeployBinaryFunction) fn;
- workQueue.schedule(new
NotifyDeployBinaryWork(ClusterControllerService.this, ndbf.getDeploymentId(),
- ndbf.getNodeId(), ndbf.getDeploymentStatus()));
- return;
- }
-
- case REPORT_PROFILE: {
- CCNCFunctions.ReportProfileFunction rpf =
(CCNCFunctions.ReportProfileFunction) fn;
- workQueue.schedule(new
ReportProfilesWork(ClusterControllerService.this, rpf.getProfiles()));
- return;
- }
-
- case NOTIFY_TASK_COMPLETE: {
- CCNCFunctions.NotifyTaskCompleteFunction ntcf =
(CCNCFunctions.NotifyTaskCompleteFunction) fn;
- workQueue.schedule(new
TaskCompleteWork(ClusterControllerService.this, ntcf.getJobId(),
- ntcf.getTaskId(), ntcf.getNodeId(),
ntcf.getStatistics()));
- return;
- }
- case NOTIFY_TASK_FAILURE: {
- CCNCFunctions.NotifyTaskFailureFunction ntff =
(CCNCFunctions.NotifyTaskFailureFunction) fn;
- workQueue.schedule(new
TaskFailureWork(ClusterControllerService.this, ntff.getJobId(),
- ntff.getTaskId(), ntff.getNodeId(),
ntff.getExceptions()));
- return;
- }
-
- case REGISTER_PARTITION_PROVIDER: {
- CCNCFunctions.RegisterPartitionProviderFunction rppf =
- (CCNCFunctions.RegisterPartitionProviderFunction)
fn;
- workQueue.schedule(new
RegisterPartitionAvailibilityWork(ClusterControllerService.this,
- rppf.getPartitionDescriptor()));
- return;
- }
-
- case REGISTER_PARTITION_REQUEST: {
- CCNCFunctions.RegisterPartitionRequestFunction rprf =
- (CCNCFunctions.RegisterPartitionRequestFunction)
fn;
- workQueue.schedule(new
RegisterPartitionRequestWork(ClusterControllerService.this,
- rprf.getPartitionRequest()));
- return;
- }
-
- case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction
rrplf =
-
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
- workQueue.schedule(new
RegisterResultPartitionLocationWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
- rrplf.getPartition(), rrplf.getNPartitions(),
rrplf.getNetworkAddress()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction
rrplf =
-
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
- workQueue.schedule(new
ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
- return;
- }
-
- case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
-
(CCNCFunctions.ReportResultPartitionFailureFunction) fn;
- workQueue.schedule(new
ReportResultPartitionFailureWork(ClusterControllerService.this,
- rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
- return;
- }
-
- case SEND_APPLICATION_MESSAGE: {
- CCNCFunctions.SendApplicationMessageFunction rsf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
- workQueue.schedule(new
ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
- rsf.getDeploymentId(), rsf.getNodeId()));
- return;
- }
-
- case GET_NODE_CONTROLLERS_INFO: {
- workQueue.schedule(new
GetNodeControllersInfoWork(ClusterControllerService.this,
- new IResultCallback<Map<String,
NodeControllerInfo>>() {
- @Override
- public void setValue(Map<String,
NodeControllerInfo> result) {
- new
IPCResponder<CCNCFunctions.GetNodeControllersInfoResponseFunction>(handle, -1)
- .setValue(new
CCNCFunctions.GetNodeControllersInfoResponseFunction(result));
- }
-
- @Override
- public void setException(Exception e) {
- }
- }));
- return;
- }
-
- case STATE_DUMP_RESPONSE: {
- CCNCFunctions.StateDumpResponseFunction dsrf =
(StateDumpResponseFunction) fn;
- workQueue.schedule(new
NotifyStateDumpResponse(ClusterControllerService.this, dsrf.getNodeId(),
- dsrf.getStateDumpId(), dsrf.getState()));
- return;
- }
-
- case SHUTDOWN_RESPONSE: {
- CCNCFunctions.ShutdownResponseFunction sdrf =
(ShutdownResponseFunction) fn;
- workQueue.schedule(new
NotifyShutdownWork(ClusterControllerService.this, sdrf.getNodeId()));
- return;
- }
-
- case THREAD_DUMP_RESPONSE: {
- CCNCFunctions.ThreadDumpResponseFunction tdrf =
- (CCNCFunctions.ThreadDumpResponseFunction)fn;
- workQueue.schedule(new
NotifyThreadDumpResponse(ClusterControllerService.this,
- tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
- return;
-
- }
- }
- LOGGER.warning("Unknown function: " + fn.getFunctionId());
- }
}
public synchronized void addStateDumpRun(String id, StateDumpRun sdr) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/HyracksClientInterfaceIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/HyracksClientInterfaceIPCI.java
new file mode 100644
index 0000000..249f6ca
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/HyracksClientInterfaceIPCI.java
@@ -0,0 +1,170 @@
+package org.apache.hyracks.control.cc;
+
+import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
+import org.apache.hyracks.api.comm.NetworkAddress;
+import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.job.JobIdFactory;
+import org.apache.hyracks.api.job.JobInfo;
+import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.control.cc.work.CliDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.CliUnDeployBinaryWork;
+import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
+import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobInfoWork;
+import org.apache.hyracks.control.cc.work.GetJobStatusWork;
+import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
+import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
+import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
+import org.apache.hyracks.control.cc.work.GetResultStatusWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
+import org.apache.hyracks.control.cc.work.JobStartWork;
+import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
+import org.apache.hyracks.control.common.work.IPCResponder;
+import org.apache.hyracks.ipc.api.IIPCHandle;
+import org.apache.hyracks.ipc.api.IIPCI;
+import org.apache.hyracks.ipc.exceptions.IPCException;
+
+class HyracksClientInterfaceIPCI implements IIPCI {
+
+ private final ClusterControllerService ccs;
+ private final JobIdFactory jobIdFactory;
+
+ HyracksClientInterfaceIPCI(ClusterControllerService ccs) {
+ this.ccs = ccs;
+ jobIdFactory = new JobIdFactory();
+ }
+
+ @Override
+ public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid,
Object payload,
+ Exception exception) {
+ HyracksClientInterfaceFunctions.Function fn =
(HyracksClientInterfaceFunctions.Function) payload;
+ switch (fn.getFunctionId()) {
+ case GET_CLUSTER_CONTROLLER_INFO: {
+ try {
+ handle.send(mid, ccs.getClusterControllerInfo(), null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ case CREATE_JOB:
+ break;
+ case GET_JOB_STATUS: {
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+ (HyracksClientInterfaceFunctions.GetJobStatusFunction)
fn;
+ ccs.getWorkQueue().schedule(new GetJobStatusWork(ccs,
gjsf.getJobId(),
+ new IPCResponder<JobStatus>(handle, mid)));
+ return;
+ }
+
+ case GET_JOB_INFO: {
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+ (HyracksClientInterfaceFunctions.GetJobInfoFunction)
fn;
+ ccs.getWorkQueue().schedule(new GetJobInfoWork(ccs,
gjsf.getJobId(),
+ new IPCResponder<JobInfo>(handle, mid)));
+ return;
+ }
+
+ case START_JOB: {
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
+ (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+ JobId jobId = jobIdFactory.create();
+ ccs.getWorkQueue().schedule(new JobStartWork(ccs,
sjf.getDeploymentId(),
+ sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new
IPCResponder<JobId>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
+ ccs.getWorkQueue().schedule(new
GetDatasetDirectoryServiceInfoWork(ccs,
+ new IPCResponder<NetworkAddress>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RESULT_STATUS: {
+ HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction
gdrlf =
+
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+ ccs.getWorkQueue().schedule(new GetResultStatusWork(ccs,
gdrlf.getJobId(),
+ gdrlf.getResultSetId(), new
IPCResponder<Status>(handle, mid)));
+ return;
+ }
+
+ case GET_DATASET_RECORD_DESCRIPTOR:
+ break;
+ case GET_DATASET_RESULT_LOCATIONS: {
+
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+ ccs.getWorkQueue().schedule(new
GetResultPartitionLocationsWork(ccs,
+ gdrlf.getJobId(), gdrlf.getResultSetId(),
gdrlf.getKnownRecords(),
+ new IPCResponder<>(handle, mid)));
+ return;
+ }
+
+ case WAIT_FOR_COMPLETION: {
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf
=
+
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ ccs.getWorkQueue().schedule(new WaitForJobCompletionWork(ccs,
wfcf.getJobId(),
+ new IPCResponder<>(handle, mid)));
+ return;
+ }
+
+ case GET_NODE_CONTROLLERS_INFO: {
+ ccs.getWorkQueue().schedule(new GetNodeControllersInfoWork(ccs,
+ new IPCResponder<>(handle, mid)));
+ return;
+ }
+
+ case GET_CLUSTER_TOPOLOGY: {
+ try {
+ handle.send(mid, ccs.getCCContext().getClusterTopology(),
null);
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ case CLI_DEPLOY_BINARY: {
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new CliDeployBinaryWork(ccs,
dbf.getBinaryURLs(),
+ dbf.getDeploymentId(), new IPCResponder<>(handle,
mid)));
+ return;
+ }
+
+ case CLI_UNDEPLOY_BINARY: {
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf
=
+
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+ ccs.getWorkQueue().schedule(new CliUnDeployBinaryWork(ccs,
udbf.getDeploymentId(),
+ new IPCResponder<>(handle, mid)));
+ return;
+ }
+ case CLUSTER_SHUTDOWN: {
+ HyracksClientInterfaceFunctions.ClusterShutdownFunction csf =
+
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
+ ccs.getWorkQueue().schedule(new ClusterShutdownWork(ccs,
+ csf.isTerminateNCService(), new IPCResponder<>(handle,
mid)));
+ return;
+ }
+
+ case GET_NODE_DETAILS_JSON:
+ HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction
gndjf =
+
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
+ ccs.getWorkQueue().schedule(new GetNodeDetailsJSONWork(ccs,
gndjf.getNodeId(),
+ gndjf.isIncludeStats(), gndjf.isIncludeConfig(), new
IPCResponder<>(handle, mid)));
+ return;
+
+ case THREAD_DUMP:
+ HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
+ (HyracksClientInterfaceFunctions.ThreadDumpFunction)
fn;
+ ccs.getWorkQueue().schedule(new GetThreadDumpWork(ccs,
tdf.getNode(),
+ new IPCResponder<String>(handle, mid)));
+ return;
+ }
+ try {
+ handle.send(mid, null, new IllegalArgumentException("Unknown
function " + fn.getFunctionId()));
+ } catch (IPCException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index ed46b53..ae7472f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -118,7 +118,7 @@
private DatasetNetworkManager datasetNetworkManager;
- private final WorkQueue queue;
+ private final WorkQueue workQueue;
private final Timer timer;
@@ -181,7 +181,7 @@
FullFrameChannelInterfaceFactory.INSTANCE);
lccm = new LifeCycleComponentManager();
- queue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves
MAX_PRIORITY of the heartbeat thread.
+ workQueue = new WorkQueue(id, Thread.NORM_PRIORITY); // Reserves
MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<>();
timer = new Timer(true);
serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
@@ -305,7 +305,7 @@
}
appCtx.setDistributedState(nodeParameters.getDistributedState());
- queue.start();
+ workQueue.start();
heartbeatTask = new HeartbeatTask(ccs);
@@ -359,7 +359,7 @@
if (messagingNetManager != null) {
messagingNetManager.stop();
}
- queue.stop();
+ workQueue.stop();
if (ncAppEntryPoint != null) {
ncAppEntryPoint.stop();
}
@@ -414,7 +414,7 @@
}
public WorkQueue getWorkQueue() {
- return queue;
+ return workQueue;
}
public ThreadMXBean getThreadMXBean() {
@@ -497,7 +497,7 @@
try {
FutureValue<List<JobProfile>> fv = new FutureValue<>();
BuildJobProfilesWork bjpw = new
BuildJobProfilesWork(NodeControllerService.this, fv);
- queue.scheduleAndSync(bjpw);
+ workQueue.scheduleAndSync(bjpw);
List<JobProfile> profiles = fv.get();
if (!profiles.isEmpty()) {
cc.reportProfile(id, profiles);
@@ -517,30 +517,30 @@
case SEND_APPLICATION_MESSAGE:
CCNCFunctions.SendApplicationMessageFunction amf =
(CCNCFunctions.SendApplicationMessageFunction) fn;
- queue.schedule(new
ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+ workQueue.schedule(new
ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
amf.getDeploymentId(), amf.getNodeId()));
return;
case START_TASKS:
CCNCFunctions.StartTasksFunction stf =
(CCNCFunctions.StartTasksFunction) fn;
- queue.schedule(new
StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
stf.getJobId(),
+ workQueue.schedule(new
StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
stf.getJobId(),
stf.getPlanBytes(), stf.getTaskDescriptors(),
stf.getConnectorPolicies(), stf.getFlags()));
return;
case ABORT_TASKS:
CCNCFunctions.AbortTasksFunction atf =
(CCNCFunctions.AbortTasksFunction) fn;
- queue.schedule(new
AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
+ workQueue.schedule(new
AbortTasksWork(NodeControllerService.this, atf.getJobId(), atf.getTasks()));
return;
case CLEANUP_JOBLET:
CCNCFunctions.CleanupJobletFunction cjf =
(CCNCFunctions.CleanupJobletFunction) fn;
- queue.schedule(new
CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
+ workQueue.schedule(new
CleanupJobletWork(NodeControllerService.this, cjf.getJobId(), cjf.getStatus()));
return;
case REPORT_PARTITION_AVAILABILITY:
CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
(CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
- queue.schedule(new
ReportPartitionAvailabilityWork(NodeControllerService.this,
+ workQueue.schedule(new
ReportPartitionAvailabilityWork(NodeControllerService.this,
rpaf.getPartitionId(), rpaf.getNetworkAddress()));
return;
@@ -557,18 +557,18 @@
case DEPLOY_BINARY:
CCNCFunctions.DeployBinaryFunction dbf =
(CCNCFunctions.DeployBinaryFunction) fn;
- queue.schedule(new
DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
+ workQueue.schedule(new
DeployBinaryWork(NodeControllerService.this, dbf.getDeploymentId(),
dbf.getBinaryURLs()));
return;
case UNDEPLOY_BINARY:
CCNCFunctions.UnDeployBinaryFunction ndbf =
(CCNCFunctions.UnDeployBinaryFunction) fn;
- queue.schedule(new
UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
+ workQueue.schedule(new
UnDeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId()));
return;
case STATE_DUMP_REQUEST:
final CCNCFunctions.StateDumpRequestFunction dsrf =
(StateDumpRequestFunction) fn;
- queue.schedule(new
StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
+ workQueue.schedule(new
StateDumpWork(NodeControllerService.this, dsrf.getStateDumpId()));
return;
case SHUTDOWN_REQUEST:
--
To view, visit https://asterix-gerrit.ics.uci.edu/1325
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I403e61cc054a860bef6a71fa04393f4d9c368b36
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>