abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1118
Change subject: Refactor Active Listeners
......................................................................
Refactor Active Listeners
Change-Id: I260c8608329523f56dc54780d87d796f838505cf
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
3 files changed, 23 insertions(+), 28 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/18/1118/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 156576c..77a8fdc 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,4 +31,6 @@
public EntityId getEntityId();
+ public boolean connectedToDataset(String dataverseName, String
datasetName);
+
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 24c678c..4644b5c 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -696,8 +696,7 @@
StringBuilder builder = null;
IActiveEntityEventsListener[] listeners =
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
for (IActiveEntityEventsListener listener : listeners) {
- if (listener instanceof FeedEventsListener
- && ((FeedEventsListener)
listener).isConnectedToDataset(datasetName)) {
+ if (listener.connectedToDataset(dataverseName, datasetName)) {
if (builder == null) {
builder = new StringBuilder();
}
@@ -706,7 +705,7 @@
}
if (builder != null) {
throw new AsterixException("Dataset " + dataverseName + "." +
datasetName + " is currently being "
- + "fed into by the following feed(s).\n" +
builder.toString() + "\n" + "Operation not supported");
+ + "fed into by the following active entities.\n" +
builder.toString());
}
}
@@ -1411,22 +1410,11 @@
Map<FeedConnectionId, Pair<JobSpecification, Boolean>>
disconnectJobList = new HashMap<>();
if (ds.getDatasetType() == DatasetType.INTERNAL) {
// prepare job spec(s) that would disconnect any active feeds
involving the dataset.
- IActiveEntityEventsListener[] feedConnections =
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
- for (IActiveEntityEventsListener conn : feedConnections) {
- if
(conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
- && ((FeedEventsListener)
conn).isConnectedToDataset(datasetName)) {
- FeedConnectionId connectionId = new
FeedConnectionId(conn.getEntityId(), datasetName);
- Pair<JobSpecification, Boolean> p =
FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
- connectionId);
- disconnectJobList.put(connectionId, p);
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Disconnecting feed " +
connectionId.getFeedId().getEntityName() + " from dataset "
- + datasetName + " as dataset is being
dropped");
- }
- // prepare job to remove feed log storage
-
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(
- mdTxnCtx.getValue(),
connectionId.getFeedId().getDataverse(),
- connectionId.getFeedId().getEntityName())));
+ IActiveEntityEventsListener[] activeListeners =
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+ for (IActiveEntityEventsListener listener : activeListeners) {
+ if (listener.connectedToDataset(dataverseName, datasetName)) {
+ throw new AsterixException(
+ "Can't drop dataset since it is connected to
active entity: " + listener.getEntityId());
}
}
@@ -1547,8 +1535,7 @@
IActiveEntityEventsListener[] listeners =
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
StringBuilder builder = null;
for (IActiveEntityEventsListener listener : listeners) {
- if
(listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
- && ((FeedEventsListener)
listener).isConnectedToDataset(datasetName)) {
+ if (listener.connectedToDataset(dataverseName, datasetName)) {
if (builder == null) {
builder = new StringBuilder();
}
@@ -1557,8 +1544,8 @@
}
if (builder != null) {
throw new AsterixException(
- "Dataset" + datasetName + " is currently being fed
into by the following feeds " + "."
- + builder.toString() + "\nOperation not
supported.");
+ "Dataset" + datasetName + " is currently being fed
into by the following active entities" + "."
+ + builder.toString());
}
if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1709,7 +1696,7 @@
e.addSuppressed(e2);
abort(e, e2, mdTxnCtx);
throw new IllegalStateException("System is inconsistent
state: pending index("
- + dataverseName + "." + datasetName + "."
+ + dataverseName + "." + datasetName + "."
+ indexName + ") couldn't be removed from the
metadata", e);
}
}
@@ -2008,7 +1995,8 @@
}
}
- protected void handleCreateFeedPolicyStatement(AqlMetadataProvider
metadataProvider, Statement stmt) throws Exception {
+ protected void handleCreateFeedPolicyStatement(AqlMetadataProvider
metadataProvider, Statement stmt)
+ throws Exception {
MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
String dataverse;
@@ -2350,7 +2338,7 @@
IFeedLifecycleEventSubscriber eventSubscriber = new
FeedLifecycleEventSubscriber();
FeedEventsListener listener = (FeedEventsListener)
ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
- if (listener == null || !listener.isConnectedToDataset(datasetName)) {
+ if (listener == null || !listener.connectedToDataset(dataverseName,
datasetName)) {
throw new AsterixException("Feed " +
feed.getFeedId().getEntityName() + " is currently not connected to "
+ cfs.getDatasetName().getValue() + ". Invalid
operation!");
}
@@ -2868,7 +2856,7 @@
default:
throw new AlgebricksException(
"The system \"" + runStmt.getSystem() +
- "\" specified in your run statement is not
supported.");
+ "\" specified in your run statement is not
supported.");
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index c40fed6..c3c738e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -611,7 +611,7 @@
cInfo.setState(ActivityState.ACTIVE);
}
- public synchronized boolean isConnectedToDataset(String datasetName) {
+ private synchronized boolean isConnectedToDataset(String datasetName) {
for (FeedConnectionId connection : connectJobInfos.keySet()) {
if (connection.getDatasetName().equals(datasetName)) {
return true;
@@ -641,4 +641,9 @@
public IFeedJoint getSourceFeedJoint() {
return sourceFeedJoint;
}
+
+ @Override
+ public boolean connectedToDataset(String dataverseName, String
datasetName) {
+ return isConnectedToDataset(datasetName);
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1118
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>