Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2021

Change subject: [NO ISSUE] Minor active refactoring
......................................................................

[NO ISSUE] Minor active refactoring

- Remove unused ActiveRuntimeManager
- Rename StatsRequestMessage -> ActiveStatsRequestMessage
- Add ActiveManager API to return all active runtimes

Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
D asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
R asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
9 files changed, 36 insertions(+), 116 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/21/2021/1

diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index df59dca..1d771a7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.asterix.active;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
@@ -32,7 +34,7 @@
 
 import org.apache.asterix.active.message.ActiveManagerMessage;
 import org.apache.asterix.active.message.ActiveStatsResponse;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.memory.ConcurrentFramePool;
@@ -54,7 +56,7 @@
     private volatile boolean shutdown;
 
     public ActiveManager(ExecutorService executor, String nodeId, long 
activeMemoryBudget, int frameSize,
-            INCServiceContext serviceCtx) throws HyracksDataException {
+                         INCServiceContext serviceCtx) throws 
HyracksDataException {
         this.executor = executor;
         this.nodeId = nodeId;
         this.activeFramePool = new ConcurrentFramePool(nodeId, 
activeMemoryBudget, frameSize);
@@ -77,6 +79,10 @@
         runtimes.remove(id);
     }
 
+    public Set<ActiveRuntimeId> getRuntimeIds() {
+        return Collections.unmodifiableSet(runtimes.keySet());
+    }
+
     public IActiveRuntime getRuntime(ActiveRuntimeId runtimeId) {
         return runtimes.get(runtimeId);
     }
@@ -93,14 +99,14 @@
                 stopRuntime(message);
                 break;
             case REQUEST_STATS:
-                requestStats((StatsRequestMessage) message);
+                requestStats((ActiveStatsRequestMessage) message);
                 break;
             default:
                 LOGGER.warning("Unknown message type received: " + 
message.getKind());
         }
     }
 
-    private void requestStats(StatsRequestMessage message) throws 
HyracksDataException {
+    private void requestStats(ActiveStatsRequestMessage message) throws 
HyracksDataException {
         try {
             ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
             IActiveRuntime runtime = runtimes.get(runtimeId);
@@ -111,13 +117,13 @@
                 ((NodeControllerService) serviceCtx.getControllerService())
                         .sendApplicationMessageToCC(
                                 JavaSerializationUtils
-                                        .serialize(new 
ActiveStatsResponse(reqId, null, new RuntimeDataException(
-                                                
ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME, runtimeId.toString()))),
-                                null);
+                                        .serialize(new 
ActiveStatsResponse(reqId, nodeId, null,
+                                                new 
RuntimeDataException(ErrorCode.ACTIVE_MANAGER_INVALID_RUNTIME,
+                                                        
runtimeId.toString()))), null);
                 return;
             }
             String stats = runtime.getStats();
-            ActiveStatsResponse response = new ActiveStatsResponse(reqId, 
stats, null);
+            ActiveStatsResponse response = new ActiveStatsResponse(reqId, 
nodeId, stats, null);
             ((NodeControllerService) serviceCtx.getControllerService())
                     
.sendApplicationMessageToCC(JavaSerializationUtils.serialize(response), null);
         } catch (Exception e) {
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
deleted file mode 100644
index 18368ae..0000000
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveRuntimeManager.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class ActiveRuntimeManager {
-
-    private static final Logger LOGGER = 
Logger.getLogger(ActiveRuntimeManager.class.getName());
-    private final Map<ActiveRuntimeId, ActiveSourceOperatorNodePushable> 
activeRuntimes;
-
-    private final ExecutorService executorService;
-
-    public ActiveRuntimeManager() {
-        this.activeRuntimes = new ConcurrentHashMap<>();
-        this.executorService = Executors.newCachedThreadPool();
-    }
-
-    public void close() throws IOException {
-        if (executorService != null) {
-            executorService.shutdown();
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("Shut down executor service for :" + 
ActiveRuntimeManager.class.getSimpleName());
-            }
-            try {
-                executorService.awaitTermination(10L, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOGGER.log(Level.SEVERE, 
ActiveRuntimeManager.class.getSimpleName()
-                        + " was interrupted while waiting for runtime managers 
to shutdown", e);
-            }
-            if (!executorService.isTerminated()) {
-                LOGGER.severe(ActiveRuntimeManager.class.getSimpleName()
-                        + " failed to shutdown successfully. Will be forced to 
shutdown");
-                executorService.shutdownNow();
-            }
-        }
-    }
-
-    public ActiveSourceOperatorNodePushable getRuntime(ActiveRuntimeId 
runtimeId) {
-        return activeRuntimes.get(runtimeId);
-    }
-
-    public void registerRuntime(ActiveRuntimeId runtimeId, 
ActiveSourceOperatorNodePushable feedRuntime)
-            throws HyracksDataException {
-        if (activeRuntimes.containsKey(runtimeId)) {
-            throw new 
RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_ALREADY_REGISTERED, runtimeId);
-        }
-        activeRuntimes.put(runtimeId, feedRuntime);
-    }
-
-    public void deregisterRuntime(ActiveRuntimeId runtimeId) throws 
HyracksDataException {
-        if (!activeRuntimes.containsKey(runtimeId)) {
-            throw new 
RuntimeDataException(ErrorCode.ACTIVE_RUNTIME_IS_NOT_REGISTERED, runtimeId);
-        }
-        activeRuntimes.remove(runtimeId);
-    }
-
-    public ExecutorService getExecutorService() {
-        return executorService;
-    }
-
-    public Set<ActiveRuntimeId> getFeedRuntimes() {
-        return activeRuntimes.keySet();
-    }
-
-}
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
similarity index 88%
rename from asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
rename to asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
index d43f00e..0dbba52 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java
@@ -20,11 +20,11 @@
 
 import java.io.Serializable;
 
-public class StatsRequestMessage extends ActiveManagerMessage {
+public class ActiveStatsRequestMessage extends ActiveManagerMessage {
     private static final long serialVersionUID = 1L;
     private final long reqId;
 
-    public StatsRequestMessage(Serializable payload, long reqId) {
+    public ActiveStatsRequestMessage(Serializable payload, long reqId) {
         super(Kind.REQUEST_STATS, payload);
         this.reqId = reqId;
     }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
index 8738a06..b8ba271 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsResponse.java
@@ -27,17 +27,20 @@
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INcResponse;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ActiveStatsResponse implements ICcAddressedMessage, INcResponse {
 
     private static final long serialVersionUID = 1L;
     private final long reqId;
+    private final String nodeId;
     private final String stats;
     private final Exception failure;
 
-    public ActiveStatsResponse(long reqId, String stats, Exception failure) {
+    public ActiveStatsResponse(long reqId, String nodeId, String stats, 
Exception failure) {
         this.reqId = reqId;
+        this.nodeId = nodeId;
         this.stats = stats;
         this.failure = failure;
     }
@@ -54,13 +57,13 @@
         switch (responseState) {
             case UNINITIALIZED:
                 // First to arrive
-                result.setRight(new ArrayList<String>());
+                result.setRight(new ArrayList<Pair<String, String>>());
                 // No failure, change state to success
                 result.setLeft(ResponseState.SUCCESS);
-                // Fallthrough
+                // fall-through
             case SUCCESS:
-                List<String> response = (List<String>) result.getRight();
-                response.add(stats);
+                List<Pair<String, String>> response = (List<Pair<String, 
String>>) result.getRight();
+                response.add(Pair.of(nodeId, stats));
                 break;
             default:
                 break;
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index c1e772c..0d27d6c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,7 +38,7 @@
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
-import org.apache.asterix.active.message.StatsRequestMessage;
+import org.apache.asterix.active.message.ActiveStatsRequestMessage;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -290,7 +290,7 @@
         List<INcAddressedMessage> requests = new ArrayList<>();
         List<String> ncs = Arrays.asList(locations.getLocations());
         for (int i = 0; i < ncs.size(); i++) {
-            requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, 
runtimeName, i), reqId));
+            requests.add(new ActiveStatsRequestMessage(new 
ActiveRuntimeId(entityId, runtimeName, i), reqId));
         }
         try {
             List<String> responses = (List<String>) 
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index e3424ec..8bc6eb0 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -26,7 +26,6 @@
     }
 
     enum ClusterState {
-        STARTING,       // the initial state
         UNUSABLE,       // one or more cluster partitions are inactive or max 
id resources have not been reported
         PENDING,        // the metadata node has not yet joined & initialized
         RECOVERING,     // global recovery has not yet completed
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
index f823404..c51e2cf 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/ClusterStateWatcher.java
@@ -29,7 +29,7 @@
     private final ZooKeeper zk;
     private String clusterStatePath;
     private boolean done = false;
-    private ClusterState clusterState = ClusterState.STARTING;
+    private ClusterState clusterState = ClusterState.UNUSABLE;
     private boolean failed = false;
     private Exception failureCause = null;
     private static Logger LOGGER = 
Logger.getLogger(ClusterStateWatcher.class.getName());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index c85e236..824f51a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -36,6 +36,9 @@
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
 public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowController {
+    public static final String INCOMING_RECORDS_COUNT_FIELD_NAME = 
"incoming-records-count";
+    public static final String FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME = 
"failed-at-parser-records-count";
+
     public enum State {
         CREATED,
         STARTED,
@@ -278,7 +281,7 @@
 
     @Override
     public String getStats() {
-        return "{\"incoming-records-count\": " + incomingRecordsCount + ", 
\"failed-at-parser-records-count\": "
-                + failedRecordsCount + "}";
+        return "{\"" + INCOMING_RECORDS_COUNT_FIELD_NAME + "\": " + 
incomingRecordsCount + ", \"" +
+                FAILED_AT_PARSER_RECORDS_COUNT_FIELD_NAME + "\": " + 
failedRecordsCount + "}";
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 334b683..51c87b4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -165,6 +165,10 @@
 
     @Override
     public synchronized void refreshState() throws HyracksDataException {
+        if (state == ClusterState.SHUTTING_DOWN) {
+            LOGGER.log(Level.INFO, "Not refreshing final state %s", state);
+            return;
+        }
         resetClusterPartitionConstraint();
         if (clusterPartitions.isEmpty()) {
             LOGGER.info("Cluster does not have any registered partitions");

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2021
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I79249f7cd42496d6679eb9b0acbe8cda1892f9d3
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <mb...@apache.org>

Reply via email to