abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1322
Change subject: Remove ICCContext
......................................................................
Remove ICCContext
Change-Id: I6f6a769741f14e91bcd4b970b4a022c0a453d380
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
D hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
D hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
8 files changed, 47 insertions(+), 184 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/22/1322/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
index b097244..14b0554 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java
@@ -166,8 +166,10 @@
}
protected IHyracksClientConnection getNewHyracksClientConnection() throws
Exception {
- String strIP =
appCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
- int port =
appCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+ String strIP = ((ClusterControllerService)
appCtx.getControllerService()).getClusterControllerInfo()
+ .getClientNetAddress();
+ int port = ((ClusterControllerService)
appCtx.getControllerService()).getClusterControllerInfo()
+ .getClientNetPort();
return new HyracksConnection(strIP, port);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 2ce0383..f3c6ea3 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -42,18 +42,19 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.hdfs.scheduler.Scheduler;
public class HDFSUtils {
public static Scheduler initializeHDFSScheduler() {
- ICCContext ccContext =
AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
+ ClusterControllerService ccs = (ClusterControllerService)
AsterixAppContextInfo.INSTANCE
+ .getCCApplicationContext().getControllerService();
Scheduler scheduler = null;
try {
- scheduler = new
Scheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
+ scheduler = new
Scheduler(ccs.getClusterControllerInfo().getClientNetAddress(),
+ ccs.getClusterControllerInfo().getClientNetPort());
} catch (HyracksException e) {
throw new IllegalStateException("Cannot obtain hdfs scheduler");
}
@@ -61,11 +62,12 @@
}
public static IndexingScheduler initializeIndexingHDFSScheduler() {
- ICCContext ccContext =
AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext();
+ ClusterControllerService ccs = (ClusterControllerService)
AsterixAppContextInfo.INSTANCE
+ .getCCApplicationContext().getControllerService();
IndexingScheduler scheduler = null;
try {
- scheduler = new
IndexingScheduler(ccContext.getClusterControllerInfo().getClientNetAddress(),
- ccContext.getClusterControllerInfo().getClientNetPort());
+ scheduler = new
IndexingScheduler(ccs.getClusterControllerInfo().getClientNetAddress(),
+ ccs.getClusterControllerInfo().getClientNetPort());
} catch (HyracksException e) {
throw new IllegalStateException("Cannot obtain hdfs scheduler");
}
@@ -87,8 +89,8 @@
public static InputSplit[] getSplits(JobConf conf, List<ExternalFile>
files) throws IOException {
// Create file system object
FileSystem fs = FileSystem.get(conf);
- ArrayList<FileSplit> fileSplits = new ArrayList<FileSplit>();
- ArrayList<ExternalFile> orderedExternalFiles = new
ArrayList<ExternalFile>();
+ ArrayList<FileSplit> fileSplits = new ArrayList<>();
+ ArrayList<ExternalFile> orderedExternalFiles = new ArrayList<>();
// Create files splits
for (ExternalFile file : files) {
Path filePath = new Path(file.getFileName());
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
index 2517df5..c10f28d 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/RuntimeUtils.java
@@ -54,7 +54,8 @@
public static Map<InetAddress, Set<String>> getNodeControllerMap() throws
HyracksDataException {
Map<InetAddress, Set<String>> map = new HashMap<>();
-
AsterixAppContextInfo.INSTANCE.getCCApplicationContext().getCCContext().getIPAddressNodeMap(map);
+ map.putAll(((ClusterControllerService)
AsterixAppContextInfo.INSTANCE.getCCApplicationContext()
+ .getControllerService()).getIpAddressNodeNameMap());
return map;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
index 8b83d83..5589ce2 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/application/ICCApplicationContext.java
@@ -20,7 +20,6 @@
import java.io.Serializable;
-import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.job.IJobLifecycleListener;
/**
@@ -55,12 +54,4 @@
* @param jobLifecycleListener
*/
public void addClusterLifecycleListener(IClusterLifecycleListener
clusterLifecycleListener);
-
- /**
- * Get the Cluster Controller Context.
- *
- * @return The Cluster Controller Context.
- */
- public ICCContext getCCContext();
-
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
deleted file mode 100644
index f9618cf..0000000
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/ICCContext.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.api.context;
-
-import java.net.InetAddress;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.api.client.ClusterControllerInfo;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.topology.ClusterTopology;
-
-public interface ICCContext {
- public ClusterControllerInfo getClusterControllerInfo();
-
- public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map) throws
HyracksDataException;
-
- public ClusterTopology getClusterTopology();
-}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index ac38523..95b5972 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -42,10 +42,8 @@
import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.comm.NetworkAddress;
-import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.dataset.DatasetJobRecord.Status;
import org.apache.hyracks.api.deployment.DeploymentId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobIdFactory;
import org.apache.hyracks.api.job.JobInfo;
@@ -64,15 +62,14 @@
import org.apache.hyracks.control.cc.work.ClusterShutdownWork;
import org.apache.hyracks.control.cc.work.GatherStateDumpsWork.StateDumpRun;
import org.apache.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
-import org.apache.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
import org.apache.hyracks.control.cc.work.GetJobInfoWork;
import org.apache.hyracks.control.cc.work.GetJobStatusWork;
import org.apache.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
-import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
import org.apache.hyracks.control.cc.work.GetNodeDetailsJSONWork;
import org.apache.hyracks.control.cc.work.GetResultPartitionLocationsWork;
import org.apache.hyracks.control.cc.work.GetResultStatusWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork;
+import org.apache.hyracks.control.cc.work.GetThreadDumpWork.ThreadDumpRun;
import org.apache.hyracks.control.cc.work.JobStartWork;
import org.apache.hyracks.control.cc.work.JobletCleanupNotificationWork;
import org.apache.hyracks.control.cc.work.NodeHeartbeatWork;
@@ -150,7 +147,7 @@
private final Timer timer;
- private final ICCContext ccContext;
+ private final ClusterTopology topology;
private final DeadNodeSweeper sweeper;
@@ -203,9 +200,8 @@
};
// WorkQueue is in charge of heartbeat as well as other events.
workQueue = new WorkQueue("ClusterController", Thread.MAX_PRIORITY);
- this.timer = new Timer(true);
- final ClusterTopology topology = computeClusterTopology(ccConfig);
- ccContext = new ClusterControllerContext(topology);
+ timer = new Timer(true);
+ topology = computeClusterTopology(ccConfig);
sweeper = new DeadNodeSweeper();
datasetDirectoryService = new
DatasetDirectoryService(ccConfig.resultTTL, ccConfig.resultSweepThreshold);
jobIdFactory = new JobIdFactory();
@@ -249,7 +245,7 @@
}
private void startApplication() throws Exception {
- appCtx = new CCApplicationContext(this, serverCtx, ccContext,
ccConfig.getAppConfig());
+ appCtx = new CCApplicationContext(this, serverCtx,
ccConfig.getAppConfig());
appCtx.addJobLifecycleListener(datasetDirectoryService);
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
String className = ccConfig.appCCMainClass;
@@ -312,6 +308,7 @@
aep.startupCompleted();
}
}
+
public void stop(boolean terminateNCService) throws Exception {
if (terminateNCService) {
terminateNCServices();
@@ -341,10 +338,6 @@
public ServerContext getServerContext() {
return serverCtx;
- }
-
- public ICCContext getCCContext() {
- return ccContext;
}
public Map<JobId, JobRun> getActiveRunMap() {
@@ -403,32 +396,8 @@
return new NetworkAddress(ccConfig.clientNetIpAddress,
ccConfig.clientNetPort);
}
- private final class ClusterControllerContext implements ICCContext {
- private final ClusterTopology topology;
-
- private ClusterControllerContext(ClusterTopology topology) {
- this.topology = topology;
- }
-
- @Override
- public void getIPAddressNodeMap(Map<InetAddress, Set<String>> map)
throws HyracksDataException {
- GetIpAddressNodeNameMapWork ginmw = new
GetIpAddressNodeNameMapWork(ClusterControllerService.this, map);
- try {
- workQueue.scheduleAndSync(ginmw);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public ClusterControllerInfo getClusterControllerInfo() {
- return info;
- }
-
- @Override
- public ClusterTopology getClusterTopology() {
- return topology;
- }
+ public ClusterTopology getClusterTopology() {
+ return topology;
}
private class DeadNodeSweeper extends TimerTask {
@@ -461,24 +430,21 @@
case CREATE_JOB:
break;
case GET_JOB_STATUS: {
- HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
-
(HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+ HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf
= (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
workQueue.schedule(new
GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobStatus>(handle, mid)));
return;
}
case GET_JOB_INFO: {
- HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
-
(HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+ HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
(HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
workQueue.schedule(new
GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
new IPCResponder<JobInfo>(handle, mid)));
return;
}
case START_JOB: {
- HyracksClientInterfaceFunctions.StartJobFunction sjf =
- (HyracksClientInterfaceFunctions.StartJobFunction)
fn;
+ HyracksClientInterfaceFunctions.StartJobFunction sjf =
(HyracksClientInterfaceFunctions.StartJobFunction) fn;
JobId jobId = jobIdFactory.create();
workQueue.schedule(new
JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new
IPCResponder<JobId>(handle, mid)));
@@ -492,8 +458,7 @@
}
case GET_DATASET_RESULT_STATUS: {
-
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
-
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+
HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
(HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
workQueue.schedule(new
GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
gdrlf.getResultSetId(), new
IPCResponder<Status>(handle, mid)));
return;
@@ -502,8 +467,7 @@
case GET_DATASET_RECORD_DESCRIPTOR:
break;
case GET_DATASET_RESULT_LOCATIONS: {
-
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
-
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+
HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
(HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
workQueue.schedule(new
GetResultPartitionLocationsWork(ClusterControllerService.this,
gdrlf.getJobId(), gdrlf.getResultSetId(),
gdrlf.getKnownRecords(),
new IPCResponder<>(handle, mid)));
@@ -511,10 +475,9 @@
}
case WAIT_FOR_COMPLETION: {
- HyracksClientInterfaceFunctions.WaitForCompletionFunction
wfcf =
-
(HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+ HyracksClientInterfaceFunctions.WaitForCompletionFunction
wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
workQueue.schedule(new
WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
- new IPCResponder<Object>(handle, mid)));
+ new IPCResponder<>(handle, mid)));
return;
}
@@ -526,7 +489,7 @@
case GET_CLUSTER_TOPOLOGY: {
try {
- handle.send(mid, ccContext.getClusterTopology(), null);
+ handle.send(mid, getClusterTopology(), null);
} catch (IPCException e) {
e.printStackTrace();
}
@@ -534,38 +497,33 @@
}
case CLI_DEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliDeployBinaryFunction
dbf =
-
(HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+ HyracksClientInterfaceFunctions.CliDeployBinaryFunction
dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
workQueue.schedule(new
CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
dbf.getDeploymentId(), new IPCResponder<>(handle,
mid)));
return;
}
case CLI_UNDEPLOY_BINARY: {
- HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction
udbf =
-
(HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+ HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction
udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
workQueue.schedule(new
CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
new IPCResponder<>(handle, mid)));
return;
}
case CLUSTER_SHUTDOWN: {
- HyracksClientInterfaceFunctions.ClusterShutdownFunction
csf =
-
(HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
+ HyracksClientInterfaceFunctions.ClusterShutdownFunction
csf = (HyracksClientInterfaceFunctions.ClusterShutdownFunction) fn;
workQueue.schedule(new
ClusterShutdownWork(ClusterControllerService.this,
csf.isTerminateNCService(), new
IPCResponder<>(handle, mid)));
return;
}
case GET_NODE_DETAILS_JSON:
- HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction
gndjf =
-
(HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
+ HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction
gndjf = (HyracksClientInterfaceFunctions.GetNodeDetailsJSONFunction) fn;
workQueue.schedule(new
GetNodeDetailsJSONWork(ClusterControllerService.this, gndjf.getNodeId(),
gndjf.isIncludeStats(), gndjf.isIncludeConfig(),
new IPCResponder<>(handle, mid)));
return;
case THREAD_DUMP:
- HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
-
(HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
+ HyracksClientInterfaceFunctions.ThreadDumpFunction tdf =
(HyracksClientInterfaceFunctions.ThreadDumpFunction) fn;
workQueue.schedule(new
GetThreadDumpWork(ClusterControllerService.this, tdf.getNode(),
new IPCResponder<String>(handle, mid)));
return;
@@ -637,24 +595,21 @@
}
case REGISTER_PARTITION_PROVIDER: {
- CCNCFunctions.RegisterPartitionProviderFunction rppf =
- (CCNCFunctions.RegisterPartitionProviderFunction)
fn;
+ CCNCFunctions.RegisterPartitionProviderFunction rppf =
(CCNCFunctions.RegisterPartitionProviderFunction) fn;
workQueue.schedule(new
RegisterPartitionAvailibilityWork(ClusterControllerService.this,
rppf.getPartitionDescriptor()));
return;
}
case REGISTER_PARTITION_REQUEST: {
- CCNCFunctions.RegisterPartitionRequestFunction rprf =
- (CCNCFunctions.RegisterPartitionRequestFunction)
fn;
+ CCNCFunctions.RegisterPartitionRequestFunction rprf =
(CCNCFunctions.RegisterPartitionRequestFunction) fn;
workQueue.schedule(new
RegisterPartitionRequestWork(ClusterControllerService.this,
rprf.getPartitionRequest()));
return;
}
case REGISTER_RESULT_PARTITION_LOCATION: {
- CCNCFunctions.RegisterResultPartitionLocationFunction
rrplf =
-
(CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+ CCNCFunctions.RegisterResultPartitionLocationFunction
rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
workQueue.schedule(new
RegisterResultPartitionLocationWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getOrderedResult(), rrplf.getEmptyResult(),
rrplf.getPartition(), rrplf.getNPartitions(),
rrplf.getNetworkAddress()));
@@ -662,24 +617,21 @@
}
case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
- CCNCFunctions.ReportResultPartitionWriteCompletionFunction
rrplf =
-
(CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+ CCNCFunctions.ReportResultPartitionWriteCompletionFunction
rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
workQueue.schedule(new
ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
return;
}
case REPORT_RESULT_PARTITION_FAILURE: {
- CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
-
(CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+ CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
(CCNCFunctions.ReportResultPartitionFailureFunction) fn;
workQueue.schedule(new
ReportResultPartitionFailureWork(ClusterControllerService.this,
rrplf.getJobId(), rrplf.getResultSetId(),
rrplf.getPartition()));
return;
}
case SEND_APPLICATION_MESSAGE: {
- CCNCFunctions.SendApplicationMessageFunction rsf =
- (CCNCFunctions.SendApplicationMessageFunction) fn;
+ CCNCFunctions.SendApplicationMessageFunction rsf =
(CCNCFunctions.SendApplicationMessageFunction) fn;
workQueue.schedule(new
ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
rsf.getDeploymentId(), rsf.getNodeId()));
return;
@@ -715,8 +667,7 @@
}
case THREAD_DUMP_RESPONSE: {
- CCNCFunctions.ThreadDumpResponseFunction tdrf =
- (CCNCFunctions.ThreadDumpResponseFunction)fn;
+ CCNCFunctions.ThreadDumpResponseFunction tdrf =
(CCNCFunctions.ThreadDumpResponseFunction) fn;
workQueue.schedule(new
NotifyThreadDumpResponse(ClusterControllerService.this,
tdrf.getRequestId(), tdrf.getThreadDumpJSON()));
return;
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
index dd6f83b..2307cb2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCApplicationContext.java
@@ -29,7 +29,6 @@
import org.apache.hyracks.api.application.IApplicationConfig;
import org.apache.hyracks.api.application.ICCApplicationContext;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
-import org.apache.hyracks.api.context.ICCContext;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
import org.apache.hyracks.api.job.IJobLifecycleListener;
@@ -42,7 +41,6 @@
import org.apache.hyracks.control.common.work.IResultCallback;
public class CCApplicationContext extends ApplicationContext implements
ICCApplicationContext {
- private final ICCContext ccContext;
protected final Set<String> initPendingNodeIds;
protected final Set<String> deinitPendingNodeIds;
@@ -54,20 +52,14 @@
private List<IClusterLifecycleListener> clusterLifecycleListeners;
private final ClusterControllerService ccs;
- public CCApplicationContext(ClusterControllerService ccs, ServerContext
serverCtx, ICCContext ccContext,
+ public CCApplicationContext(ClusterControllerService ccs, ServerContext
serverCtx,
IApplicationConfig appConfig) throws IOException {
super(serverCtx, appConfig, new
HyracksThreadFactory("ClusterController"));
- this.ccContext = ccContext;
this.ccs = ccs;
initPendingNodeIds = new HashSet<>();
deinitPendingNodeIds = new HashSet<>();
jobLifecycleListeners = new ArrayList<>();
clusterLifecycleListeners = new ArrayList<>();
- }
-
- @Override
- public ICCContext getCCContext() {
- return ccContext;
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
deleted file mode 100644
index 31a829c..0000000
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetIpAddressNodeNameMapWork.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.control.cc.work;
-
-import java.net.InetAddress;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.work.SynchronizableWork;
-
-public class GetIpAddressNodeNameMapWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private Map<InetAddress, Set<String>> map;
-
- public GetIpAddressNodeNameMapWork(ClusterControllerService ccs,
Map<InetAddress, Set<String>> map) {
- this.ccs = ccs;
- this.map = map;
- }
-
- @Override
- protected void doRun() throws Exception {
- map.putAll(ccs.getIpAddressNodeNameMap());
- }
-}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1322
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I6f6a769741f14e91bcd4b970b4a022c0a453d380
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>