abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1875
Change subject: WIP
......................................................................
WIP
Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
---
M asterixdb/asterix-active/pom.xml
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
D
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
R
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
R
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
R
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
A
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
A
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
A
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
R
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
A
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
M
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
53 files changed, 775 insertions(+), 477 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/75/1875/1
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..fe5efbe 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,6 +31,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
index 1141912..434801a 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java
@@ -30,7 +30,8 @@
JOB_FINISHED,
PARTITION_EVENT,
EXTENSION_EVENT,
- STATS_UPDATED
+ STATS_UPDATED,
+ FALURE
}
private final JobId jobId;
@@ -43,10 +44,6 @@
this.entityId = entityId;
this.eventKind = eventKind;
this.eventObject = eventObject;
- }
-
- public ActiveEvent(JobId jobId, Kind eventKind, EntityId entityId) {
- this(jobId, eventKind, entityId, null);
}
public JobId getJobId() {
@@ -79,8 +76,8 @@
return true;
}
ActiveEvent other = (ActiveEvent) o;
- return Objects.equals(entityId, other.entityId) &&
Objects.equals(eventKind, other.eventKind) && Objects
- .equals(eventObject, other.eventObject);
+ return Objects.equals(entityId, other.entityId) &&
Objects.equals(eventKind, other.eventKind)
+ && Objects.equals(eventObject, other.eventObject);
}
@Override
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
deleted file mode 100644
index 86c3e7d..0000000
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveLifecycleListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.message.ActivePartitionMessage;
-import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.api.job.IJobLifecycleListener;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class ActiveLifecycleListener implements IJobLifecycleListener {
-
- private static final Logger LOGGER =
Logger.getLogger(ActiveLifecycleListener.class.getName());
-
- private final ActiveJobNotificationHandler notificationHandler;
- private final LinkedBlockingQueue<ActiveEvent> jobEventInbox;
- private final ExecutorService executorService;
-
- public ActiveLifecycleListener() {
- notificationHandler = new ActiveJobNotificationHandler();
- jobEventInbox = notificationHandler.getEventInbox();
- executorService = Executors.newSingleThreadExecutor();
- executorService.execute(notificationHandler);
- }
-
- @Override
- public synchronized void notifyJobStart(JobId jobId) throws
HyracksException {
- EntityId entityId = notificationHandler.getEntity(jobId);
- if (entityId != null) {
- jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED,
entityId));
- }
- }
-
- @Override
- public synchronized void notifyJobFinish(JobId jobId) throws
HyracksException {
- EntityId entityId = notificationHandler.getEntity(jobId);
- if (entityId != null) {
- jobEventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED,
entityId));
- } else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
- }
- }
- }
-
- @Override
- public void notifyJobCreation(JobId jobId, JobSpecification spec) throws
HyracksException {
- notificationHandler.notifyJobCreation(jobId, spec);
- }
-
- public void receive(ActivePartitionMessage message) {
- jobEventInbox.add(new ActiveEvent(message.getJobId(),
Kind.PARTITION_EVENT,
- message.getActiveRuntimeId().getEntityId(), message));
- }
-
- public void stop() {
- executorService.shutdown();
- }
-
- public ActiveJobNotificationHandler getNotificationHandler() {
- return notificationHandler;
- }
-}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
similarity index 65%
rename from
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
rename to
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
index d2b8a89..0853144 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveNotificationHandler.java
@@ -19,50 +19,61 @@
package org.apache.asterix.active;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveEvent.Kind;
+import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
-public class ActiveJobNotificationHandler implements Runnable {
- public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
- private static final Logger LOGGER =
Logger.getLogger(ActiveJobNotificationHandler.class.getName());
+public class ActiveNotificationHandler implements IJobLifecycleListener,
Runnable {
+
+ private static final Logger LOGGER =
Logger.getLogger(ActiveNotificationHandler.class.getName());
private static final boolean DEBUG = false;
+ public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
private final LinkedBlockingQueue<ActiveEvent> eventInbox;
private final Map<EntityId, IActiveEntityEventsListener>
entityEventListeners;
- private final Map<JobId, EntityId> jobId2ActiveJobInfos;
+ private final Map<JobId, EntityId> jobId2EntityId;
+ private final ExecutorService executorService;
- public ActiveJobNotificationHandler() {
- this.eventInbox = new LinkedBlockingQueue<>();
- this.jobId2ActiveJobInfos = new HashMap<>();
- this.entityEventListeners = new HashMap<>();
+ public ActiveNotificationHandler() {
+ eventInbox = new LinkedBlockingQueue<>();
+ jobId2EntityId = new HashMap<>();
+ entityEventListeners = new HashMap<>();
+ executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(this);
}
@Override
public void run() {
-
Thread.currentThread().setName(ActiveJobNotificationHandler.class.getSimpleName());
- LOGGER.log(Level.INFO, "Started " +
ActiveJobNotificationHandler.class.getSimpleName());
+
Thread.currentThread().setName(ActiveNotificationHandler.class.getSimpleName());
+ LOGGER.log(Level.INFO, "Started " +
ActiveNotificationHandler.class.getSimpleName());
while (!Thread.interrupted()) {
try {
- ActiveEvent event = getEventInbox().take();
- EntityId entityId = jobId2ActiveJobInfos.get(event.getJobId());
+ ActiveEvent event = eventInbox.take();
+ EntityId entityId = jobId2EntityId.get(event.getJobId());
if (entityId != null) {
IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
LOGGER.log(Level.FINER, "Next event is of type " +
event.getEventKind());
if (event.getEventKind() == Kind.JOB_FINISHED) {
LOGGER.log(Level.FINER, "Removing the job");
- jobId2ActiveJobInfos.remove(event.getJobId());
+ jobId2EntityId.remove(event.getJobId());
}
if (listener != null) {
LOGGER.log(Level.FINER, "Notifying the listener");
listener.notify(event);
}
-
} else {
LOGGER.log(Level.SEVERE, "Entity not found for received
message for job " + event.getJobId());
}
@@ -72,15 +83,40 @@
LOGGER.log(Level.SEVERE, "Error handling an active job event",
e);
}
}
- LOGGER.log(Level.INFO, "Stopped " +
ActiveJobNotificationHandler.class.getSimpleName());
+ LOGGER.log(Level.INFO, "Stopped " +
ActiveNotificationHandler.class.getSimpleName());
}
- public synchronized void removeListener(IActiveEntityEventsListener
listener) throws HyracksDataException {
- LOGGER.log(Level.FINER, "Removing the listener since it is not active
anymore");
- unregisterListener(listener);
+ @Override
+ public synchronized void notifyJobStart(JobId jobId) throws
HyracksException {
+ EntityId entityId = getEntity(jobId);
+ if (entityId != null) {
+ eventInbox.add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId,
null));
+ }
}
- public IActiveEntityEventsListener getActiveEntityListener(EntityId
entityId) {
+ @Override
+ public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus,
List<Exception> exceptions)
+ throws HyracksException {
+ EntityId entityId = getEntity(jobId);
+ if (entityId != null) {
+ eventInbox.add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId,
Pair.of(jobStatus, exceptions)));
+ } else {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
+ }
+ }
+ }
+
+ public void receive(ActivePartitionMessage message) {
+ eventInbox.add(new ActiveEvent(message.getJobId(),
Kind.PARTITION_EVENT,
+ message.getActiveRuntimeId().getEntityId(), message));
+ }
+
+ public void stop() {
+ executorService.shutdown();
+ }
+
+ public IActiveEntityEventsListener getListener(EntityId entityId) {
if (DEBUG) {
LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId
entityId) was called with entity " + entityId);
IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
@@ -90,10 +126,11 @@
}
public EntityId getEntity(JobId jobId) {
- return jobId2ActiveJobInfos.get(jobId);
+ return jobId2EntityId.get(jobId);
}
- public void notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) {
+ @Override
+ public void notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) throws HyracksDataException {
LOGGER.log(Level.FINER,
"notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) was called with jobId = " + jobId);
Object property =
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
@@ -103,18 +140,11 @@
}
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
- boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+ boolean found = jobId2EntityId.get(jobId) != null;
LOGGER.log(Level.FINER, "Job was found to be: " + (found ? "Active" :
"Inactive"));
- IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
- if (listener != null) {
- // It is okay to bypass the event inbox in this case because we
know this is the first event for this entity
- listener.notify(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId,
jobSpecification));
+ if (!eventInbox.offer(new ActiveEvent(jobId, Kind.JOB_CREATED,
entityId, jobSpecification))) {
+ throw new HyracksDataException("Full active event inbox");
}
- LOGGER.log(Level.FINER, "Listener was notified" + jobId);
- }
-
- public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
- return eventInbox;
}
public synchronized IActiveEntityEventsListener[] getEventListeners() {
@@ -147,14 +177,14 @@
}
}
- public synchronized void monitorJob(JobId jobId, EntityId activeJob) {
+ public synchronized void monitorJob(JobId jobId, EntityId entityId) {
if (DEBUG) {
LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob
activeJob) called with job id: " + jobId);
- boolean found = jobId2ActiveJobInfos.get(jobId) != null;
+ boolean found = jobId2EntityId.get(jobId) != null;
LOGGER.log(Level.WARNING, "Job was found to be: " + (found ?
"Active" : "Inactive"));
}
- if (entityEventListeners.containsKey(activeJob)) {
- if (jobId2ActiveJobInfos.containsKey(jobId)) {
+ if (entityEventListeners.containsKey(entityId)) {
+ if (jobId2EntityId.containsKey(jobId)) {
LOGGER.severe("Job is already being monitored for job: " +
jobId);
return;
}
@@ -162,8 +192,8 @@
LOGGER.log(Level.WARNING, "monitoring started for job id: " +
jobId);
}
} else {
- LOGGER.severe("No listener was found for the entity: " +
activeJob);
+ LOGGER.severe("No listener was found for the entity: " + entityId);
}
- jobId2ActiveJobInfos.put(jobId, activeJob);
+ jobId2EntityId.put(jobId, entityId);
}
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
index af8f5ca..1430eb8 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java
@@ -20,27 +20,36 @@
public enum ActivityState {
/**
- * The initial state of an activity.
- */
- CREATED,
- /**
- * The starting state and a possible terminal state. Next state can only
be {@code ActivityState.STARTING}
+ * The starting state and a possible terminal state. Next state can only
be {@code ActivityState.CREATED}
*/
STOPPED,
/**
- * A terminal state
+ * An unexpected failure caused the activity to stop
*/
FAILED,
/**
- * An intermediate state. Next state can only be {@code
ActivityState.STARTED} or {@code ActivityState.FAILED}
+ * An intermediate state. Next state can only be {@code
ActivityState.RUNNING} or {@code ActivityState.FAILED}
*/
STARTING,
/**
- * An intermediate state. Next state can only be {@code
ActivityState.STOPPING} or {@code ActivityState.FAILED}
+ * An intermediate state. Next state can only be
+ * {@code ActivityState.STOPPING}, {@code ActivityState.SUSPENDING}, or
{@code ActivityState.FAILED}
*/
- STARTED,
+ RUNNING,
/**
* An intermediate state. Next state can only be {@code
ActivityState.STOPPED} or {@code ActivityState.FAILED}
*/
- STOPPING
+ STOPPING,
+ /**
+ * An intermediate state. Next state can only be {@code
ActivityState.SUSPENDED} or {@code ActivityState.FAILED}
+ */
+ SUSPENDING,
+ /**
+ * An intermediate state. Next state can only be {@code
ActivityState.STOPPED} or {@code ActivityState.RESUMING}
+ */
+ SUSPENDED,
+ /**
+ * An intermediate state. Next state can only be {@code
ActivityState.RUNNING} or {@code ActivityState.FAILED}
+ */
+ RESUMING
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 4bc02f3..9cbb2c1 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -79,4 +79,23 @@
* @throws HyracksDataException
*/
void refreshStats(long timeout) throws HyracksDataException;
+
+ /**
+ * resume the activity
+ *
+ * @throws HyracksDataException
+ */
+ void resume() throws HyracksDataException;
+
+ /**
+ * indicates that a dataset is no longer being used by this active entity
+ *
+ * @param dataset
+ */
+ void remove(IDataset dataset);
+
+ /**
+ * @return true, if entity is active, false otherwise
+ */
+ boolean isActive();
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
index 69f7f1c..da7be77 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEventSubscriber.java
@@ -43,14 +43,9 @@
/**
* Wait until the terminal event has been received
*
- * @throws InterruptedException
+ * @throws Exception
*/
- void sync() throws InterruptedException;
-
- /**
- * Stop watching events
- */
- void unsubscribe();
+ void sync() throws Exception;
/**
* callback upon successful subscription
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
similarity index 98%
rename from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
rename to
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
index 40bc7d8..1344f5f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IAdapterFactory.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.api;
+package org.apache.asterix.active;
import java.io.Serializable;
import java.util.Map;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
similarity index 96%
rename from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
rename to
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
index 48df79b..349c7d7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IDataSourceAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.api;
+package org.apache.asterix.active;
import java.io.Serializable;
@@ -29,6 +29,7 @@
* to be implemented by each adapter irrespective of the the kind of
* adapter(pull or push).
*/
+@FunctionalInterface
public interface IDataSourceAdapter extends Serializable {
public enum AdapterType {
@@ -38,6 +39,7 @@
/**
* Triggers the adapter to begin ingesting data from the external source.
+ *
* @param partition
* The adapter could be running with a degree of parallelism.
* partition corresponds to the i'th parallel instance.
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
new file mode 100644
index 0000000..a010984
--- /dev/null
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicy.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicy {
+ /**
+ * @return true if one more attempt should be done
+ */
+ boolean retry();
+}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
new file mode 100644
index 0000000..eae568c
--- /dev/null
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IRetryPolicyFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+@FunctionalInterface
+public interface IRetryPolicyFactory {
+ /**
+ * @return an instance of retry policy
+ */
+ IRetryPolicy create();
+}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
new file mode 100644
index 0000000..af6be01
--- /dev/null
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/NoRetryPolicyFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.active;
+
+public class NoRetryPolicyFactory implements IRetryPolicyFactory {
+ public static final NoRetryPolicyFactory INSTANCE = new
NoRetryPolicyFactory();
+ private static final IRetryPolicy policy = () -> false;
+
+ private NoRetryPolicyFactory() {
+ }
+
+ @Override
+ public IRetryPolicy create() {
+ return policy;
+ }
+}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index 9391044..e0388c2 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -21,7 +21,7 @@
import java.io.Serializable;
import java.util.Objects;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
@@ -64,7 +64,7 @@
@Override
public void handle(ICcApplicationContext appCtx) throws
HyracksDataException, InterruptedException {
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ ActiveNotificationHandler activeListener = (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
activeListener.receive(this);
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
index 19f0dcc..02ea321 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutor.java
@@ -23,6 +23,8 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.statement.Query;
@@ -164,4 +166,14 @@
*/
String getActiveDataverseName(String dataverse);
+ /**
+ * @return the storage component provider
+ */
+ IStorageComponentProvider getComponentProvider();
+
+ /**
+ * @return the application context
+ */
+ ICcApplicationContext getAppCtx();
+
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
index 593faa6..ed8d430 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveStatsApiServlet.java
@@ -18,14 +18,12 @@
*/
package org.apache.asterix.api.http.server;
-import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.external.feed.watch.StatsSubscriber;
@@ -38,19 +36,21 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.netty.handler.codec.http.HttpResponseStatus;
+
public class ActiveStatsApiServlet extends AbstractServlet {
private static final Logger LOGGER =
Logger.getLogger(ActiveStatsApiServlet.class.getName());
private static final int DEFAULT_EXPIRE_TIME = 2000;
- private final ActiveLifecycleListener activeLifecycleListener;
+ private final ActiveNotificationHandler activeLifecycleListener;
public ActiveStatsApiServlet(ConcurrentMap<String, Object> ctx, String[]
paths, ICcApplicationContext appCtx) {
super(ctx, paths);
- this.activeLifecycleListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
+ this.activeLifecycleListener = (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
}
private JsonNode constructNode(ObjectMapper om,
IActiveEntityEventsListener eventListener, long currentTime,
- long ttl) throws InterruptedException, IOException {
+ long ttl) throws Exception {
long statsTimeStamp = eventListener.getStatsTimeStamp();
if (currentTime - statsTimeStamp > ttl) {
StatsSubscriber subscriber = new StatsSubscriber(eventListener);
@@ -66,7 +66,7 @@
// Obtain all feed status
String localPath = localPath(request);
int expireTime;
- IActiveEntityEventsListener[] listeners =
activeLifecycleListener.getNotificationHandler().getEventListeners();
+ IActiveEntityEventsListener[] listeners =
activeLifecycleListener.getEventListeners();
ObjectMapper om = new ObjectMapper();
om.enable(SerializationFeature.INDENT_OUTPUT);
ObjectNode resNode = om.createObjectNode();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
similarity index 65%
rename from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
rename to
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 3216bfe..63ac5ab 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/ActiveEntityEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.feed.management;
+package org.apache.asterix.app.active;
import java.util.ArrayList;
import java.util.Arrays;
@@ -27,21 +27,24 @@
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveEvent.Kind;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -51,41 +54,53 @@
public class ActiveEntityEventsListener implements IActiveEntityEventsListener
{
private static final Logger LOGGER =
Logger.getLogger(ActiveEntityEventsListener.class.getName());
-
- enum RequestState {
- INIT,
- STARTED,
- FINISHED
- }
-
- // members
- protected volatile ActivityState state;
- protected JobId jobId;
+ // finals
+ protected final ActiveNotificationHandler handler;
protected final List<IActiveEventSubscriber> subscribers = new
ArrayList<>();
- protected final ICcApplicationContext appCtx;
+ protected final IStatementExecutor statementExecutor;
+ protected final MetadataProvider metadataProvider;
+ protected final IHyracksClientConnection hcc;
protected final EntityId entityId;
- protected final List<IDataset> datasets;
+ protected final List<? extends IDataset> datasets;
protected final ActiveEvent statsUpdatedEvent;
+ protected final String runtimeName;
+ protected final IRetryPolicyFactory retryPolicyFactory;
+ // mutables
+ protected volatile ActivityState state;
+ protected AlgebricksAbsolutePartitionConstraint locations;
+ protected ActivityState prevState;
+ protected JobId jobId;
protected long statsTimestamp;
protected String stats;
- protected RequestState statsRequestState;
- protected final String runtimeName;
- protected final AlgebricksAbsolutePartitionConstraint locations;
+ protected boolean isFetchingStats;
protected int numRegistered;
- public ActiveEntityEventsListener(ICcApplicationContext appCtx, EntityId
entityId, List<IDataset> datasets,
- AlgebricksAbsolutePartitionConstraint locations, String
runtimeName) {
- this.appCtx = appCtx;
+ public ActiveEntityEventsListener(IStatementExecutor statementExecutor,
IHyracksClientConnection hcc,
+ EntityId entityId, List<? extends IDataset> datasets,
AlgebricksAbsolutePartitionConstraint locations,
+ String runtimeName, IRetryPolicyFactory retryPolicyFactory) throws
HyracksDataException {
+ this.statementExecutor = statementExecutor;
+ this.metadataProvider =
+ new MetadataProvider(statementExecutor.getAppCtx(), null,
statementExecutor.getComponentProvider());
+ this.hcc = hcc;
this.entityId = entityId;
this.datasets = datasets;
+ this.retryPolicyFactory = retryPolicyFactory;
this.state = ActivityState.STOPPED;
this.statsTimestamp = -1;
- this.statsRequestState = RequestState.INIT;
- this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED,
entityId);
+ this.isFetchingStats = false;
+ this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED,
entityId, null);
this.stats = "{\"Stats\":\"N/A\"}";
this.runtimeName = runtimeName;
this.locations = locations;
this.numRegistered = 0;
+ this.handler =
+ ((ActiveNotificationHandler)
metadataProvider.getApplicationContext().getActiveNotificationHandler());
+ handler.registerListener(this);
+ }
+
+ protected void setState(ActivityState newState) {
+ this.prevState = state;
+ this.state = newState;
}
@Override
@@ -95,13 +110,13 @@
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
- state = ActivityState.CREATED;
+ jobCreated(event);
break;
case JOB_STARTED:
start(event);
break;
case JOB_FINISHED:
- finish();
+ finish(event);
break;
case PARTITION_EVENT:
handle((ActivePartitionMessage) event.getEventObject());
@@ -116,26 +131,29 @@
}
}
+ protected synchronized void jobCreated(ActiveEvent event) {
+ setState(ActivityState.STARTING);
+ }
+
protected synchronized void handle(ActivePartitionMessage message) {
if (message.getEvent() ==
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
numRegistered++;
if (numRegistered == locations.getLocations().length) {
- state = ActivityState.STARTED;
+ setState(ActivityState.RUNNING);
}
}
}
- private void finish() throws Exception {
- IHyracksClientConnection hcc = appCtx.getHcc();
- JobStatus status = hcc.getJobStatus(jobId);
- state = status.equals(JobStatus.FAILURE) ? ActivityState.FAILED :
ActivityState.STOPPED;
- ActiveLifecycleListener activeLcListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- activeLcListener.getNotificationHandler().removeListener(this);
+ @SuppressWarnings("unchecked")
+ protected void finish(ActiveEvent event) throws HyracksDataException {
+ jobId = null;
+ Pair<JobStatus, List<Exception>> status = (Pair<JobStatus,
List<Exception>>) event.getEventObject();
+ setState(status.getLeft().equals(JobStatus.FAILURE) ?
ActivityState.FAILED : ActivityState.STOPPED);
}
- private void start(ActiveEvent event) {
+ protected void start(ActiveEvent event) {
this.jobId = event.getJobId();
- state = ActivityState.STARTING;
+ numRegistered = 0;
}
@Override
@@ -161,7 +179,12 @@
@Override
public boolean isEntityUsingDataset(IDataset dataset) {
- return datasets.contains(dataset);
+ return isActive() && datasets.contains(dataset);
+ }
+
+ @Override
+ public void remove(IDataset dataset) {
+ datasets.remove(dataset);
}
public JobId getJobId() {
@@ -193,15 +216,16 @@
public void refreshStats(long timeout) throws HyracksDataException {
LOGGER.log(Level.INFO, "refreshStats called");
synchronized (this) {
- if (state != ActivityState.STARTED || statsRequestState ==
RequestState.STARTED) {
- LOGGER.log(Level.INFO, "returning immediately since state = "
+ state + " and statsRequestState = "
- + statsRequestState);
+ if (state != ActivityState.RUNNING || isFetchingStats) {
+ LOGGER.log(Level.INFO,
+ "returning immediately since state = " + state + " and
fetchingStats = " + isFetchingStats);
return;
} else {
- statsRequestState = RequestState.STARTED;
+ isFetchingStats = true;
}
}
- ICCMessageBroker messageBroker = (ICCMessageBroker)
appCtx.getServiceContext().getMessageBroker();
+ ICCMessageBroker messageBroker =
+ (ICCMessageBroker)
metadataProvider.getApplicationContext().getServiceContext().getMessageBroker();
long reqId = messageBroker.newRequestId();
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
@@ -217,8 +241,7 @@
} catch (Exception e) {
throw HyracksDataException.create(e);
}
- // Same as above
- statsRequestState = RequestState.FINISHED;
+ isFetchingStats = false;
}
protected synchronized void notifySubscribers(ActiveEvent event) {
@@ -245,4 +268,29 @@
return locations;
}
+ public void suspend() throws HyracksDataException {
+ throw new HyracksDataException("Unsupported");
+ }
+
+ @Override
+ public void resume() throws HyracksDataException {
+ throw new HyracksDataException("Unsupported");
+ }
+
+ @Override
+ public boolean isActive() {
+ return state != ActivityState.STOPPED && state != ActivityState.FAILED;
+ }
+
+ public void unregister() throws HyracksDataException {
+ handler.unregisterListener(this);
+ }
+
+ public void start(MetadataProvider metadataProvider) throws
HyracksDataException {
+ throw new HyracksDataException("Unsupported");
+ }
+
+ public void stop(MetadataProvider metadataProvider) throws
HyracksDataException {
+ throw new HyracksDataException("Unsupported");
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
new file mode 100644
index 0000000..9dd1f54
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActiveNotificationHandler;
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.common.metadata.IDataset;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
+import org.apache.asterix.lang.common.statement.StartFeedStatement;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.utils.FeedOperations;
+import org.apache.commons.lang3.tuple.Pair;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobSpecification;
+
+public class FeedEventsListener extends ActiveEntityEventsListener {
+
+ private static final Logger LOGGER =
Logger.getLogger(FeedEventsListener.class.getName());
+ private final Feed feed;
+ private final List<FeedConnection> feedConnections;
+
+ public FeedEventsListener(IStatementExecutor statementExecutor,
IHyracksClientConnection hcc, EntityId entityId,
+ List<? extends IDataset> datasets,
AlgebricksAbsolutePartitionConstraint locations, String runtimeName,
+ IRetryPolicyFactory retryPolicyFactory, Feed feed, final
List<FeedConnection> feedConnections)
+ throws HyracksDataException {
+ super(statementExecutor, hcc, entityId, datasets, locations,
runtimeName, retryPolicyFactory);
+ this.feed = feed;
+ this.feedConnections = feedConnections;
+ }
+
+ @Override
+ public synchronized void remove(IDataset dataset) {
+ super.remove(dataset);
+ feedConnections.removeIf(o ->
o.getDataverseName().equals(dataset.getDataverseName())
+ && o.getDatasetName().equals(dataset.getDatasetName()));
+ }
+
+ @Override
+ public synchronized void start(MetadataProvider metadataProvider) throws
HyracksDataException {
+ if (state != ActivityState.FAILED && state != ActivityState.STOPPED) {
+ throw new HyracksDataException("Feed " + entityId.getEntityName()
+ " is started already.");
+ }
+ try {
+ setState(ActivityState.STARTING);
+ Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint>
jobInfo =
+ FeedOperations.buildStartFeedJob(statementExecutor,
metadataProvider, feed, feedConnections, hcc);
+ JobSpecification feedJob = jobInfo.getLeft();
+ IActiveEventSubscriber eventSubscriber = new
WaitForStateSubscriber(this, ActivityState.RUNNING);
+
feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
entityId);
+ // TODO(Yingyi): currently we do not check IFrameWriter protocol
violations for Feed jobs.
+ // We will need to design general exception handling mechanism for
feeds.
+ locations = jobInfo.getRight();
+ JobUtils.runJob(hcc, feedJob,
+
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
+ eventSubscriber.sync();
+ } catch (Exception e) {
+ setState(ActivityState.FAILED);
+ throw HyracksDataException.create(e);
+ }
+ LOGGER.log(Level.INFO, "Submitted");
+ }
+
+ @Override
+ public synchronized void stop(MetadataProvider metadataProvider) throws
HyracksDataException {
+ try {
+ if (state == ActivityState.STOPPED) {
+ throw new HyracksDataException("Feed " +
entityId.getEntityName() + " is not started.");
+ }
+ if (jobId == null) {
+ setState(ActivityState.STOPPED);
+ return;
+ }
+ IActiveEventSubscriber eventSubscriber = new
WaitForStateSubscriber(this, ActivityState.STOPPED);
+ // Construct ActiveMessage
+ for (int i = 0; i < getLocations().getLocations().length; i++) {
+ String intakeLocation = getLocations().getLocations()[i];
+
FeedOperations.SendStopMessageToNode(metadataProvider.getApplicationContext(),
entityId, intakeLocation,
+ i);
+ }
+ eventSubscriber.sync();
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public Feed getFeed() {
+ return feed;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
index 0e72976..bbfe680 100755
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryUtils.java
@@ -33,11 +33,11 @@
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.external.library.ExternalLibrary;
import org.apache.asterix.external.library.LibraryAdapter;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 2967a38..0f3b005 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -41,17 +41,18 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
-import org.apache.asterix.active.IActiveEventSubscriber;
+import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.api.common.APIFramework;
import org.apache.asterix.api.http.server.AbstractQueryApiServlet;
import org.apache.asterix.api.http.server.ApiServlet;
import org.apache.asterix.api.http.server.ResultUtil;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.app.active.FeedEventsListener;
import org.apache.asterix.app.result.ResultHandle;
import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.common.config.ClusterProperties;
@@ -71,16 +72,12 @@
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.lang.common.base.IReturningStatement;
import org.apache.asterix.lang.common.base.IRewriterFactory;
@@ -178,7 +175,6 @@
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Triple;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
@@ -210,7 +206,7 @@
public static final boolean IS_DEBUG_MODE = false;// true
protected final List<Statement> statements;
- protected final ICcApplicationContext appCtx;
+ private final ICcApplicationContext appCtx;
protected final SessionOutput sessionOutput;
protected final SessionConfig sessionConfig;
protected Dataverse activeDataverse;
@@ -693,8 +689,8 @@
protected static void
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
throws CompilationException {
StringBuilder builder = null;
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
IActiveEntityEventsListener[] listeners =
activeEventHandler.getEventListeners();
for (IActiveEntityEventsListener listener : listeners) {
if (listener.isEntityUsingDataset(dataset)) {
@@ -1169,26 +1165,31 @@
throw new AlgebricksException("There is no dataverse with
this name " + dataverseName + ".");
}
}
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ bActiveTxn = false;
// # disconnect all feeds from any datasets in the dataverse.
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
IActiveEntityEventsListener[] activeListeners =
activeEventHandler.getEventListeners();
- Identifier dvId = new Identifier(dataverseName);
- MetadataProvider tempMdProvider = new MetadataProvider(appCtx,
metadataProvider.getDefaultDataverse(),
- metadataProvider.getStorageComponentProvider());
- tempMdProvider.setConfig(metadataProvider.getConfig());
for (IActiveEntityEventsListener listener : activeListeners) {
EntityId activeEntityId = listener.getEntityId();
if
(activeEntityId.getExtensionName().equals(Feed.EXTENSION_NAME)
&&
activeEntityId.getDataverse().equals(dataverseName)) {
- tempMdProvider.getLocks().reset();
- stopFeedBeforeDelete(new Pair<>(dvId, new
Identifier(activeEntityId.getEntityName())),
- tempMdProvider);
- // prepare job to remove feed log storage
-
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
- MetadataManager.INSTANCE.getFeed(mdTxnCtx,
dataverseName, activeEntityId.getEntityName())));
+ if (listener.getState() != ActivityState.STOPPED) {
+ ((ActiveEntityEventsListener)
listener).stop(metadataProvider);
+ }
+ FeedEventsListener feedListener = (FeedEventsListener)
listener;
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ doDropFeed(hcc, metadataProvider, feedListener.getFeed());
+
MetadataManager.INSTANCE.commitTransaction(metadataProvider.getMetadataTxnContext());
+ bActiveTxn = false;
}
}
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ bActiveTxn = true;
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
// #. prepare jobs which will drop corresponding datasets with
indexes.
List<Dataset> datasets =
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverseName);
@@ -1295,20 +1296,6 @@
}
}
- protected void stopFeedBeforeDelete(Pair<Identifier, Identifier>
feedNameComp, MetadataProvider metadataProvider) {
- StopFeedStatement disStmt = new StopFeedStatement(feedNameComp);
- try {
- handleStopFeedStatement(metadataProvider, disStmt);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Stopped feed " + feedNameComp.second.getValue());
- }
- } catch (Exception exception) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.warning("Unable to stop feed " +
feedNameComp.second.getValue() + exception);
- }
- }
- }
-
public void handleDatasetDropStatement(MetadataProvider metadataProvider,
Statement stmt,
IHyracksClientConnection hcc) throws Exception {
DropDatasetStatement stmtDelete = (DropDatasetStatement) stmt;
@@ -1403,8 +1390,8 @@
throw new AlgebricksException(
"There is no dataset with this name " + datasetName +
" in dataverse " + dataverseName);
}
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
IActiveEntityEventsListener[] listeners =
activeEventHandler.getEventListeners();
StringBuilder builder = null;
for (IActiveEntityEventsListener listener : listeners) {
@@ -1968,32 +1955,35 @@
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
return;
}
-
- EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName,
feedName);
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- ActiveEntityEventsListener listener =
- (ActiveEntityEventsListener)
activeEventHandler.getActiveEntityListener(feedId);
- if (listener != null) {
- throw new AlgebricksException("Feed " + feedId
- + " is currently active and connected to the following
dataset(s) \n" + listener.toString());
- } else {
- JobSpecification spec =
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
- MetadataManager.INSTANCE.getFeed(mdTxnCtx,
feedId.getDataverse(), feedId.getEntityName()));
- runJob(hcc, spec);
- MetadataManager.INSTANCE.dropFeed(mdTxnCtx, dataverseName,
feedName);
- }
-
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Removed feed " + feedId);
- }
+ doDropFeed(hcc, metadataProvider, feed);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
-
} catch (Exception e) {
abort(e, e, mdTxnCtx);
throw e;
} finally {
metadataProvider.getLocks().unlock();
+ }
+ }
+
+ protected void doDropFeed(IHyracksClientConnection hcc, MetadataProvider
metadataProvider, Feed feed)
+ throws Exception {
+ MetadataTransactionContext mdTxnCtx =
metadataProvider.getMetadataTxnContext();
+ EntityId feedId = feed.getFeedId();
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
+ ActiveEntityEventsListener listener = (ActiveEntityEventsListener)
activeEventHandler.getListener(feedId);
+ if (listener != null && listener.getState() != ActivityState.STOPPED) {
+ throw new AlgebricksException("Feed " + feedId
+ + " is currently active and connected to the following
dataset(s) \n" + listener.toString());
+ } else if (listener != null) {
+ listener.unregister();
+ }
+ JobSpecification spec =
FeedOperations.buildRemoveFeedStorageJob(metadataProvider,
+ MetadataManager.INSTANCE.getFeed(mdTxnCtx,
feedId.getDataverse(), feedId.getEntityName()));
+ runJob(hcc, spec);
+ MetadataManager.INSTANCE.dropFeed(mdTxnCtx, feed.getDataverseName(),
feed.getFeedName());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Removed feed " + feedId);
}
}
@@ -2029,56 +2019,41 @@
StartFeedStatement sfs = (StartFeedStatement) stmt;
String dataverseName = getActiveDataverse(sfs.getDataverseName());
String feedName = sfs.getFeedName().getValue();
- // Transcation handler
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
- // Runtime handler
- EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName,
feedName);
- // Feed & Feed Connections
- Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName,
feedName,
- metadataProvider.getMetadataTxnContext());
- List<FeedConnection> feedConnections = MetadataManager.INSTANCE
- .getFeedConections(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName);
- ILangCompilationProvider compilationProvider = new
AqlCompilationProvider();
- IStorageComponentProvider storageComponentProvider = new
StorageComponentProvider();
- DefaultStatementExecutorFactory qtFactory = new
DefaultStatementExecutorFactory();
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- ActiveEntityEventsListener listener = (ActiveEntityEventsListener)
activeEventHandler
- .getActiveEntityListener(entityId);
- if (listener != null) {
- throw new AlgebricksException("Feed " + feedName + " is started
already.");
- }
- // Start
+ boolean committed = false;
MetadataLockManager.INSTANCE.startFeedBegin(metadataProvider.getLocks(),
dataverseName,
- dataverseName + "." + feedName, feedConnections);
+ dataverseName + "." + feedName);
try {
- // Prepare policy
- List<IDataset> datasets = new ArrayList<>();
- for (FeedConnection connection : feedConnections) {
- Dataset ds =
metadataProvider.findDataset(connection.getDataverseName(),
connection.getDatasetName());
- datasets.add(ds);
+ metadataProvider.setMetadataTxnContext(mdTxnCtx);
+ // Runtime handler
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName);
+ // Feed & Feed Connections
+ Feed feed = FeedMetadataUtil.validateIfFeedExists(dataverseName,
feedName,
+ metadataProvider.getMetadataTxnContext());
+ List<FeedConnection> feedConnections = MetadataManager.INSTANCE
+
.getFeedConections(metadataProvider.getMetadataTxnContext(), dataverseName,
feedName);
+
MetadataLockManager.INSTANCE.lockFeedConnections(metadataProvider.getLocks(),
feedConnections);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
+ ActiveEntityEventsListener listener = (ActiveEntityEventsListener)
activeEventHandler.getListener(entityId);
+ if (listener == null) {
+ // Prepare policy
+ List<IDataset> datasets = new ArrayList<>();
+ for (FeedConnection connection : feedConnections) {
+ Dataset ds =
+
metadataProvider.findDataset(connection.getDataverseName(),
connection.getDatasetName());
+ datasets.add(ds);
+ }
+ listener = new FeedEventsListener(this, hcc, entityId,
datasets, null,
+ FeedIntakeOperatorNodePushable.class.getSimpleName(),
NoRetryPolicyFactory.INSTANCE, feed,
+ feedConnections);
}
- org.apache.commons.lang3.tuple.Pair<JobSpecification,
AlgebricksAbsolutePartitionConstraint> jobInfo =
- FeedOperations.buildStartFeedJob(sessionOutput,
metadataProvider, feed, feedConnections,
- compilationProvider, storageComponentProvider,
qtFactory, hcc);
-
- JobSpecification feedJob = jobInfo.getLeft();
- listener = new ActiveEntityEventsListener(appCtx, entityId,
datasets, jobInfo.getRight(),
- FeedIntakeOperatorNodePushable.class.getSimpleName());
- activeEventHandler.registerListener(listener);
- IActiveEventSubscriber eventSubscriber = new
WaitForStateSubscriber(listener, ActivityState.STARTED);
-
feedJob.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
entityId);
- // TODO(Yingyi): currently we do not check IFrameWriter protocol
violations for Feed jobs.
- // We will need to design general exception handling mechanism for
feeds.
- JobUtils.runJob(hcc, feedJob,
-
Boolean.valueOf(metadataProvider.getConfig().get(StartFeedStatement.WAIT_FOR_COMPLETION)));
- eventSubscriber.sync();
- LOGGER.log(Level.INFO, "Submitted");
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ committed = true;
+ listener.start(metadataProvider);
} catch (Exception e) {
- abort(e, e, mdTxnCtx);
- if (listener != null) {
- activeEventHandler.unregisterListener(listener);
+ if (!committed) {
+ abort(e, e, mdTxnCtx);
}
throw e;
} finally {
@@ -2090,32 +2065,18 @@
StopFeedStatement sfst = (StopFeedStatement) stmt;
String dataverseName = getActiveDataverse(sfst.getDataverseName());
String feedName = sfst.getFeedName().getValue();
- EntityId feedId = new EntityId(Feed.EXTENSION_NAME, dataverseName,
feedName);
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
+ EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName,
feedName);
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
// Obtain runtime info from ActiveListener
- ActiveEntityEventsListener listener =
- (ActiveEntityEventsListener)
activeEventHandler.getActiveEntityListener(feedId);
+ ActiveEntityEventsListener listener = (ActiveEntityEventsListener)
activeEventHandler.getListener(entityId);
if (listener == null) {
throw new AlgebricksException("Feed " + feedName + " is not
started.");
}
- IActiveEventSubscriber eventSubscriber = new
WaitForStateSubscriber(listener, ActivityState.STOPPED);
- // Transaction
- MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
- metadataProvider.setMetadataTxnContext(mdTxnCtx);
-
MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(),
dataverseName, feedName);
+
MetadataLockManager.INSTANCE.stopFeedBegin(metadataProvider.getLocks(),
entityId.getDataverse(),
+ entityId.getEntityName());
try {
- // validate
- FeedMetadataUtil.validateIfFeedExists(dataverseName, feedName,
mdTxnCtx);
- // Construct ActiveMessage
- for (int i = 0; i < listener.getLocations().getLocations().length;
i++) {
- String intakeLocation =
listener.getLocations().getLocations()[i];
- FeedOperations.SendStopMessageToNode(appCtx, feedId,
intakeLocation, i);
- }
- eventSubscriber.sync();
- } catch (Exception e) {
- abort(e, e, mdTxnCtx);
- throw e;
+ listener.stop(metadataProvider);
} finally {
metadataProvider.getLocks().unlock();
}
@@ -2131,10 +2092,9 @@
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
// Check whether feed is alive
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- if (activeEventHandler
- .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName)) != null) {
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
+ if (activeEventHandler.getListener(new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName)) != null) {
throw new
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED,
feedName);
}
// Transaction handling
@@ -2185,21 +2145,26 @@
String feedName = cfs.getFeedName().getValue();
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
- ActiveLifecycleListener activeListener = (ActiveLifecycleListener)
appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeEventHandler =
activeListener.getNotificationHandler();
- // Check whether feed is alive
- if (activeEventHandler
- .getActiveEntityListener(new EntityId(Feed.EXTENSION_NAME,
dataverseName, feedName)) != null) {
- throw new
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED,
feedName);
- }
MetadataLockManager.INSTANCE.disconnectFeedBegin(metadataProvider.getLocks(),
dataverseName,
dataverseName + "." + datasetName, dataverseName + "." +
cfs.getFeedName());
try {
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
+ // Check whether feed is alive
+ IActiveEntityEventsListener listener =
+ activeEventHandler.getListener(new
EntityId(Feed.EXTENSION_NAME, dataverseName, feedName));
+ if (listener != null && listener.isActive()) {
+ throw new
CompilationException(ErrorCode.FEED_CHANGE_FEED_CONNECTIVITY_ON_ALIVE_FEED,
feedName);
+ }
FeedMetadataUtil.validateIfDatasetExists(metadataProvider,
dataverseName, cfs.getDatasetName().getValue(),
mdTxnCtx);
FeedMetadataUtil.validateIfFeedExists(dataverseName,
cfs.getFeedName().getValue(), mdTxnCtx);
FeedConnection fc =
MetadataManager.INSTANCE.getFeedConnection(metadataProvider.getMetadataTxnContext(),
dataverseName, feedName, datasetName);
+ Dataset ds = metadataProvider.findDataset(dataverseName,
datasetName);
+ if (ds == null) {
+ throw new CompilationException("Dataset " + dataverseName +
"." + datasetName + " doesn't exist");
+ }
if (fc == null) {
throw new CompilationException("Feed " + feedName + " is
currently not connected to "
+ cfs.getDatasetName().getValue() + ". Invalid
operation!");
@@ -2211,6 +2176,9 @@
MetadataManager.INSTANCE.updateFunction(mdTxnCtx, function);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ if (listener != null) {
+ listener.remove(ds);
+ }
} catch (Exception e) {
abort(e, e, mdTxnCtx);
throw e;
@@ -2952,4 +2920,14 @@
IStatementRewriter rewriter =
rewriterFactory.createStatementRewriter();
rewriter.rewrite(stmt);
}
+
+ @Override
+ public IStorageComponentProvider getComponentProvider() {
+ return componentProvider;
+ }
+
+ @Override
+ public ICcApplicationContext getAppCtx() {
+ return appCtx;
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index cd9138a..c1def23 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -30,7 +30,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.api.http.ctx.StatementExecutorContext;
import org.apache.asterix.api.http.server.ApiServlet;
import org.apache.asterix.api.http.server.ClusterApiServlet;
@@ -134,7 +134,7 @@
statementExecutorCtx = new StatementExecutorContext();
appCtx = new CcApplicationContext(ccServiceCtx, getHcc(),
libraryManager, resourceIdManager,
() -> MetadataManager.INSTANCE,
GlobalRecoveryManager.instance(), ftStrategy,
- new ActiveLifecycleListener(), componentProvider);
+ new ActiveNotificationHandler(), componentProvider);
ClusterStateManager.INSTANCE.setCcAppCtx(appCtx);
ccExtensionManager = new CCExtensionManager(getExtensions());
appCtx.setExtensionManager(ccExtensionManager);
@@ -147,7 +147,7 @@
setAsterixStateProxy(AsterixStateProxy.registerRemoteObject(metadataProperties.getMetadataCallbackPort()));
ccServiceCtx.setDistributedState(proxy);
MetadataManager.initialize(proxy, metadataProperties);
-
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveLifecycleListener());
+
ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler());
// create event loop groups
webManager = new WebManager();
@@ -178,7 +178,7 @@
@Override
public void stop() throws Exception {
- ((ActiveLifecycleListener) appCtx.getActiveLifecycleListener()).stop();
+ ((ActiveNotificationHandler)
appCtx.getActiveNotificationHandler()).stop();
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Stopping Asterix cluster controller");
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..df4825a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -30,9 +30,9 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
import org.apache.asterix.active.message.ActiveManagerMessage;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
@@ -40,8 +40,6 @@
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.feed.management.FeedConnectionRequest;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -50,6 +48,9 @@
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
+import org.apache.asterix.external.provider.AdapterFactoryProvider;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.lang.aql.statement.SubscribeFeedStatement;
@@ -57,19 +58,23 @@
import org.apache.asterix.lang.common.statement.DataverseDecl;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.feeds.LocationConstraint;
+import org.apache.asterix.metadata.utils.MetadataConstants;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import
org.apache.asterix.runtime.job.listener.MultiTransactionJobletEventListenerFactory;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
-import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -109,6 +114,55 @@
public class FeedOperations {
private FeedOperations() {
+ }
+
+ public static AlgebricksAbsolutePartitionConstraint getFeedLocations(Feed
feed, ICcApplicationContext appCtx,
+ MetadataTransactionContext mdTxnCtx) throws Exception {
+ return getFeedFactory(feed, appCtx, mdTxnCtx).getPartitionConstraint();
+ }
+
+ public static IAdapterFactory getFeedFactory(Feed feed,
ICcApplicationContext appCtx,
+ MetadataTransactionContext mdTxnCtx) throws Exception {
+ String adapterName = feed.getAdapterName();
+ Map<String, String> configuration = feed.getAdapterConfiguration();
+ ARecordType adapterOutputType = FeedMetadataUtil.getOutputType(feed,
feed.getAdapterConfiguration(),
+ ExternalDataConstants.KEY_TYPE_NAME);
+ ARecordType metaType =
+ FeedMetadataUtil.getOutputType(feed, configuration,
ExternalDataConstants.KEY_META_TYPE_NAME);
+ ExternalDataUtils.prepareFeed(configuration, feed.getDataverseName(),
feed.getFeedName());
+ // Get adapter from metadata dataset <Metadata dataverse>
+ DatasourceAdapter adapterEntity =
+ MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
+ // Get adapter from metadata dataset <The feed dataverse>
+ if (adapterEntity == null) {
+ adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
feed.getDataverseName(), adapterName);
+ }
+ IAdapterFactory adapterFactory;
+ if (adapterEntity != null) {
+ AdapterType adapterType = adapterEntity.getType();
+ String adapterFactoryClassname = adapterEntity.getClassname();
+ switch (adapterType) {
+ case INTERNAL:
+ adapterFactory = (IAdapterFactory)
Class.forName(adapterFactoryClassname).newInstance();
+ break;
+ case EXTERNAL:
+ String[] anameComponents = adapterName.split("#");
+ String libraryName = anameComponents[0];
+ ClassLoader cl =
+
appCtx.getLibraryManager().getLibraryClassLoader(feed.getDataverseName(),
libraryName);
+ adapterFactory = (IAdapterFactory)
cl.loadClass(adapterFactoryClassname).newInstance();
+ break;
+ default:
+ throw new AsterixException("Unknown Adapter type " +
adapterType);
+ }
+ adapterFactory.setOutputType(adapterOutputType);
+ adapterFactory.setMetaType(metaType);
+ adapterFactory.configure(appCtx.getServiceContext(),
configuration);
+ } else {
+ adapterFactory =
AdapterFactoryProvider.getAdapterFactory(appCtx.getServiceContext(),
adapterName,
+ configuration, adapterOutputType, metaType);
+ }
+ return adapterFactory;
}
private static Pair<JobSpecification, IAdapterFactory>
buildFeedIntakeJobSpec(Feed feed,
@@ -151,9 +205,8 @@
return spec;
}
- private static JobSpecification getConnectionJob(SessionOutput
sessionOutput, MetadataProvider metadataProvider,
- FeedConnection feedConnection, String[] locations,
ILangCompilationProvider compilationProvider,
- IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
+ private static JobSpecification getConnectionJob(IStatementExecutor
statementExecutor,
+ MetadataProvider metadataProvider, FeedConnection feedConnection,
String[] locations,
IHyracksClientConnection hcc) throws AlgebricksException,
RemoteException, ACIDException {
DataverseDecl dataverseDecl = new DataverseDecl(new
Identifier(feedConnection.getDataverseName()));
FeedConnectionRequest fcr =
@@ -164,8 +217,6 @@
List<Statement> statements = new ArrayList<>();
statements.add(dataverseDecl);
statements.add(subscribeStmt);
- IStatementExecutor translator =
qtFactory.create(metadataProvider.getApplicationContext(), statements,
- sessionOutput, compilationProvider, storageComponentProvider);
// configure the metadata provider
metadataProvider.getConfig().put(FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, "" +
Boolean.TRUE);
metadataProvider.getConfig().put(FeedActivityDetails.FEED_POLICY_NAME,
"" + subscribeStmt.getPolicy());
@@ -174,7 +225,7 @@
CompiledStatements.CompiledSubscribeFeedStatement csfs = new
CompiledStatements.CompiledSubscribeFeedStatement(
subscribeStmt.getSubscriptionRequest(),
subscribeStmt.getVarCounter());
- return translator.rewriteCompileQuery(hcc, metadataProvider,
subscribeStmt.getQuery(), csfs);
+ return statementExecutor.rewriteCompileQuery(hcc, metadataProvider,
subscribeStmt.getQuery(), csfs);
}
private static JobSpecification combineIntakeCollectJobs(MetadataProvider
metadataProvider, Feed feed,
@@ -272,8 +323,8 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>,
- Pair<IOperatorDescriptor, Integer>>> entry :
subJob.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+ .getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
@@ -356,10 +407,8 @@
}
public static Pair<JobSpecification,
AlgebricksAbsolutePartitionConstraint> buildStartFeedJob(
- SessionOutput sessionOutput, MetadataProvider metadataProvider,
Feed feed,
- List<FeedConnection> feedConnections, ILangCompilationProvider
compilationProvider,
- IStorageComponentProvider storageComponentProvider,
DefaultStatementExecutorFactory qtFactory,
- IHyracksClientConnection hcc) throws Exception {
+ IStatementExecutor statementExecutor, MetadataProvider
metadataProvider, Feed feed,
+ List<FeedConnection> feedConnections, IHyracksClientConnection
hcc) throws Exception {
FeedPolicyAccessor fpa = new FeedPolicyAccessor(new HashMap<>());
// TODO: Change the default Datasource to use all possible partitions
Pair<JobSpecification, IAdapterFactory> intakeInfo =
buildFeedIntakeJobSpec(feed, metadataProvider, fpa);
@@ -371,8 +420,8 @@
String[] ingestionLocations =
ingestionAdaptorFactory.getPartitionConstraint().getLocations();
// Add connection job
for (FeedConnection feedConnection : feedConnections) {
- JobSpecification connectionJob = getConnectionJob(sessionOutput,
metadataProvider, feedConnection,
- ingestionLocations, compilationProvider,
storageComponentProvider, qtFactory, hcc);
+ JobSpecification connectionJob =
+ getConnectionJob(statementExecutor, metadataProvider,
feedConnection, ingestionLocations, hcc);
jobsList.add(connectionJob);
}
return Pair.of(combineIntakeCollectJobs(metadataProvider, feed,
intakeJob, jobsList, feedConnections,
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index 956d111..a9e269a 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -21,23 +21,28 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveRuntime;
+import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.algebra.base.ILangExtension.Language;
+import org.apache.asterix.app.active.ActiveEntityEventsListener;
+import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.IDataset;
-import org.apache.asterix.external.feed.management.ActiveEntityEventsListener;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionOutput;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
@@ -71,8 +76,8 @@
String requestedStats;
CcApplicationContext appCtx =
(CcApplicationContext)
ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
- ActiveLifecycleListener activeLifecycleListener =
(ActiveLifecycleListener) appCtx.getActiveLifecycleListener();
- ActiveJobNotificationHandler activeJobNotificationHandler =
activeLifecycleListener.getNotificationHandler();
+ ActiveNotificationHandler activeJobNotificationHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
JobId jobId = new JobId(1);
// Mock ActiveRuntime
@@ -82,13 +87,18 @@
// Mock JobSpecification
JobSpecification jobSpec = Mockito.mock(JobSpecification.class);
-
Mockito.when(jobSpec.getProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME))
- .thenReturn(entityId);
+
Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId);
+ // Mock MetadataProvider
+ CCExtensionManager extensionManager = (CCExtensionManager)
appCtx.getExtensionManager();
+ IStatementExecutor statementExecutor = extensionManager
+
.getStatementExecutorFactory(appCtx.getServiceContext().getControllerService().getExecutor())
+ .create(appCtx, Collections.emptyList(),
Mockito.mock(SessionOutput.class),
+
extensionManager.getCompilationProvider(Language.SQLPP),
appCtx.getStorageComponentProvider());
// Add event listener
- ActiveEntityEventsListener eventsListener = new
ActiveEntityEventsListener(appCtx, entityId, datasetList,
- partitionConstraint,
FeedIntakeOperatorNodePushable.class.getSimpleName());
- activeJobNotificationHandler.registerListener(eventsListener);
+ ActiveEntityEventsListener eventsListener =
+ new ActiveEntityEventsListener(statementExecutor, null,
entityId, datasetList, partitionConstraint,
+ FeedIntakeOperatorNodePushable.class.getSimpleName(),
NoRetryPolicyFactory.INSTANCE);
// Register mock runtime
NCAppRuntimeContext nc1AppCtx =
@@ -107,14 +117,14 @@
eventsListener.subscribe(startingSubscriber);
// Update stats of created/started job without joined partition
activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec);
- activeLifecycleListener.notifyJobStart(jobId);
+ activeJobNotificationHandler.notifyJobStart(jobId);
startingSubscriber.sync();
eventsListener.refreshStats(1000);
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.contains("N/A"));
// Fake partition message and notify eventListener
- WaitForStateSubscriber startedSubscriber = new
WaitForStateSubscriber(eventsListener, ActivityState.STARTED);
+ WaitForStateSubscriber startedSubscriber = new
WaitForStateSubscriber(eventsListener, ActivityState.RUNNING);
eventsListener.subscribe(startedSubscriber);
ActivePartitionMessage partitionMessage = new
ActivePartitionMessage(activeRuntimeId, jobId,
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
index 249bd56..8e84782 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/ICcApplicationContext.java
@@ -63,9 +63,9 @@
IFaultToleranceStrategy getFaultToleranceStrategy();
/**
- * @return the active lifecycle listener at Cluster controller
+ * @return the active notification handler at Cluster controller
*/
- IJobLifecycleListener getActiveLifecycleListener();
+ IJobLifecycleListener getActiveNotificationHandler();
/**
* @return a new instance of {@link IHyracksClientConnection}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
index a0e6e71..48507a7 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IDataset.java
@@ -32,4 +32,13 @@
*/
int[] getPrimaryBloomFilterFields();
+ /**
+ * @return the dataverse name
+ */
+ String getDataverseName();
+
+ /**
+ * @return the dataset name
+ */
+ String getDatasetName();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 2eb81d4..f29a5e5 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -23,13 +23,13 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IIndexibleExternalDataSource;
import org.apache.asterix.external.api.IIndexingAdapterFactory;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
index 37cc1cf..d142cba 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIndexingAdapterFactory.java
@@ -20,6 +20,7 @@
import java.util.List;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
public interface IIndexingAdapterFactory extends IAdapterFactory {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
index e6d81d3..8904a18 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/FeedAdapter.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.external.dataset.adapter;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
index 0681d71..f3c3726 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataset/adapter/GenericAdapter.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.dataset.adapter;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.external.api.IDataFlowController;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
index 822d725..d351c51 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/AbstractSubscriber.java
@@ -20,37 +20,41 @@
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveEventSubscriber;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class AbstractSubscriber implements IActiveEventSubscriber {
protected final IActiveEntityEventsListener listener;
- private boolean done = false;
+ private volatile boolean done = false;
+ private volatile Exception failure = null;
public AbstractSubscriber(IActiveEntityEventsListener listener) {
this.listener = listener;
}
@Override
- public synchronized boolean isDone() {
+ public boolean isDone() {
return done;
}
- public synchronized void complete() throws HyracksDataException {
- done = true;
- notifyAll();
- }
-
- @Override
- public synchronized void sync() throws InterruptedException {
- while (!done) {
- wait();
+ public void complete(Exception failure) {
+ synchronized (listener) {
+ done = true;
+ if (failure != null) {
+ this.failure = failure;
+ }
+ listener.notifyAll();
}
}
@Override
- public synchronized void unsubscribe() {
- done = true;
- notifyAll();
+ public void sync() throws Exception {
+ synchronized (listener) {
+ while (!done) {
+ if (failure != null) {
+ throw failure;
+ }
+ listener.wait();
+ }
+ }
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
index 42f7a74..90cf45f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/NoOpSubscriber.java
@@ -49,11 +49,6 @@
}
@Override
- public void unsubscribe() {
- // no op
- }
-
- @Override
public void subscribed(IActiveEntityEventsListener eventsListener) throws
HyracksDataException {
// no op
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
index fa2fa7f..804fbd8 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/StatsSubscriber.java
@@ -32,7 +32,17 @@
@Override
public void notify(ActiveEvent event) throws HyracksDataException {
if (event.getEventKind() == ActiveEvent.Kind.STATS_UPDATED) {
- complete();
+ try {
+ complete(null);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) {
+ try {
+ complete((Exception) event.getEventObject());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
index 7bab421..6ae62ae 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/watch/WaitForStateSubscriber.java
@@ -39,7 +39,13 @@
@Override
public void notify(ActiveEvent event) throws HyracksDataException {
if (listener.getState() == targetState) {
- complete();
+ complete(null);
+ } else if (event.getEventKind() == ActiveEvent.Kind.FALURE) {
+ try {
+ complete((Exception) event.getEventObject());
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
}
}
@@ -49,7 +55,7 @@
throw new
RuntimeDataException(ErrorCode.CANNOT_SUBSCRIBE_TO_FAILED_ACTIVE_ENTITY);
}
if (listener.getState() == targetState) {
- complete();
+ complete(null);
}
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
index 93acb26..7ff429d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalScanOperatorDescriptor.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.external.operators;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
index 3a06a2b..f38a005 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorDescriptor.java
@@ -22,11 +22,11 @@
import java.util.logging.Logger;
import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.om.types.ARecordType;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 8c6a420..4ece727 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -21,9 +21,9 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
import org.apache.asterix.active.EntityId;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
index d6ac5d1..aecb790 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/AdapterFactoryProvider.java
@@ -21,9 +21,9 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IIndexingAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.util.ExternalDataCompatibilityUtils;
diff --git
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 1c28940..72efc95 100644
---
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -22,10 +22,10 @@
import java.io.InputStream;
import java.util.Map;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.ITupleForwarder;
import org.apache.asterix.external.parser.ADMDataParser;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index c84a5bd..08ca03b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -57,8 +57,7 @@
// Key is dataverse name. Key of value map is dataset name.
protected final Map<String, Map<String, Dataset>> datasets = new
HashMap<>();
// Key is dataverse name. Key of value map is dataset name. Key of value
map of value map is index name.
- protected final Map<String, Map<String, Map<String, Index>>> indexes =
- new HashMap<>();
+ protected final Map<String, Map<String, Map<String, Index>>> indexes = new
HashMap<>();
// Key is dataverse name. Key of value map is datatype name.
protected final Map<String, Map<String, Datatype>> datatypes = new
HashMap<>();
// Key is dataverse name.
@@ -66,19 +65,16 @@
// Key is function Identifier . Key of value map is function name.
protected final Map<FunctionSignature, Function> functions = new
HashMap<>();
// Key is adapter dataverse. Key of value map is the adapter name
- protected final Map<String, Map<String, DatasourceAdapter>> adapters =
- new HashMap<>();
+ protected final Map<String, Map<String, DatasourceAdapter>> adapters = new
HashMap<>();
// Key is DataverseName, Key of the value map is the Policy name
- protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies =
- new HashMap<>();
+ protected final Map<String, Map<String, FeedPolicyEntity>> feedPolicies =
new HashMap<>();
// Key is library dataverse. Key of value map is the library name
protected final Map<String, Map<String, Library>> libraries = new
HashMap<>();
// Key is library dataverse. Key of value map is the feed name
protected final Map<String, Map<String, Feed>> feeds = new HashMap<>();
// Key is DataverseName, Key of the value map is the Policy name
- protected final Map<String, Map<String, CompactionPolicy>>
compactionPolicies =
- new HashMap<>();
+ protected final Map<String, Map<String, CompactionPolicy>>
compactionPolicies = new HashMap<>();
// Key is DataverseName, Key of value map is feedConnectionId
protected final Map<String, Map<String, FeedConnection>> feedConnections =
new HashMap<>();
@@ -247,8 +243,7 @@
datatypes.remove(dataverse.getDataverseName());
adapters.remove(dataverse.getDataverseName());
compactionPolicies.remove(dataverse.getDataverseName());
- List<FunctionSignature>
markedFunctionsForRemoval =
- new ArrayList<>();
+ List<FunctionSignature>
markedFunctionsForRemoval = new ArrayList<>();
for (FunctionSignature signature :
functions.keySet()) {
if
(signature.getNamespace().equals(dataverse.getDataverseName())) {
markedFunctionsForRemoval.add(signature);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 93b19f1..b35fff7 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -26,6 +26,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ClusterProperties;
@@ -39,8 +41,6 @@
import
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.metadata.IDatasetDetails;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index e2b1761..e626892 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,8 +20,8 @@
import java.util.List;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.entities.Dataset;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index b13f4c2..18799a5 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -23,8 +23,8 @@
import java.util.List;
import java.util.Map;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.om.types.ARecordType;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 3b70ea9..9960645 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -28,6 +28,8 @@
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
@@ -38,8 +40,6 @@
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
@@ -397,23 +397,23 @@
}
public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint,
IAdapterFactory> buildFeedIntakeRuntime(
- JobSpecification jobSpec, Feed primaryFeed, FeedPolicyAccessor
policyAccessor) throws Exception {
+ JobSpecification jobSpec, Feed feed, FeedPolicyAccessor
policyAccessor) throws Exception {
Triple<IAdapterFactory, RecordDescriptor,
IDataSourceAdapter.AdapterType> factoryOutput;
- factoryOutput =
FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(primaryFeed, policyAccessor,
mdTxnCtx,
+ factoryOutput = FeedMetadataUtil.getPrimaryFeedFactoryAndOutput(feed,
policyAccessor, mdTxnCtx,
getApplicationContext());
- ARecordType recordType = FeedMetadataUtil.getOutputType(primaryFeed,
primaryFeed.getAdapterConfiguration(),
+ ARecordType recordType = FeedMetadataUtil.getOutputType(feed,
feed.getAdapterConfiguration(),
ExternalDataConstants.KEY_TYPE_NAME);
IAdapterFactory adapterFactory = factoryOutput.first;
FeedIntakeOperatorDescriptor feedIngestor = null;
switch (factoryOutput.third) {
case INTERNAL:
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec,
primaryFeed, adapterFactory, recordType,
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed,
adapterFactory, recordType,
policyAccessor, factoryOutput.second);
break;
case EXTERNAL:
- String libraryName = primaryFeed.getAdapterName().trim()
+ String libraryName = feed.getAdapterName().trim()
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
- feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec,
primaryFeed, libraryName,
+ feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed,
libraryName,
adapterFactory.getClass().getName(), recordType,
policyAccessor, factoryOutput.second);
break;
default:
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 9131692..2383eb6 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -27,7 +27,7 @@
import java.util.logging.Logger;
import java.util.stream.IntStream;
-import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.ActiveNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
@@ -179,8 +179,8 @@
public Dataset(Dataset dataset, boolean forRebalance, String
targetNodeGroupName) {
this(dataset.dataverseName, dataset.datasetName,
dataset.recordTypeDataverseName, dataset.recordTypeName,
dataset.metaTypeDataverseName, dataset.metaTypeName,
targetNodeGroupName,
- dataset.compactionPolicyFactory,
- dataset.compactionPolicyProperties, dataset.datasetDetails,
dataset.hints, dataset.datasetType,
+ dataset.compactionPolicyFactory,
dataset.compactionPolicyProperties, dataset.datasetDetails,
+ dataset.hints, dataset.datasetType,
forRebalance ?
DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) :
dataset.datasetId,
dataset.pendingOp, forRebalance ? dataset.rebalanceCount + 1 :
dataset.rebalanceCount);
}
@@ -206,10 +206,12 @@
this.rebalanceCount = rebalanceCount;
}
+ @Override
public String getDataverseName() {
return dataverseName;
}
+ @Override
public String getDatasetName() {
return datasetName;
}
@@ -325,9 +327,9 @@
Map<FeedConnectionId, Pair<JobSpecification, Boolean>>
disconnectJobList = new HashMap<>();
if (getDatasetType() == DatasetType.INTERNAL) {
// prepare job spec(s) that would disconnect any active feeds
involving the dataset.
- ActiveLifecycleListener activeListener =
- (ActiveLifecycleListener)
metadataProvider.getApplicationContext().getActiveLifecycleListener();
- IActiveEntityEventsListener[] activeListeners =
activeListener.getNotificationHandler().getEventListeners();
+ ActiveNotificationHandler activeListener =
+ (ActiveNotificationHandler)
metadataProvider.getApplicationContext().getActiveNotificationHandler();
+ IActiveEntityEventsListener[] activeListeners =
activeListener.getEventListeners();
for (IActiveEntityEventsListener listener : activeListeners) {
if (listener.isEntityUsingDataset(this)) {
throw new
CompilationException(ErrorCode.COMPILATION_CANT_DROP_ACTIVE_DATASET,
@@ -706,8 +708,8 @@
public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider
metadataProvider) throws AlgebricksException {
List<List<String>> partitioningKeys = getPrimaryKeys();
int numPrimaryKeys = partitioningKeys.size();
- ISerializerDeserializer[] primaryRecFields = new
ISerializerDeserializer[numPrimaryKeys + 1
- + (hasMetaPart() ? 1 : 0)];
+ ISerializerDeserializer[] primaryRecFields =
+ new ISerializerDeserializer[numPrimaryKeys + 1 +
(hasMetaPart() ? 1 : 0)];
ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 +
(hasMetaPart() ? 1 : 0)];
ISerializerDeserializerProvider serdeProvider =
metadataProvider.getFormat().getSerdeProvider();
List<Integer> indicators = null;
@@ -719,9 +721,9 @@
// Set the serde/traits for primary keys
for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType = (indicators == null || indicators.get(i) == 0)
- ? itemType.getSubFieldType(partitioningKeys.get(i))
- : metaType.getSubFieldType(partitioningKeys.get(i));
+ IAType keyType =
+ (indicators == null || indicators.get(i) == 0) ?
itemType.getSubFieldType(partitioningKeys.get(i))
+ :
metaType.getSubFieldType(partitioningKeys.get(i));
primaryRecFields[i] =
serdeProvider.getSerializerDeserializer(keyType);
primaryTypeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
@@ -731,8 +733,8 @@
primaryTypeTraits[numPrimaryKeys] =
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
if (hasMetaPart()) {
// Set the serde and traits for the meta record field
- primaryRecFields[numPrimaryKeys + 1] =
SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(metaType);
+ primaryRecFields[numPrimaryKeys + 1] =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType);
primaryTypeTraits[numPrimaryKeys + 1] =
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
}
return new RecordDescriptor(primaryRecFields, primaryTypeTraits);
@@ -758,9 +760,9 @@
indicators = ((InternalDatasetDetails)
getDatasetDetails()).getKeySourceIndicator();
}
for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType = (indicators == null || indicators.get(i) == 0)
- ? recordType.getSubFieldType(partitioningKeys.get(i))
- : metaType.getSubFieldType(partitioningKeys.get(i));
+ IAType keyType =
+ (indicators == null || indicators.get(i) == 0) ?
recordType.getSubFieldType(partitioningKeys.get(i))
+ :
metaType.getSubFieldType(partitioningKeys.get(i));
cmpFactories[i] =
cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
}
return cmpFactories;
@@ -803,8 +805,8 @@
// Gets an array of partition numbers for this dataset.
protected int[] getDatasetPartitions(MetadataProvider metadataProvider)
throws AlgebricksException {
- FileSplit[] splitsForDataset =
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
- getDatasetName());
+ FileSplit[] splitsForDataset =
+
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
getDatasetName());
return IntStream.range(0, splitsForDataset.length).toArray();
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
index 5a85327..f263703 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/DatasourceAdapter.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.metadata.entities;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.metadata.MetadataCache;
import org.apache.asterix.metadata.api.IMetadataEntity;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
index 7f4e28d..8a7a9b8 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasourceAdapterTupleTranslator.java
@@ -24,7 +24,7 @@
import java.io.DataInputStream;
import java.util.Calendar;
-import org.apache.asterix.external.api.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter;
import org.apache.asterix.external.dataset.adapter.AdapterIdentifier;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.MetadataException;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
index 94b36ed..d1001030 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java
@@ -21,15 +21,15 @@
import java.rmi.RemoteException;
import java.util.Map;
+import org.apache.asterix.active.IAdapterFactory;
+import org.apache.asterix.active.IDataSourceAdapter;
+import org.apache.asterix.active.IDataSourceAdapter.AdapterType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.external.api.IAdapterFactory;
-import org.apache.asterix.external.api.IDataSourceAdapter;
-import org.apache.asterix.external.api.IDataSourceAdapter.AdapterType;
import org.apache.asterix.external.feed.api.IFeed;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
index 43d72e3..393af7b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/MetadataLockManager.java
@@ -266,13 +266,16 @@
acquireDataverseReadLock(locks, dataverseName);
}
- public void startFeedBegin(LockList locks, String dataverseName, String
feedName,
- List<FeedConnection> feedConnections) throws AsterixException {
+ public void startFeedBegin(LockList locks, String dataverseName, String
feedName) throws AsterixException {
acquireDataverseReadLock(locks, dataverseName);
acquireFeedReadLock(locks, feedName);
+
+ }
+
+ public void lockFeedConnections(LockList locks, List<FeedConnection>
feedConnections) throws AsterixException {
for (FeedConnection feedConnection : feedConnections) {
// what if the dataset is in a different dataverse
- String fqName = dataverseName + "." +
feedConnection.getDatasetName();
+ String fqName = feedConnection.getDataverseName() + "." +
feedConnection.getDatasetName();
acquireDatasetReadLock(locks, fqName);
}
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
index 5bb0aa9..fb655eb 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java
@@ -28,11 +28,11 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.asterix.active.IAdapterFactory;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
import org.apache.asterix.common.config.DatasetConfig.TransactionState;
import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
import
org.apache.asterix.external.operators.ExternalDatasetIndexesAbortOperatorDescriptor;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
index 4cd243c..d43af8c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/CcApplicationContext.java
@@ -210,7 +210,7 @@
}
@Override
- public IJobLifecycleListener getActiveLifecycleListener() {
+ public IJobLifecycleListener getActiveNotificationHandler() {
return activeLifeCycleListener;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
index 30ffebe..1506872 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java
@@ -18,12 +18,14 @@
*/
package org.apache.hyracks.api.job;
+import java.util.List;
+
import org.apache.hyracks.api.exceptions.HyracksException;
public interface IJobLifecycleListener {
- public void notifyJobCreation(JobId jobId, JobSpecification spec) throws
HyracksException;
+ void notifyJobCreation(JobId jobId, JobSpecification spec) throws
HyracksException;
- public void notifyJobStart(JobId jobId) throws HyracksException;
+ void notifyJobStart(JobId jobId) throws HyracksException;
- public void notifyJobFinish(JobId jobId) throws HyracksException;
+ void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception>
exceptions) throws HyracksException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
index 5075081..26245e1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java
@@ -36,6 +36,7 @@
import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.api.service.IControllerService;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.application.ServiceContext;
@@ -88,14 +89,14 @@
}
}
- public synchronized void notifyJobFinish(JobId jobId) throws
HyracksException {
+ public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus,
List<Exception> exceptions)
+ throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
- l.notifyJobFinish(jobId);
+ l.notifyJobFinish(jobId, jobStatus, exceptions);
}
}
- public synchronized void notifyJobCreation(JobId jobId, JobSpecification
spec)
- throws HyracksException {
+ public synchronized void notifyJobCreation(JobId jobId, JobSpecification
spec) throws HyracksException {
for (IJobLifecycleListener l : jobLifecycleListeners) {
l.notifyJobCreation(jobId, spec);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 3cc41c5..32403ce 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -41,6 +41,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.control.cc.PreDistributedJobStore;
import org.apache.hyracks.control.common.dataset.ResultStateSweeper;
import org.apache.hyracks.control.common.work.IResultCallback;
@@ -69,7 +70,7 @@
this.resultTTL = resultTTL;
this.resultSweepThreshold = resultSweepThreshold;
this.preDistributedJobStore = preDistributedJobStore;
- jobResultLocations = new LinkedHashMap<JobId, JobResultInfo>();
+ jobResultLocations = new LinkedHashMap<>();
}
@Override
@@ -94,7 +95,7 @@
}
@Override
- public void notifyJobFinish(JobId jobId) throws HyracksException {
+ public void notifyJobFinish(JobId jobId, JobStatus jobStatus,
List<Exception> exceptions) throws HyracksException {
// Auto-generated method stub
}
@@ -189,7 +190,7 @@
@Override
public synchronized long getResultTimestamp(JobId jobId) {
- if (preDistributedJobStore.jobIsPredistributed(jobId)){
+ if (preDistributedJobStore.jobIsPredistributed(jobId)) {
return -1;
}
return getState(jobId).getTimestamp();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 45c7711..c1a7899 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -132,8 +132,8 @@
// Removes a pending job.
JobRun jobRun = jobQueue.remove(jobId);
if (jobRun != null) {
- List<Exception> exceptions = Collections
-
.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
+ List<Exception> exceptions =
+
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED,
jobId));
// Since the job has not been executed, we only need to update its
status and lifecyle here.
jobRun.setStatus(JobStatus.FAILURE, exceptions);
runMapArchive.put(jobId, jobRun);
@@ -179,7 +179,7 @@
} catch (Exception e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
if (caughtException == null) {
- caughtException = new HyracksException(e);
+ caughtException = HyracksException.create(e);
} else {
caughtException.addSuppressed(e);
}
@@ -208,7 +208,7 @@
CCServiceContext serviceCtx = ccs.getContext();
if (serviceCtx != null) {
try {
- serviceCtx.notifyJobFinish(jobId);
+ serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(),
run.getPendingExceptions());
} catch (HyracksException e) {
LOGGER.log(Level.SEVERE, e.getMessage(), e);
caughtException = e;
@@ -248,8 +248,6 @@
throw caughtException;
}
}
-
-
@Override
public Collection<JobRun> getRunningJobs() {
@@ -320,9 +318,8 @@
try {
run.getExecutor().startJob();
} catch (Exception e) {
- ccs.getWorkQueue()
- .schedule(new JobCleanupWork(ccs.getJobManager(),
run.getJobId(), JobStatus.FAILURE,
- Collections.singletonList(e)));
+ ccs.getWorkQueue().schedule(new
JobCleanupWork(ccs.getJobManager(), run.getJobId(), JobStatus.FAILURE,
+ Collections.singletonList(e)));
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1875
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ifeac8c73e6bad39a13663b84a52121356e3c6b40
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>