Michael Blow has submitted this change and it was merged. Change subject: HTTP API++, NCService Termination After Dead NCService ......................................................................
HTTP API++, NCService Termination After Dead NCService - include PID of NC service in NC config API - shutdown NCService even when NC is unresponsive (e.g. dead) when shutdown?all=true - include PIDs of NC service & NC in shutdown response document - interrogate cluster state from client helper - advertise shutdown, version cluster HTTP APIs - fix possible bad return code from stop-sample-cluster in case of very fast CCDriver exit - cleanup transposed JUnit assert args in SampleLocalClusterIT Change-Id: Ia48eb16696e48444f610fd5e8d7d4666d0257a38 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1193 Tested-by: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Integration-Tests: Jenkins <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_3/cluster_state_3.1.adm M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_4/cluster_state_4.1.adm M asterixdb/asterix-client-helper/pom.xml M asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/ClientCommand.java M asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/GetClusterStateCommand.java M asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/RemoteCommand.java M asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/WaitForClusterCommand.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java M asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.bat M asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.sh M 
asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.bat M asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.sh M asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ShutdownNCServiceWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java A hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ServiceConstants.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java 24 files changed, 429 insertions(+), 180 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java index 0d9ff16..7e7b068 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ClusterAPIServlet.java @@ -25,6 +25,7 @@ import 
java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.regex.Pattern; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -44,6 +45,10 @@ public static final String CONFIG_URI_KEY = "configUri"; public static final String STATS_URI_KEY = "statsUri"; public static final String THREAD_DUMP_URI_KEY = "threadDumpUri"; + public static final String SHUTDOWN_URI_KEY = "shutdownUri"; + public static final String FULL_SHUTDOWN_URI_KEY = "fullShutdownUri"; + public static final String VERSION_URI_KEY = "versionUri"; + public static final Pattern PARENT_DIR = Pattern.compile("/[^./]+/\\.\\./"); @Override public final void doGet(HttpServletRequest request, HttpServletResponse response) { @@ -93,17 +98,13 @@ json.put("config", allProperties); JSONArray ncs = json.getJSONArray("ncs"); - final StringBuffer requestURL = request.getRequestURL(); + final StringBuilder requestURL = new StringBuilder(request.getRequestURL()); if (requestURL.charAt(requestURL.length() - 1) != '/') { requestURL.append('/'); } requestURL.append(pathToNode); - String clusterURL = ""; - String newClusterURL = requestURL.toString(); - while (!clusterURL.equals(newClusterURL)) { - clusterURL = newClusterURL; - newClusterURL = clusterURL.replaceAll("/[^./]+/\\.\\./", "/"); - } + String clusterURL = canonicalize(requestURL); + String analyticsURL = canonicalize(clusterURL + "../"); String nodeURL = clusterURL + "node/"; for (int i = 0; i < ncs.length(); i++) { JSONObject nc = ncs.getJSONObject(i); @@ -121,6 +122,19 @@ cc.put(CONFIG_URI_KEY, clusterURL + "cc/config"); cc.put(STATS_URI_KEY, clusterURL + "cc/stats"); cc.put(THREAD_DUMP_URI_KEY, clusterURL + "cc/threaddump"); + json.put(SHUTDOWN_URI_KEY, analyticsURL + "shutdown"); + json.put(FULL_SHUTDOWN_URI_KEY, analyticsURL + "shutdown?all=true"); + json.put(VERSION_URI_KEY, analyticsURL + "version"); return json; } + + private String canonicalize(CharSequence requestURL) { + String 
clusterURL = ""; + String newClusterURL = requestURL.toString(); + while (!clusterURL.equals(newClusterURL)) { + clusterURL = newClusterURL; + newClusterURL = PARENT_DIR.matcher(clusterURL).replaceAll("/"); + } + return clusterURL; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java index d8cb553..d99b379 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ShutdownAPIServlet.java @@ -34,10 +34,15 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.runtime.util.ClusterStateManager; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.json.JSONArray; import org.json.JSONObject; public class ShutdownAPIServlet extends HttpServlet { private static final long serialVersionUID = 1L; + public static final String NODE_ID_KEY = "node_id"; + public static final String NCSERVICE_PID = "ncservice_pid"; + public static final String INI = "ini"; + public static final String PID = "pid"; @Override protected void doPost(HttpServletRequest request, HttpServletResponse response) @@ -61,7 +66,18 @@ try { jsonObject.put("status", "SHUTTING_DOWN"); jsonObject.put("date", new Date()); - jsonObject.put("cluster", ClusterStateManager.INSTANCE.getClusterStateDescription()); + JSONObject clusterState = ClusterStateManager.INSTANCE.getClusterStateDescription(); + JSONArray ncs = clusterState.getJSONArray("ncs"); + for (int i = 0; i < ncs.length(); i++) { + JSONObject nc = ncs.getJSONObject(i); + String node = nc.getString(NODE_ID_KEY); + JSONObject details = new JSONObject(hcc.getNodeDetailsJSON(node, false, true)); + nc.put(PID, details.get(PID)); + if (details.has(INI) && details.getJSONObject(INI).has(NCSERVICE_PID)) { + nc.put(NCSERVICE_PID, 
details.getJSONObject(INI).getInt(NCSERVICE_PID)); + } + } + jsonObject.put("cluster", clusterState); final PrintWriter writer = response.getWriter(); writer.print(jsonObject.toString(4)); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm index 8b66885..ea002ab 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.adm @@ -54,15 +54,23 @@ "web.queryinterface.port": 19006, "web.secondary.port": 19005 }, + "fullShutdownUri": "http://127.0.0.1:19002/admin/shutdown?all=true", "metadata_node": "asterix_nc1", "ncs": [ { "configUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/config", "node_id": "asterix_nc1", "partitions": [ - "partition_0", - "partition_1" + { + "active": true, + "partition_id": "partition_0" + }, + { + "active": true, + "partition_id": "partition_1" + } ], + "state": "ACTIVE", "statsUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/stats", "threadDumpUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/threaddump" }, @@ -70,12 +78,21 @@ "configUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/config", "node_id": "asterix_nc2", "partitions": [ - "partition_2", - "partition_3" + { + "active": true, + "partition_id": "partition_2" + }, + { + "active": true, + "partition_id": "partition_3" + } ], + "state": "ACTIVE", "statsUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/stats", "threadDumpUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/threaddump" } ], - "state": "ACTIVE" -} + "shutdownUri": "http://127.0.0.1:19002/admin/shutdown", + "state": "ACTIVE", + "versionUri": "http://127.0.0.1:19002/admin/version" +} \ No newline at end of file diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_3/cluster_state_3.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_3/cluster_state_3.1.adm index 4357d25..b0815ee 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_3/cluster_state_3.1.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_3/cluster_state_3.1.adm @@ -3,9 +3,16 @@ "configUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/config", "node_id": "asterix_nc1", "partitions": [ - "partition_0", - "partition_1" + { + "active": true, + "partition_id": "partition_0" + }, + { + "active": true, + "partition_id": "partition_1" + } ], + "state": "ACTIVE", "statsUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/stats", "threadDumpUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/threaddump" }, @@ -13,9 +20,16 @@ "configUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/config", "node_id": "asterix_nc2", "partitions": [ - "partition_2", - "partition_3" + { + "active": true, + "partition_id": "partition_2" + }, + { + "active": true, + "partition_id": "partition_3" + } ], + "state": "ACTIVE", "statsUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/stats", "threadDumpUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc2/threaddump" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_4/cluster_state_4.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_4/cluster_state_4.1.adm index c893534..4635690 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_4/cluster_state_4.1.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_4/cluster_state_4.1.adm @@ -2,9 +2,16 @@ "configUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/config", "node_id": "asterix_nc1", "partitions": [ - 
"partition_0", - "partition_1" + { + "active": true, + "partition_id": "partition_0" + }, + { + "active": true, + "partition_id": "partition_1" + } ], + "state": "ACTIVE", "statsUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/stats", "threadDumpUri": "http://127.0.0.1:19002/admin/cluster/node/asterix_nc1/threaddump" } diff --git a/asterixdb/asterix-client-helper/pom.xml b/asterixdb/asterix-client-helper/pom.xml index 1fbc0ec..e082629 100644 --- a/asterixdb/asterix-client-helper/pom.xml +++ b/asterixdb/asterix-client-helper/pom.xml @@ -97,5 +97,15 @@ <artifactId>appassembler-booter</artifactId> <version>1.10</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20090211</version> + </dependency> </dependencies> </project> diff --git a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/ClientCommand.java b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/ClientCommand.java index c86ea36..979cd55 100644 --- a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/ClientCommand.java +++ b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/ClientCommand.java @@ -28,8 +28,8 @@ public abstract class ClientCommand { public enum Command { - GET_CLUSTER_STATE("Get state of cluster (errorcode 0 = UP, non-zero = DOWN)"), - WAIT_FOR_CLUSTER("Wait for cluster to be ready (errorcode 0 = UP, non-zero = UNKNOWN)"), + GET_CLUSTER_STATE("Get state of cluster (errorcode 0 = ACTIVE, 1 = DOWN, 2 = UNUSABLE, 3 = OTHER)"), + WAIT_FOR_CLUSTER("Wait for cluster to be ready (errorcode 0 = ACTIVE, non-zero = UNKNOWN)"), SHUTDOWN_CLUSTER("Instructs the cluster to shut down, leaving NCService processes intact"), SHUTDOWN_CLUSTER_ALL("Instructs the cluster to 
shut down, including NCService processes"); diff --git a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/GetClusterStateCommand.java b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/GetClusterStateCommand.java index 275583e..77ed78f 100644 --- a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/GetClusterStateCommand.java +++ b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/GetClusterStateCommand.java @@ -19,10 +19,15 @@ package org.apache.asterix.clienthelper.commands; import java.io.IOException; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; import javax.servlet.http.HttpServletResponse; import org.apache.asterix.clienthelper.Args; +import org.apache.commons.io.IOUtils; +import org.json.JSONException; +import org.json.JSONObject; public class GetClusterStateCommand extends RemoteCommand { @@ -38,18 +43,32 @@ @Override public int execute() throws IOException { log("Attempting to determine state of cluster " + hostPort + "..."); - int statusCode = tryGet(args.getClusterStatePath()); - // TODO (mblow): interrogate result to determine cluster readiness, not rely on HTTP 200 - switch (statusCode) { - case HttpServletResponse.SC_OK: - logState("UP"); - return 0; - case -1: - logState("DOWN"); - return 1; - default: - logState("UNKNOWN"); - return 2; + HttpURLConnection conn; + // 0 = ACTIVE, 1 = DOWN, 2 = UNUSABLE, 3 = OTHER + try { + conn = openConnection(args.getClusterStatePath(), Method.GET); + if (conn.getResponseCode() == HttpServletResponse.SC_OK) { + String result = IOUtils.toString(conn.getInputStream(), StandardCharsets.UTF_8.name()); + JSONObject json = new JSONObject(result); + final String state = json.getString("state"); + logState(state); + switch (state) { + case "ACTIVE": + return 0; + case "UNUSABLE": + return 2; + default: + return 3; + } + } + logState("UNKNOWN 
(HTTP error code: " + conn.getResponseCode() + ")"); + return 3; + } catch (IOException e) { // NOSONAR - log or rethrow exception + logState("DOWN"); + return 1; + } catch (JSONException e) { // NOSONAR - log or rethrow exception + logState("UNKNOWN (malformed response)"); + return 3; } } } diff --git a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/RemoteCommand.java b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/RemoteCommand.java index 77ccbb4..031a721 100644 --- a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/RemoteCommand.java +++ b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/RemoteCommand.java @@ -19,6 +19,7 @@ package org.apache.asterix.clienthelper.commands; import java.io.IOException; +import java.io.InputStream; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; @@ -40,12 +41,9 @@ @SuppressWarnings("squid:S1166") // log or rethrow exception protected int tryConnect(String path, Method method) throws MalformedURLException { - URL url = new URL("http://" + hostPort + "/" + path); try { - HttpURLConnection conn = (HttpURLConnection)url.openConnection(); - conn.setRequestMethod(method.name()); + HttpURLConnection conn = openConnection(path, method); return conn.getResponseCode(); - } catch (IOException e) { return -1; } @@ -58,4 +56,15 @@ protected int tryPost(String path) throws MalformedURLException { return tryConnect(path, Method.POST); } + + protected InputStream openAsStream(String path, Method method) throws IOException { + return openConnection(path, method).getInputStream(); + } + + protected HttpURLConnection openConnection(String path, Method method) throws IOException { + URL url = new URL("http://" + hostPort + "/" + path); + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.setRequestMethod(method.name()); + return 
conn; + } } diff --git a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/WaitForClusterCommand.java b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/WaitForClusterCommand.java index fda22ee..390ce7b 100644 --- a/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/WaitForClusterCommand.java +++ b/asterixdb/asterix-client-helper/src/main/java/org/apache/asterix/clienthelper/commands/WaitForClusterCommand.java @@ -19,10 +19,15 @@ package org.apache.asterix.clienthelper.commands; import java.io.IOException; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; import javax.servlet.http.HttpServletResponse; import org.apache.asterix.clienthelper.Args; +import org.apache.commons.io.IOUtils; +import org.json.JSONException; +import org.json.JSONObject; public class WaitForClusterCommand extends RemoteCommand { @@ -38,23 +43,41 @@ + "for cluster " + hostPort + " to be available."); long startTime = System.currentTimeMillis(); + boolean first = true; + String lastState = null; while (true) { - if (tryGet(args.getClusterStatePath()) == HttpServletResponse.SC_OK) { - log("Cluster started."); - return 0; + if (!first) { + if (args.getTimeoutSecs() >= 0 + && (startTime + (args.getTimeoutSecs() * 1000) < System.currentTimeMillis())) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e) { + return 22; + } } - if (args.getTimeoutSecs() >= 0 - && (startTime + (args.getTimeoutSecs() * 1000) < System.currentTimeMillis())) { - break; - } + first = false; + + HttpURLConnection conn; try { - Thread.sleep(500); - } catch (InterruptedException e) { - return 22; + conn = openConnection(args.getClusterStatePath(), Method.GET); + if (conn.getResponseCode() == HttpServletResponse.SC_OK) { + String result = IOUtils.toString(conn.getInputStream(), StandardCharsets.UTF_8.name()); + JSONObject json = new JSONObject(result); + lastState 
= json.getString("state"); + if ("ACTIVE".equals(lastState)) { + log("Cluster started and is ACTIVE."); + return 0; + } + } + } catch (JSONException |IOException e) { //NOSONAR - log or rethrow exception + // ignore exception, try again } } log("Cluster " + hostPort + " was not available before timeout of " + args.getTimeoutSecs() - + " seconds was exhausted."); + + " seconds was exhausted" + (lastState != null ? " (state: " + lastState + ")" : "") + + "; check logs for more information"); return 1; } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java index 487b8fa..bc15788 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/util/ClusterStateManager.java @@ -629,10 +629,23 @@ for (Map.Entry<String, ClusterPartition[]> entry : node2PartitionsMap.entrySet()) { JSONObject nodeJSON = new JSONObject(); nodeJSON.put("node_id", entry.getKey()); - List<String> partitions = new ArrayList<>(); + boolean allActive = true; + boolean anyActive = false; + List<Map<String, Object>> partitions = new ArrayList<>(); // list keeps the JSON "partitions" array in partition order for (ClusterPartition part : entry.getValue()) { - partitions.add("partition_" + part.getPartitionId()); + HashMap<String, Object> partition = new HashMap<>(); + partition.put("partition_id", "partition_" + part.getPartitionId()); + partition.put("active", part.isActive()); + partitions.add(partition); + allActive = allActive && part.isActive(); + if (part.isActive()) { // any single active partition makes the node at least PARTIALLY_ACTIVE + anyActive = true; + } } + nodeJSON.put("state", failedNodes.contains(entry.getKey()) ? "FAILED" + : allActive ? "ACTIVE" + : anyActive ?
"PARTIALLY_ACTIVE" + : "INACTIVE"); nodeJSON.put("partitions", partitions); stateDescription.accumulate("ncs", nodeJSON); } diff --git a/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.bat b/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.bat index db86560..589d55e 100644 --- a/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.bat +++ b/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.bat @@ -43,6 +43,7 @@ echo CLUSTERDIR=%CLUSTERDIR% echo INSTALLDIR=%INSTALLDIR% +echo LOGSDIR=%LOGSDIR% echo. cd %CLUSTERDIR% if NOT EXIST %LOGSDIR% ( @@ -64,21 +65,18 @@ start /MIN "cc" cmd /c "echo See output in %LOGSDIR%\cc.log && %INSTALLDIR%\bin\${CC_COMMAND} -config-file %CLUSTERDIR%\conf\cc.conf >>%LOGSDIR%\cc.log 2>&1" echo. -echo Waiting for sample cluster [localhost:${LISTEN_PORT}] to be ready... -call %INSTALLDIR%\bin\${HELPER_COMMAND} wait_for_cluster -quiet -timeout 30 -if %ERRORLEVEL% NEQ 0 ( - echo ERROR: cluster did not start successfully - echo See output in %LOGSDIR%\ - goto :ERROR +call %INSTALLDIR%\bin\${HELPER_COMMAND} wait_for_cluster -timeout 30 +if %ERRORLEVEL% EQU 0 ( + goto :END ) -echo Sample cluster [localhost:${LISTEN_PORT}] is ready... -echo. -echo See output in %LOGSDIR% -echo. -popd -endlocal :ERROR +echo. popd endlocal exit /B 1 + +:END +echo. 
+popd +endlocal diff --git a/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.sh b/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.sh index e97f900..53ceff8 100755 --- a/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.sh +++ b/asterixdb/asterix-server/src/main/samples/local/bin/start-sample-cluster.sh @@ -44,6 +44,7 @@ echo "CLUSTERDIR=$CLUSTERDIR" echo "INSTALLDIR=$INSTALLDIR" +echo "LOGSDIR=$LOGSDIR" echo cd $CLUSTERDIR mkdir -p $LOGSDIR @@ -59,11 +60,5 @@ $INSTALLDIR/bin/${NC_SERVICE_COMMAND} -logdir - -config-file $CLUSTERDIR/conf/blue.conf >> $LOGSDIR/blue-service.log 2>&1 & $INSTALLDIR/bin/${NC_SERVICE_COMMAND} -logdir - >> $LOGSDIR/red-service.log 2>&1 & $INSTALLDIR/bin/${CC_COMMAND} -config-file $CLUSTERDIR/conf/cc.conf >> $LOGSDIR/cc.log 2>&1 & -echo "Waiting for sample cluster (localhost:${LISTEN_PORT}) to be ready..." -if $INSTALLDIR/bin/${HELPER_COMMAND} wait_for_cluster -quiet -timeout 30; -then - echo "Sample cluster (localhost:${LISTEN_PORT}) is ready..." -else - echo "ERROR: cluster did not start successfully" -fi -echo "See output in $LOGSDIR/" +$INSTALLDIR/bin/${HELPER_COMMAND} wait_for_cluster -timeout 30 +exit $? 
diff --git a/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.bat b/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.bat index d1f31b0..8e35c5f 100644 --- a/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.bat +++ b/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.bat @@ -37,7 +37,7 @@ set INSTALLDIR=%cd% call %INSTALLDIR%\bin\${HELPER_COMMAND} get_cluster_state -quiet -if %ERRORLEVEL% EQU 0 ( +if %ERRORLEVEL% NEQ 1 ( call %INSTALLDIR%\bin\${HELPER_COMMAND} shutdown_cluster_all ) else ( echo WARNING: sample cluster does not appear to be running, will attempt to wait for diff --git a/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.sh b/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.sh index 462d53d..b69ee53 100755 --- a/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.sh +++ b/asterixdb/asterix-server/src/main/samples/local/bin/stop-sample-cluster.sh @@ -40,9 +40,8 @@ CLUSTERDIR=$(cd $DIRNAME/..; echo $PWD) INSTALLDIR=$(cd $CLUSTERDIR/../..; echo $PWD) - -if $INSTALLDIR/bin/${HELPER_COMMAND} get_cluster_state -quiet; -then +$INSTALLDIR/bin/${HELPER_COMMAND} get_cluster_state -quiet +if [ $? -ne 1 ]; then $INSTALLDIR/bin/${HELPER_COMMAND} shutdown_cluster_all else echo "WARNING: sample cluster does not appear to be running, will attempt to wait for" @@ -59,4 +58,4 @@ sleep 2s echo -n . done -[ ! $first ] && echo ".done." +[ ! $first ] && echo ".done." 
|| true diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java index a49086b..4a5f90f 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java @@ -80,7 +80,7 @@ public void test0_startCluster() throws Exception { Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/start-sample-cluster.sh")) .inheritIO().start(); - Assert.assertEquals(process.waitFor(), 0); + Assert.assertEquals(0, process.waitFor()); } @Test @@ -90,14 +90,14 @@ "http://127.0.0.1:19002" + Servlets.AQL_QUERY.getPath(), Collections.emptyList()); StringWriter sw = new StringWriter(); IOUtils.copy(resultStream, sw); - Assert.assertEquals(sw.toString().trim(), "2"); + Assert.assertEquals("2", sw.toString().trim()); } @Test public void test2_stopCluster() throws Exception { Process process = new ProcessBuilder(joinPath(LOCAL_SAMPLES_DIR, "bin/stop-sample-cluster.sh")) .inheritIO().start(); - Assert.assertEquals(process.waitFor(), 0); + Assert.assertEquals(0, process.waitFor()); try { new URL("http://127.0.0.1:19002").openConnection().connect(); Assert.assertTrue("Expected connection to be refused.", false); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 685a36b..4fde1f6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -22,6 +22,7 @@ import java.io.FileReader; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -87,6 +88,7 @@ import org.apache.hyracks.control.cc.work.ReportProfilesWork; import org.apache.hyracks.control.cc.work.ReportResultPartitionFailureWork; import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork; +import org.apache.hyracks.control.cc.work.ShutdownNCServiceWork; import org.apache.hyracks.control.cc.work.TaskCompleteWork; import org.apache.hyracks.control.cc.work.TaskFailureWork; import org.apache.hyracks.control.cc.work.TriggerNCWork; @@ -114,7 +116,7 @@ import org.xml.sax.InputSource; public class ClusterControllerService implements IControllerService { - private static Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName()); + private static final Logger LOGGER = Logger.getLogger(ClusterControllerService.class.getName()); private final CCConfig ccConfig; @@ -279,12 +281,43 @@ } } + private void terminateNCServices() throws Exception { + Ini ini = ccConfig.getIni(); + if (ini == null) { + return; + } + List<ShutdownNCServiceWork> shutdownNCServiceWorks = new ArrayList<>(); + for (String section : ini.keySet()) { + if (!section.startsWith("nc/")) { + continue; + } + String ncid = section.substring(3); + String address = IniUtils.getString(ini, section, "address", null); + int port = IniUtils.getInt(ini, section, "port", 9090); + if (address == null) { + address = InetAddress.getLoopbackAddress().getHostAddress(); + } + ShutdownNCServiceWork shutdownWork = new ShutdownNCServiceWork(address, port, ncid); + workQueue.schedule(shutdownWork); + shutdownNCServiceWorks.add(shutdownWork); + } + for (ShutdownNCServiceWork shutdownWork : 
shutdownNCServiceWorks) { + shutdownWork.sync(); + } + } + private void notifyApplication() throws Exception { if (aep != null) { // Sometimes, there is no application entry point. Check hyracks-client project aep.startupCompleted(); } } + public void stop(boolean terminateNCService) throws Exception { + if (terminateNCService) { + terminateNCServices(); + } + stop(); + } @Override public void stop() throws Exception { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java index 63ca4ae..29e1f83 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java @@ -53,7 +53,7 @@ throw new IPCException("Shutdown in Progress"); } Map<String, NodeControllerState> nodeControllerStateMap = ccs.getNodeMap(); - Set<String> nodeIds = new TreeSet<String>(); + Set<String> nodeIds = new TreeSet<>(); nodeIds.addAll(nodeControllerStateMap.keySet()); /** * set up our listener for the node ACKs @@ -70,33 +70,22 @@ @Override public void run() { try { - /** + /* * wait for all our acks */ boolean cleanShutdown = shutdownStatus.waitForCompletion(); - if (cleanShutdown) { - callback.setValue(true); - ccs.stop(); - LOGGER.info("JVM Exiting.. Bye!"); - Runtime rt = Runtime.getRuntime(); - rt.exit(0); + if (!cleanShutdown) { + /* + * best effort - just exit, user will have to kill misbehaving NCs + */ + LOGGER.severe("Clean shutdown of NCs timed out- giving up! 
Unresponsive nodes: " + + shutdownStatus.getRemainingNodes()); } - /** - * best effort - just exit, user will have to kill misbehaving NCs - */ - else { - LOGGER.severe("Clean shutdown of NCs timed out- CC bailing out!"); - StringBuilder unresponsive = new StringBuilder(); - for (String s : shutdownStatus.getRemainingNodes()) { - unresponsive.append(s).append(' '); - } - LOGGER.severe("Unresponsive Nodes: " + unresponsive); - callback.setValue(false); - ccs.stop(); - LOGGER.info("JVM Exiting.. Bye!"); - Runtime rt = Runtime.getRuntime(); - rt.exit(1); - } + callback.setValue(cleanShutdown); + ccs.stop(terminateNCService); + LOGGER.info("JVM Exiting.. Bye!"); + Runtime rt = Runtime.getRuntime(); + rt.exit(cleanShutdown ? 0 : 1); } catch (Exception e) { callback.setException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ShutdownNCServiceWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ShutdownNCServiceWork.java new file mode 100644 index 0000000..dfc22b1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ShutdownNCServiceWork.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.cc.work; + +import static org.apache.hyracks.control.common.controllers.ServiceConstants.NC_SERVICE_MAGIC_COOKIE; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.Socket; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand; +import org.apache.hyracks.control.common.work.SynchronizableWork; + +/** + * A work which is run at CC shutdown for each NC specified in the configuration file. + * It contacts the NC service on each node and instructs it to terminate. 
+ */ +public class ShutdownNCServiceWork extends SynchronizableWork { + + private static final Logger LOGGER = Logger.getLogger(ShutdownNCServiceWork.class.getName()); + + private final String ncHost; + private final int ncPort; + private final String ncId; + + public ShutdownNCServiceWork(String ncHost, int ncPort, String ncId) { + this.ncHost = ncHost; + this.ncPort = ncPort; + this.ncId = ncId; + } + @Override + public final void doRun() { + LOGGER.info("Connecting to NC service '" + ncId + "' at " + ncHost + ":" + ncPort); + try (Socket s = new Socket(ncHost, ncPort)) { + ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream()); + oos.writeUTF(NC_SERVICE_MAGIC_COOKIE); + oos.writeUTF(ServiceCommand.TERMINATE.name()); + oos.close(); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "Failed to contact NC service '" + ncId + "' at " + ncHost + ":" + ncPort, e); + } + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java index 9a52a06..a7bca25 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TriggerNCWork.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.cc.work; +import static org.apache.hyracks.control.common.controllers.ServiceConstants.NC_SERVICE_MAGIC_COOKIE; + import java.io.IOException; import java.io.ObjectOutputStream; import java.io.StringWriter; @@ -26,6 +28,7 @@ import java.util.logging.Logger; import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand; import org.apache.hyracks.control.common.work.AbstractWork; 
import org.ini4j.Ini; @@ -35,11 +38,6 @@ */ public class TriggerNCWork extends AbstractWork { - // This constant must match the corresponding constant in - // hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java - // I didn't want to introduce a Maven-level dependency on the - // hyracks-nc-service package (or vice-versa). - public static final String NC_MAGIC_COOKIE = "hyncmagic"; private static final Logger LOGGER = Logger.getLogger(TriggerNCWork.class.getName()); private final ClusterControllerService ccs; @@ -55,32 +53,25 @@ } @Override public final void run() { - ccs.getExecutor().execute(new Runnable() { - @Override - @SuppressWarnings("squid:S2142") // suppress interrupted exception warning - public void run() { - while (true) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Connecting NC service '" + ncId + "' at " + ncHost + ":" + ncPort); - } - try (Socket s = new Socket(ncHost, ncPort)) { - ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream()); - oos.writeUTF(NC_MAGIC_COOKIE); - oos.writeUTF(serializeIni(ccs.getCCConfig().getIni())); - oos.close(); - break; - // QQQ Should probably have an ACK here - } catch (IOException e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.log(Level.WARNING, "Failed to contact NC service at " + ncHost + - ":" + ncPort + "; will retry", e); - } - } - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - break; - } + ccs.getExecutor().execute(() -> { + while (true) { + LOGGER.info("Connecting NC service '" + ncId + "' at " + ncHost + ":" + ncPort); + try (Socket s = new Socket(ncHost, ncPort)) { + ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream()); + oos.writeUTF(NC_SERVICE_MAGIC_COOKIE); + oos.writeUTF(ServiceCommand.START_NC.name()); + oos.writeUTF(TriggerNCWork.this.serializeIni(ccs.getCCConfig().getIni())); + oos.close(); + return; + // QQQ Should probably have an ACK here + } catch (IOException e) { + 
LOGGER.log(Level.WARNING, "Failed to contact NC service at " + ncHost + ":" + ncPort + + "; will retry", e); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + return; } } }); @@ -95,7 +86,7 @@ ccini.store(iniString); // Finally insert *this* NC's name into localnc section - this is a fixed // entry point so that NCs can determine where all their config is. - iniString.append("\n[localnc]\nid=" + ncId + "\n"); + iniString.append("\n[localnc]\nid=").append(ncId).append("\n"); if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine("Returning Ini file:\n" + iniString.toString()); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java index 64b9737..de336ed 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NCConfig.java @@ -129,6 +129,9 @@ + " (default: same as -messaging-port; must set -messaging-public-port also)", required = false) public int messagingPublicPort = 0; + @Option(name = "-ncservice-pid", usage = "PID of the NCService which launched this NCDriver", required = false) + public int ncservicePid = 0; + @Argument @Option(name = "--", handler = StopOptionHandler.class) public List<String> appArgs; @@ -258,5 +261,8 @@ if (appNCMainClass != null) { configuration.put("app-nc-main-class", appNCMainClass); } + if (ncservicePid != 0) { + configuration.put("ncservice-pid", String.valueOf(ncservicePid)); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ServiceConstants.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ServiceConstants.java new file mode 100644 index 0000000..1b790b7 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/ServiceConstants.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.control.common.controllers; + +public class ServiceConstants { + public enum ServiceCommand { + START_NC, + TERMINATE + } + public static final String NC_SERVICE_MAGIC_COOKIE = "hyncmagic2"; +} diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java index 6912892..4e5c98f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java @@ -24,7 +24,7 @@ public class ShutdownRun implements IShutdownStatusConditionVariable{ - private final Set<String> shutdownNodeIds = new TreeSet<String>(); + private final Set<String> shutdownNodeIds = new TreeSet<>(); private boolean shutdownSuccess = false; private static final int SHUTDOWN_TIMER_MS = 10000; //10 seconds @@ -35,12 +35,11 @@ /** * Notify that a node is shutting down. * - * @param nodeId - * @param status + * @param nodeId the node acknowledging the shutdown */ public synchronized void notifyShutdown(String nodeId) { shutdownNodeIds.remove(nodeId); - if (shutdownNodeIds.size() == 0) { + if (shutdownNodeIds.isEmpty()) { shutdownSuccess = true; notifyAll(); } @@ -48,10 +47,14 @@ @Override public synchronized boolean waitForCompletion() throws Exception { - /* - * Either be woken up when we're done, or default to fail. - */ - wait(SHUTDOWN_TIMER_MS); + if (shutdownNodeIds.isEmpty()) { + shutdownSuccess = true; + } else { + /* + * Either be woken up when we're done, or default to fail. 
+ */ + wait(SHUTDOWN_TIMER_MS); + } return shutdownSuccess; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java index 5b032d9..848bdd2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-nc-service/src/main/java/org/apache/hyracks/control/nc/service/NCService.java @@ -34,6 +34,8 @@ import org.apache.commons.lang3.SystemUtils; import org.apache.hyracks.control.common.controllers.IniUtils; +import org.apache.hyracks.control.common.controllers.ServiceConstants; +import org.apache.hyracks.control.common.controllers.ServiceConstants.ServiceCommand; import org.ini4j.Ini; import org.kohsuke.args4j.CmdLineParser; @@ -70,10 +72,8 @@ */ private static Process proc = null; - private static final String MAGIC_COOKIE = "hyncmagic"; - private static List<String> buildCommand() throws IOException { - List<String> cList = new ArrayList<String>(); + List<String> cList = new ArrayList<>(); // Find the command to run. For now, we allow overriding the name, but // still assume it's located in the bin/ directory of the deployment. @@ -92,10 +92,15 @@ cList.add("-config-file"); // Store the Ini file from the CC locally so NCConfig can read it. 
- // QQQ should arrange to delete this when done File tempIni = File.createTempFile("ncconf", ".conf"); + tempIni.deleteOnExit(); + ini.store(tempIni); cList.add(tempIni.getCanonicalPath()); + + // pass in the PID of the NCService + cList.add("-ncservice-pid"); + cList.add(System.getProperty("app.pid", "0")); return cList; } @@ -161,14 +166,10 @@ } catch (InterruptedException ignored) { } } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("NCDriver exited with return value " + retval); - } + LOGGER.info("NCDriver exited with return value " + retval); if (retval == 99) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Terminating NCService based on return value from NCDriver"); - } - System.exit(0); + LOGGER.info("Terminating NCService based on return value from NCDriver"); + exit(0); } return retval == 0; } catch (Exception e) { @@ -180,30 +181,42 @@ } private static boolean acceptConnection(InputStream is) { - // Simple on-wire protocol: magic cookie (string), CC address (string), - // port (string), as encoded on CC by ObjectOutputStream. If we see - // anything else or have any error, crap out and await a different - // connection. - // QQQ This should probably be changed to directly accept the full - // config file from the CC, rather than calling back to the CC's - // "config" webservice to retrieve it. Revisit when the CC is fully - // parsing and validating the master config file. + // Simple on-wire protocol: + // magic cookie (string) + // either: + // START_NC, ini file + // or: + // TERMINATE + // If we see anything else or have any error, crap out and await a different connection. try { ObjectInputStream ois = new ObjectInputStream(is); String magic = ois.readUTF(); - if (! MAGIC_COOKIE.equals(magic)) { + if (! 
ServiceConstants.NC_SERVICE_MAGIC_COOKIE.equals(magic)) { LOGGER.severe("Connection used incorrect magic cookie"); return false; } - String iniString = ois.readUTF(); - ini = new Ini(new StringReader(iniString)); - ncId = IniUtils.getString(ini, "localnc", "id", ""); - nodeSection = "nc/" + ncId; - return launchNCProcess(); + switch (ServiceCommand.valueOf(ois.readUTF())) { + case START_NC: + String iniString = ois.readUTF(); + ini = new Ini(new StringReader(iniString)); + ncId = IniUtils.getString(ini, "localnc", "id", ""); + nodeSection = "nc/" + ncId; + return launchNCProcess(); + case TERMINATE: + LOGGER.info("Terminating NCService based on command from CC"); + exit(0); + break; + } } catch (Exception e) { LOGGER.log(Level.SEVERE, "Error decoding connection from server", e); } return false; + } + + @SuppressWarnings("squid:S1147") // call to System.exit() + private static void exit(int exitCode) { + LOGGER.info("JVM Exiting.. Bye!"); + System.exit(exitCode); } public static void main(String[] args) throws Exception { @@ -239,25 +252,17 @@ // Loop forever - the NCService will always return to "waiting for CC" state // when the child NC terminates for any reason. while (true) { - ServerSocket listener = new ServerSocket(port, 5, addr); - try { + try (ServerSocket listener = new ServerSocket(port, 5, addr)) { boolean launched = false; while (!launched) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Waiting for connection from CC on " + addr + ":" + port); - } - Socket socket = listener.accept(); - try { + LOGGER.info("Waiting for connection from CC on " + addr + ":" + port); + try (Socket socket = listener.accept()) { // QQQ Because acceptConnection() doesn't return if the // service is started appropriately, the socket remains // open but non-responsive. 
launched = acceptConnection(socket.getInputStream()); - } finally { - socket.close(); } } - } finally { - listener.close(); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1193 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ia48eb16696e48444f610fd5e8d7d4666d0257a38 Gerrit-PatchSet: 7 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
