Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1748
Change subject: Add HttpAPI for active feed status
......................................................................
Add HttpAPI for active feed status
1. Added HttpAPI for active feed live status.
2. Added Event monitor for FeedEventSubscriber.
Change-Id: I46b48b52a1c9906510c5bdce778d1672845f75ca
---
M asterixdb/asterix-active/pom.xml
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedsStateApiServlet.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
D
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
23 files changed, 353 insertions(+), 81 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/48/1748/1
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 950e4d6..cdee6fb 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -42,5 +42,15 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-dataflow-std</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.3.2-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.5</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 2669990..71689b6 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -34,12 +34,14 @@
private final EntityId entityId;
private final Kind eventKind;
private final Object eventObject;
+ private final int hashcode;
public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId, Object
eventObject) {
this.jobId = jobId;
this.entityId = entityId;
this.eventKind = eventKind;
this.eventObject = eventObject;
+ hashcode = toString().hashCode();
}
public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
@@ -64,6 +66,25 @@
@Override
public String toString() {
- return "JobId:" + jobId + ", " + "EntityId:" + entityId + ", " +
"Kind" + eventKind;
+ return "JobId:" + jobId + "," + "EntityId:" + entityId + ", " + "Kind"
+ eventKind;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof ActiveEvent)) {
+ return false;
+ }
+ if ((this == o || (((ActiveEvent) o).entityId.equals(this.entityId)
+ && ((ActiveEvent) o).eventKind == this.getEventKind()))
+ && (eventObject == null
+ || (eventObject != null &&
eventObject.equals(((ActiveEvent) o).getEventObject())))) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashcode;
}
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index 44d6dae..c94fba3 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.active;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +29,16 @@
import java.util.concurrent.TimeoutException;
import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.ThreadExecutor;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.memory.ConcurrentFramePool;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.JavaSerializationUtils;
+import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.log4j.Logger;
public class ActiveManager {
@@ -44,14 +50,17 @@
private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes;
private final ConcurrentFramePool activeFramePool;
private final String nodeId;
+ private final INCServiceContext ncsCtx;
private volatile boolean shutdown;
- public ActiveManager(ThreadExecutor executor, String nodeId, long
activeMemoryBudget, int frameSize)
+ public ActiveManager(ThreadExecutor executor, String nodeId, long
activeMemoryBudget, int frameSize,
+ INCServiceContext ncsCtx)
throws HyracksDataException {
this.executor = executor;
this.nodeId = nodeId;
this.activeFramePool = new ConcurrentFramePool(nodeId,
activeMemoryBudget, frameSize);
this.runtimes = new ConcurrentHashMap<>();
+ this.ncsCtx = ncsCtx;
}
public ConcurrentFramePool getFramePool() {
@@ -78,16 +87,37 @@
return ActiveManager.class.getSimpleName() + "[" + nodeId + "]";
}
- public void submit(ActiveManagerMessage message) {
+ public void submit(ActiveManagerMessage message) throws
HyracksDataException {
switch (message.getKind()) {
case ActiveManagerMessage.STOP_ACTIVITY:
stopRuntime(message);
break;
+ case ActiveManagerMessage.REQUEST_STATS:
+ requestStats(message);
default:
LOGGER.warn("Unknown message type received: " +
message.getKind());
}
}
+ private void requestStats(ActiveManagerMessage message) throws
HyracksDataException {
+ ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
+ IActiveRuntime runtime = runtimes.get(runtimeId);
+ Pair<Instant, String> runtimeStatus;
+ if (runtime == null) {
+ runtimeStatus = Pair.of(Instant.now(), "Request stats from a
runtime that is not registered " + runtimeId);
+ } else {
+ runtimeStatus = runtime.getStatus();
+ }
+ ActivePartitionMessage statsResponseMsg = new
ActivePartitionMessage(runtime.getRuntimeId(), runtime.getJobId(),
+ ActivePartitionMessage.ACTIVE_RUNTIME_STATUS_REQUEST,
runtimeStatus);
+ try {
+ ((NodeControllerService) ncsCtx.getControllerService())
+
.sendApplicationMessageToCC(JavaSerializationUtils.serialize(statsResponseMsg),
null);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+
public void shutdown() {
LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 98a6979..7b17cda 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public abstract class ActiveSourceOperatorNodePushable extends
AbstractUnaryOutputSourceOperatorNodePushable
@@ -36,6 +37,7 @@
protected final ActiveManager activeManager;
/** A unique identifier for the runtime **/
protected final ActiveRuntimeId runtimeId;
+ protected final JobId jobId;
private volatile boolean done = false;
public ActiveSourceOperatorNodePushable(IHyracksTaskContext ctx,
ActiveRuntimeId runtimeId) {
@@ -43,6 +45,7 @@
activeManager = (ActiveManager) ((INcApplicationContext)
ctx.getJobletContext().getServiceContext()
.getApplicationContext()).getActiveManager();
this.runtimeId = runtimeId;
+ this.jobId = ctx.getJobletContext().getJobId();
}
@Override
@@ -124,4 +127,8 @@
public final IFrameWriter getInputFrameWriter(int index) {
return null;
}
+
+ public JobId getJobId() {
+ return this.jobId;
+ }
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index ee8e776..349b890 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -50,6 +50,8 @@
*/
IActiveEventSubscriber subscribe(ActivityState state) throws
HyracksDataException;
+ IActiveEventSubscriber subscribe(ActiveEvent event) throws
HyracksDataException;
+
/**
* @return the active entity id
*/
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
index 528c220..875063b 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveRuntime.java
@@ -18,7 +18,12 @@
*/
package org.apache.asterix.active;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+import java.io.Serializable;
+import java.time.Instant;
public interface IActiveRuntime {
@@ -34,4 +39,8 @@
* @throws InterruptedException
*/
void stop() throws HyracksDataException, InterruptedException;
+
+ Pair<Instant, String> getStatus();
+
+ JobId getJobId();
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 231ec25..84eb60a 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -27,6 +27,7 @@
public class ActiveManagerMessage implements INcAddressedMessage {
public static final byte STOP_ACTIVITY = 0x00;
+ public static final byte REQUEST_STATS = 0x01;
private static final long serialVersionUID = 1L;
private final byte kind;
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 335121a..5805f45 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -32,11 +32,13 @@
public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
public static final byte GENERIC_EVENT = 0x02;
+ public static final byte ACTIVE_RUNTIME_STATUS_REQUEST = 0x03;
private static final long serialVersionUID = 1L;
private final ActiveRuntimeId activeRuntimeId;
private final JobId jobId;
private final Serializable payload;
private final byte event;
+ private final int hashcode;
public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId
jobId, byte event) {
this(activeRuntimeId, jobId, event, null);
@@ -47,6 +49,7 @@
this.jobId = jobId;
this.event = event;
this.payload = payload;
+ this.hashcode = toString().hashCode();
}
public ActiveRuntimeId getActiveRuntimeId() {
@@ -73,6 +76,19 @@
@Override
public String toString() {
- return ActivePartitionMessage.class.getSimpleName();
+ return ActivePartitionMessage.class.getSimpleName() + event;
+ }
+
+ @Override
+ public int hashCode() {
+ return hashcode;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || !(o instanceof ActivePartitionMessage)) {
+ return false;
+ }
+ return (this == o || this.hashCode() == o.hashCode());
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedsStateApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedsStateApiServlet.java
new file mode 100644
index 0000000..5e8f683
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/FeedsStateApiServlet.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICCMessageBroker;
+import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.apache.asterix.external.feed.watch.FeedEventSubscriber;
+import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+
+import java.io.PrintWriter;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.ConcurrentMap;
+
+public class FeedsStateApiServlet extends AbstractServlet {
+
+ private final String KEY_FEED_NAME = "Feed";
+ private static final String KEY_ACTIVE_FEED_NUM = "Active Feeds Number";
+ private static final String KEY_FEED_ENTITY_EID = "EntityId";
+ private static final String KEY_FEED_INGESTION_LOC = "Ingestion Location";
+ private static final String KEY_FEED_RUN_TIME = "Active Time (ms)";
+ private static final String KEY_FEED_STATUS = "Feed Status";
+ private static final int DEFAULT_EXPIRE_TIME = 2000;
+
+
+ private final ActiveLifecycleListener activeLifecycleListener;
+ private final ICCMessageBroker messageBroker;
+
+ public FeedsStateApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
+ super(ctx, paths);
+ this.activeLifecycleListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ this.messageBroker = (ICCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ }
+
+ private synchronized void updateFeedStatus(FeedEventsListener fel, int
sourceId) throws Exception {
+ ActiveManagerMessage requestStatsMessage = new
ActiveManagerMessage(ActiveManagerMessage.REQUEST_STATS,
+ "SRC", new ActiveRuntimeId(fel.getEntityId(),
+ FeedIntakeOperatorNodePushable.class.getSimpleName(),
sourceId));
+ FeedEventSubscriber eventSubscriber = (FeedEventSubscriber)
fel.subscribe(new ActiveEvent(null,
+ ActiveEvent.Kind.PARTITION_EVENT, fel.getEntityId(),
+ new ActivePartitionMessage(null, null,
ActivePartitionMessage.ACTIVE_RUNTIME_STATUS_REQUEST)));
+ messageBroker.sendApplicationMessageToNC(requestStatsMessage,
fel.getSources()[sourceId]);
+ eventSubscriber.sync();
+ }
+
+ private ObjectNode constructNodeForFeed(ObjectMapper om,
FeedEventsListener fel, int expireTime) throws Exception {
+ Instant requestedTime = Instant.now();
+ ObjectNode feedNode = om.createObjectNode();
+ feedNode.put(KEY_FEED_ENTITY_EID, fel.getEntityId().toString());
+ for (int iter1 = 0; iter1 < fel.getSources().length; iter1++) {
+ ObjectNode ingestNode = om.createObjectNode();
+ ingestNode.put(KEY_FEED_INGESTION_LOC, fel.getSources()[iter1]);
+ Pair<Instant, String> feedStatus = fel.getFeedStatus();
+ if (Duration.between(feedStatus.getLeft(),
requestedTime).toMillis() > expireTime) {
+ updateFeedStatus(fel, iter1);
+ feedStatus = fel.getFeedStatus();
+ }
+ feedNode.put(KEY_FEED_RUN_TIME,
Duration.between(fel.getStartedTime(), feedStatus.getLeft()).toMillis());
+ ingestNode.put(KEY_FEED_STATUS, feedStatus.getRight());
+ feedNode.putPOJO("Ingest Node " + iter1, ingestNode);
+ }
+ return feedNode;
+ }
+
+ @Override
+ protected void get(IServletRequest request, IServletResponse response)
throws Exception {
+ // Obtain all feed status
+ String requestPara = localPath(request);
+ int expireTime;
+ IActiveEntityEventsListener[] aeels =
activeLifecycleListener.getNotificationHandler().getEventListeners();
+ ObjectMapper om = new ObjectMapper();
+ om.enable(SerializationFeature.INDENT_OUTPUT);
+ ObjectNode resNode = om.createObjectNode();
+
+ if (requestPara.length() == 0 || requestPara.length() == 1) {
+ expireTime = DEFAULT_EXPIRE_TIME;
+ } else {
+ expireTime = Integer.valueOf(requestPara.substring(1));
+ }
+
+ resNode.put(KEY_ACTIVE_FEED_NUM, aeels.length);
+ for (int iter1 = 0; iter1 < aeels.length; iter1++) {
+ resNode.putPOJO(KEY_FEED_NAME + iter1,
+ constructNodeForFeed(om, (FeedEventsListener)
aeels[iter1], expireTime));
+ }
+
+ // Construct Response
+ PrintWriter responseWriter = response.writer();
+
responseWriter.write(om.writerWithDefaultPrettyPrinter().writeValueAsString(resNode));
+ responseWriter.flush();
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 6ad85b3..c273473 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -203,7 +203,8 @@
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor,
getServiceContext().getNodeId(),
- activeProperties.getMemoryComponentGlobalBudget(),
compilerProperties.getFrameSize());
+ activeProperties.getMemoryComponentGlobalBudget(),
compilerProperties.getFrameSize(),
+ this.ncServiceContext);
if
(replicationProperties.isParticipant(getServiceContext().getNodeId())) {
String nodeId = getServiceContext().getNodeId();
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index b7aae59..39ec18c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -2099,6 +2099,7 @@
FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
mdTxnCtx);
// Construct ActiveMessage
for (int i = 0; i < listener.getSources().length; i++) {
+ // TODO: Figure out where PartitionId is i
String intakeLocation = listener.getSources()[i];
FeedOperations.SendStopMessageToNode(appCtx, feedId,
intakeLocation, i);
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 57cc340..8884b8f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -38,6 +38,7 @@
import org.apache.asterix.api.http.server.ConnectorApiServlet;
import org.apache.asterix.api.http.server.DdlApiServlet;
import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
+import org.apache.asterix.api.http.server.FeedsStateApiServlet;
import org.apache.asterix.api.http.server.FullApiServlet;
import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
import org.apache.asterix.api.http.server.QueryApiServlet;
@@ -229,6 +230,7 @@
addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_NODE_DETAIL); // must
not precede add of CLUSTER_STATE
addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must
not precede add of CLUSTER_STATE
addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
+ addServlet(jsonAPIServer, Servlets.FEEDS_STATE);
return jsonAPIServer;
}
@@ -295,6 +297,8 @@
return new ClusterControllerDetailsApiServlet(ctx, paths);
case Servlets.DIAGNOSTICS:
return new DiagnosticsApiServlet(ctx, paths, appCtx);
+ case Servlets.FEEDS_STATE:
+ return new FeedsStateApiServlet(ctx, paths, appCtx);
default:
throw new IllegalStateException(String.valueOf(key));
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 3047ef5..3a98568 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -39,6 +39,7 @@
public static final String CLUSTER_STATE_NODE_DETAIL =
"/admin/cluster/node/*";
public static final String CLUSTER_STATE_CC_DETAIL = "/admin/cluster/cc/*";
public static final String DIAGNOSTICS = "/admin/diagnostics";
+ public static final String FEEDS_STATE = "/admin/feeds/*";
private Servlets() {
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 213231b..d3892c5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -61,4 +61,6 @@
public abstract boolean stop() throws HyracksDataException;
public abstract boolean handleException(Throwable th) throws
HyracksDataException;
+
+ public abstract String getStatus();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index d01859e..a11d93e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -39,6 +39,8 @@
protected final AtomicBoolean closed = new AtomicBoolean(false);
protected static final long INTERVAL = 1000;
protected boolean failed = false;
+ protected long incomingRecordsCount = 0;
+ protected long failedRecordsCount = 0;
public FeedRecordDataFlowController(IHyracksTaskContext ctx,
FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields,
IRecordDataParser<T> dataParser,
@@ -63,7 +65,10 @@
continue;
}
tb.reset();
- parseAndForward(record);
+ incomingRecordsCount++;
+ if (!parseAndForward(record)) {
+ failedRecordsCount++;
+ }
}
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside
termination of a job/feed
@@ -104,19 +109,20 @@
}
}
- private void parseAndForward(IRawRecord<? extends T> record) throws
IOException {
+ private boolean parseAndForward(IRawRecord<? extends T> record) throws
IOException {
try {
dataParser.parse(record, tb.getDataOutput());
} catch (Exception e) {
LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
feedLogManager.logRecord(record.toString(),
ExternalDataConstants.ERROR_PARSE_RECORD);
// continue the outer loop
- return;
+ return false;
}
tb.addFieldEndOffset();
addMetaPart(tb, record);
addPrimaryKeys(tb, record);
tupleForwarder.addTuple(tb);
+ return true;
}
protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T>
record) throws IOException {
@@ -187,4 +193,9 @@
public IRecordDataParser<T> getParser() {
return dataParser;
}
+
+ public String getStatus() {
+ return "Incoming records count " + incomingRecordsCount + ". Failed at
parser records count "
+ + failedRecordsCount + ".";
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 0d72682..3ebaa41 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -29,6 +29,7 @@
private final IStreamDataParser dataParser;
private final AsterixInputStream stream;
+ protected long incomingRecordsCount = 0;
public FeedStreamDataFlowController(IHyracksTaskContext ctx,
FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, IStreamDataParser streamParser,
AsterixInputStream inputStream) {
@@ -48,6 +49,7 @@
}
tb.addFieldEndOffset();
tupleForwarder.addTuple(tb);
+ incomingRecordsCount++;
}
} catch (Exception e) {
throw new HyracksDataException(e);
@@ -83,4 +85,8 @@
}
return handled;
}
+
+ public String getStatus() {
+ return "Incoming records number: " + incomingRecordsCount + ".";
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index 8d80e6f..7c815ce 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -51,4 +51,8 @@
public boolean resume() throws HyracksDataException {
return controller.resume();
}
+
+ public String getStatus() {
+ return controller.getStatus();
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index 6f3b667..ce08fac 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.external.feed.management;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -34,6 +35,7 @@
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.external.feed.watch.FeedEventSubscriber;
import org.apache.asterix.external.feed.watch.NoOpSubscriber;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobStatus;
@@ -41,11 +43,14 @@
public class FeedEventsListener extends ActiveEntityEventsListener {
// constants
private static final Logger LOGGER =
Logger.getLogger(FeedEventsListener.class.getName());
+ private static final String FEED_INIT_STATUS = "Inited";
// members
private final ICcApplicationContext appCtx;
private final String[] sources;
private final List<IActiveEventSubscriber> subscribers;
private int numRegistered;
+ private Instant startedTime;
+ private Pair<Instant, String> feedStatus;
public FeedEventsListener(ICcApplicationContext appCtx, EntityId entityId,
List<IDataset> datasets,
String[] sources) {
@@ -55,11 +60,14 @@
this.sources = sources;
subscribers = new ArrayList<>();
state = ActivityState.STOPPED;
+ startedTime = Instant.MAX;
+ feedStatus = Pair.of(Instant.now(), FEED_INIT_STATUS);
}
@Override
public synchronized void notify(ActiveEvent event) {
try {
+ LOGGER.finer("EventListener is notified.");
switch (event.getEventKind()) {
case JOB_STARTED:
start(event);
@@ -102,6 +110,10 @@
if (numRegistered == getSources().length) {
state = ActivityState.STARTED;
}
+ } else if (message.getEvent() ==
ActivePartitionMessage.ACTIVE_RUNTIME_STATUS_REQUEST) {
+ synchronized (feedStatus) {
+ feedStatus = (Pair<Instant, String>) message.getPayload();
+ }
}
}
@@ -116,6 +128,7 @@
private void start(ActiveEvent event) {
this.jobId = event.getJobId();
state = ActivityState.STARTING;
+ startedTime = Instant.now();
}
@Override
@@ -129,18 +142,33 @@
} else if (this.state == state) {
return NoOpSubscriber.INSTANCE;
}
- return doSubscribe(state);
+ FeedEventSubscriber subscriber = new FeedEventSubscriber(this,
state);
+ subscribers.add(subscriber);
+ return subscriber;
}
}
- // Called within synchronized block
- private FeedEventSubscriber doSubscribe(ActivityState state) {
- FeedEventSubscriber subscriber = new FeedEventSubscriber(this, state);
- subscribers.add(subscriber);
- return subscriber;
+ @Override
+ public IActiveEventSubscriber subscribe(ActiveEvent event) throws
HyracksDataException {
+ synchronized (this) {
+ if (this.state == ActivityState.FAILED) {
+ throw new HyracksDataException("Feed has failed");
+ }
+ FeedEventSubscriber subscriber = new FeedEventSubscriber(this,
event);
+ subscribers.add(subscriber);
+ return subscriber;
+ }
}
public String[] getSources() {
return sources;
}
+
+ public Instant getStartedTime() {
+ return startedTime;
+ }
+
+ public Pair<Instant, String> getFeedStatus() {
+ return feedStatus;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
index c71b8a2..6b4e7d8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterExecutor.java
@@ -19,9 +19,13 @@
package org.apache.asterix.external.feed.runtime;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;
+
+import java.io.Serializable;
+import java.time.Instant;
/**
* The class in charge of executing feed adapters.
@@ -34,6 +38,7 @@
// increase or decrease at any time)
private final FeedAdapter adapter; // The adapter
private final AdapterRuntimeManager adapterManager;// The runtime manager
<-- two way visibility -->
+ private int restartCount = 0;
public AdapterExecutor(IFrameWriter writer, FeedAdapter adapter,
AdapterRuntimeManager adapterManager) {
this.writer = writer;
@@ -81,8 +86,13 @@
LOGGER.error("Exception during feed ingestion ", e);
continueIngestion = adapter.handleException(e);
failedIngestion = !continueIngestion;
+ restartCount++;
}
}
return failedIngestion;
}
+
+ public Pair<Instant, String> getStatus() {
+ return Pair.of(Instant.now(), adapter.getStatus() + " Executor restart
times " + restartCount + ".");
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 6214d9f..3bd83df 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.external.feed.runtime;
+import java.io.Serializable;
+import java.time.Instant;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -25,6 +27,7 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -137,4 +140,8 @@
public void setDone(boolean done) {
this.done = done;
}
+
+ public Pair<Instant, String> getStatus() {
+ return adapterExecutor.getStatus();
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
deleted file mode 100644
index 590af01..0000000
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.runtime;
-
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.IActiveRuntime;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public class IngestionRuntime implements IActiveRuntime {
-
- private static final Logger LOGGER =
Logger.getLogger(IngestionRuntime.class.getName());
-
- private final AdapterRuntimeManager adapterRuntimeManager;
- private final ActiveRuntimeId runtimeId;
- private final EntityId feedId;
-
- public IngestionRuntime(EntityId entityId, ActiveRuntimeId runtimeId,
AdapterRuntimeManager adaptorRuntimeManager) {
- this.feedId = entityId;
- this.runtimeId = runtimeId;
- this.adapterRuntimeManager = adaptorRuntimeManager;
- }
-
- @Override
- public ActiveRuntimeId getRuntimeId() {
- return this.runtimeId;
- }
-
- public void start() {
- adapterRuntimeManager.start();
- LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " running on
partition " + runtimeId);
- }
-
- @Override
- public void stop() throws InterruptedException, HyracksDataException {
- adapterRuntimeManager.stop();
- LOGGER.log(Level.INFO, "Feed " + feedId.getEntityName() + " stopped on
partition " + runtimeId);
- }
-
- public EntityId getFeedId() {
- return feedId;
- }
-}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
index 0e931f7..66b1c64 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/FeedEventSubscriber.java
@@ -19,28 +19,52 @@
package org.apache.asterix.external.feed.watch;
import org.apache.asterix.active.ActiveEvent;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.IActiveEventSubscriber;
import org.apache.asterix.external.feed.management.FeedEventsListener;
+import org.codehaus.jackson.node.ObjectNode;
+
+import java.io.Serializable;
public class FeedEventSubscriber implements IActiveEventSubscriber {
private final FeedEventsListener listener;
private final ActivityState state;
+ private final ActiveEvent event;
private boolean done = false;
+ private Object eventPayload;
public FeedEventSubscriber(FeedEventsListener listener, ActivityState
state) {
this.listener = listener;
this.state = state;
+ this.event = null;
+ }
+ public FeedEventSubscriber(FeedEventsListener listener, ActiveEvent event)
{
+ this.listener = listener;
+ this.event = event;
+ this.state = null;
}
@Override
- public synchronized void notify(ActiveEvent event) {
- if (listener.getState() == state || listener.getState() ==
ActivityState.FAILED
- || listener.getState() == ActivityState.STOPPED) {
- done = true;
- notifyAll();
+ public synchronized void notify(ActiveEvent activeEvent) {
+ if (state == null) {
+ // monitor event
+ if (event.equals(activeEvent)) {
+ if (activeEvent.getEventObject() != null) {
+ this.eventPayload = activeEvent.getEventObject();
+ }
+ done = true;
+ notifyAll();
+ }
+ } else {
+ // monitor state
+ if (listener.getState() == state || listener.getState() ==
ActivityState.FAILED
+ || listener.getState() == ActivityState.STOPPED) {
+ done = true;
+ notifyAll();
+ }
}
}
@@ -61,4 +85,8 @@
done = true;
notifyAll();
}
+
+ public Object getEventPayload() {
+ return eventPayload;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8a7bda9..ba310a5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -35,6 +36,9 @@
import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+
+import java.io.Serializable;
+import java.time.Instant;
/**
* The runtime for @see{FeedIntakeOperationDescriptor}.
@@ -105,4 +109,13 @@
adapterRuntimeManager.stop();
}
}
+
+ @Override
+ public Pair<Instant, String> getStatus() {
+ if (adapterRuntimeManager != null) {
+ return adapterRuntimeManager.getStatus();
+ } else {
+ return Pair.of(Instant.now(), "Adapter status is not available.");
+ }
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1748
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I46b48b52a1c9906510c5bdce778d1672845f75ca
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>