Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2021
Change subject: [NO ISSUE] Minor active refactoring ...................................................................... [NO ISSUE] Minor active refactoring - Remove unused ActiveRuntimeManager - Rename StatsRequestMessage -> ActiveStatsRequestMessage - Add ActiveManager API to return all active runtimes Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3 --- M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java D asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 9 files changed, 36 insertions(+), 116 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/2021/1 diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index df59dca..1d771a7 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -18,8 +18,10 @@ */ package org.apache.asterix.active; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -32,7 +34,7 @@ import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveStatsResponse; -import org.apache.asterix.active.message.StatsRequestMessage; +import org.apache.asterix.active.message.ActiveStatsRequestMessage; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.memory.ConcurrentFramePool; @@ -54,7 +56,7 @@ private volatile boolean shutdown; public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize, - INCServiceContext serviceCtx) throws HyracksDataException { + INCServiceContext serviceCtx) throws HyracksDataException { this.executor = executor; this.nodeId = nodeId; this.activeFramePool = new ConcurrentFramePool(nodeId, activeMemoryBudget, frameSize); @@ -77,6 +79,10 @@ runtimes.remove(id); } + public Set<ActiveRuntimeId> getRuntimeIds() { + return Collections.unmodifiableSet(runtimes.keySet()); + } + public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) { return runtimes.get(runtimeId); } @@ -93,14 +99,14 @@ stopRuntime(message); break; case REQUEST_STATS: - requestStats((StatsRequestMessage) message); + requestStats((ActiveStatsRequestMessage) message); break; default: LOGGER.warning("Unknown message type received: " + message.getKind()); } } - private void requestStats(StatsRequestMessage message) throws HyracksDataException { + private void requestStats(ActiveStatsRequestMessage message) throws HyracksDataException { try { ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); IActiveRuntime runtime = runtimes.get(runtimeId); @@ -111,13 +117,13 @@ ((NodeControllerService) serviceCtx.getControllerService()) .sendApplicationMessageToCC( JavaSerializationUtils - .serialize(new ActiveStatsResponse(reqId, null, new RuntimeDataException( - ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))), - null); + .serialize(new ActiveStatsResponse(reqId, nodeId, null, + new RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, + runtimeId.toString()))), null); return; } String stats = runtime.getStats(); - ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null); + ActiveStatsResponse response = new ActiveStatsResponse(reqId, nodeId, stats, null); ((NodeControllerService) serviceCtx.getControllerService()) .sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null); } catch (Exception e) { diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java deleted file mode 100644 index 18368ae..0000000 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.active; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class ActiveRuntimeManager { - - private static final Logger LOGGER = Logger.getLogger(ActiveRuntimeManager.class.getName()); - private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> activeRuntimes; - - private final ExecutorService executorService; - - public ActiveRuntimeManager() { - this.activeRuntimes = new ConcurrentHashMap<>(); - this.executorService = Executors.newCachedThreadPool(); - } - - public void close() throws IOException { - if (executorService != null) { - executorService.shutdown(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Shut down executor service for :" + ActiveRuntimeManager.class.getSimpleName()); - } - try { - executorService.awaitTermination(10L, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.log(Level.SEVERE, ActiveRuntimeManager.class.getSimpleName() - + " was interrupted while waiting for runtime managers to shutdown", e); - } - if (!executorService.isTerminated()) { - LOGGER.severe(ActiveRuntimeManager.class.getSimpleName() - + " failed to shutdown successfully. Will be forced to shutdown"); - executorService.shutdownNow(); - } - } - } - - public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId runtimeId) { - return activeRuntimes.get(runtimeId); - } - - public void registerRuntime(ActiveRuntimeId runtimeId, ActiveSourceOperatorNodePushable feedRuntime) - throws HyracksDataException { - if (activeRuntimes.containsKey(runtimeId)) { - throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId); - } - activeRuntimes.put(runtimeId, feedRuntime); - } - - public void deregisterRuntime(ActiveRuntimeId runtimeId) throws HyracksDataException { - if (!activeRuntimes.containsKey(runtimeId)) { - throw new RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId); - } - activeRuntimes.remove(runtimeId); - } - - public ExecutorService getExecutorService() { - return executorService; - } - - public Set<ActiveRuntimeId> getFeedRuntimes() { - return activeRuntimes.keySet(); - } - -} diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java similarity index 88% rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java index d43f00e..0dbba52 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java @@ -20,11 +20,11 @@ import java.io.Serializable; -public class StatsRequestMessage extends ActiveManagerMessage { +public class ActiveStatsRequestMessage extends ActiveManagerMessage { private static final long serialVersionUID = 1L; private final long reqId; - public StatsRequestMessage(Serializable payload, long reqId) { + public ActiveStatsRequestMessage(Serializable payload, long reqId) { super(Kind.REQUEST_STATS, payload); this.reqId = reqId; } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java index 8738a06..b8ba271 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java @@ -27,17 +27,20 @@ import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INcResponse; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ActiveStatsResponse implements ICcAddressedMessage, INcResponse { private static final long serialVersionUID = 1L; private final long reqId; + private final String nodeId; private final String stats; private final Exception failure; - public ActiveStatsResponse(long reqId, String stats, Exception failure) { + public ActiveStatsResponse(long reqId, String nodeId, String stats, Exception failure) { this.reqId = reqId; + this.nodeId = nodeId; this.stats = stats; this.failure = failure; } @@ -54,13 +57,13 @@ switch (responseState) { case UNINITIALIZED: // First to arrive - result.setRight(new ArrayList<String>()); + result.setRight(new ArrayList<Pair<String, String>>()); // No failure, change state to success result.setLeft(ResponseState.SUCCESS); - // Fallthrough + // fall-through case SUCCESS: - List<String> response = (List<String>) result.getRight(); - response.add(stats); + List<Pair<String, String>> response = (List<Pair<String, String>>) result.getRight(); + response.add(Pair.of(nodeId, stats)); break; default: break; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index c1e772c..0d27d6c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -38,7 +38,7 @@ import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; -import org.apache.asterix.active.message.StatsRequestMessage; +import org.apache.asterix.active.message.ActiveStatsRequestMessage; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; @@ -290,7 +290,7 @@ List<INcAddressedMessage> requests = new ArrayList<>(); List<String> ncs = Arrays.asList(locations.getLocations()); for (int i = 0; i < ncs.size(); i++) { - requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); + requests.add(new ActiveStatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); } try { List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java index e3424ec..8bc6eb0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java @@ -26,7 +26,6 @@ } enum ClusterState { - STARTING, // the initial state UNUSABLE, // one or more cluster partitions are inactive or max id resources have not been reported PENDING, // the metadata node has not yet joined & initialized RECOVERING, // global recovery has not yet completed diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java index f823404..c51e2cf 100644 --- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java +++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java @@ -29,7 +29,7 @@ private final ZooKeeper zk; private String clusterStatePath; private boolean done = false; - private ClusterState clusterState = ClusterState.STARTING; + private ClusterState clusterState = ClusterState.UNUSABLE; private boolean failed = false; private Exception failureCause = null; private static Logger LOGGER = Logger.getLogger(ClusterStateWatcher.class.getName()); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index c85e236..824f51a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -36,6 +36,9 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController { + public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = "incoming-records-count"; + public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = "failed-at-parser-records-count"; + public enum State { CREATED, STARTED, @@ -278,7 +281,7 @@ @Override public String getStats() { - return "{\"incoming-records-count\": " + incomingRecordsCount + ", \"failed-at-parser-records-count\": " - + failedRecordsCount + "}"; + return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + incomingRecordsCount + ", \"" + + FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + failedRecordsCount + "}"; } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 334b683..51c87b4 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -165,6 +165,10 @@ @Override public synchronized void refreshState() throws HyracksDataException { + if (state == ClusterState.SHUTTING_DOWN) { + LOGGER.log(Level.INFO, "Not refreshing final state %s", state); + return; + } resetClusterPartitionConstraint(); if (clusterPartitions.isEmpty()) { LOGGER.info("Cluster does not have any registered partitions"); -- To view, visit https://asterix-gerrit.ics.uci.edu/2021 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <mb...@apache.org>