abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1118

Change subject: Refactor Active Listeners
......................................................................

Refactor Active Listeners

Change-Id: I260c8608329523f56dc54780d87d796f838505cf
---
M 
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
3 files changed, 23 insertions(+), 28 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/18/1118/1

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 156576c..77a8fdc 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,4 +31,6 @@
 
     public EntityId getEntityId();
 
+    public boolean connectedToDataset(String dataverseName, String 
datasetName);
+
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 24c678c..4644b5c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -696,8 +696,7 @@
         StringBuilder builder = null;
         IActiveEntityEventsListener[] listeners = 
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
-            if (listener instanceof FeedEventsListener
-                    && ((FeedEventsListener) 
listener).isConnectedToDataset(datasetName)) {
+            if (listener.connectedToDataset(dataverseName, datasetName)) {
                 if (builder == null) {
                     builder = new StringBuilder();
                 }
@@ -706,7 +705,7 @@
         }
         if (builder != null) {
             throw new AsterixException("Dataset " + dataverseName + "." + 
datasetName + " is currently being "
-                    + "fed into by the following feed(s).\n" + 
builder.toString() + "\n" + "Operation not supported");
+                    + "fed into by the following active entities.\n" + 
builder.toString());
         }
     }
 
@@ -1411,22 +1410,11 @@
         Map<FeedConnectionId, Pair<JobSpecification, Boolean>> 
disconnectJobList = new HashMap<>();
         if (ds.getDatasetType() == DatasetType.INTERNAL) {
             // prepare job spec(s) that would disconnect any active feeds 
involving the dataset.
-            IActiveEntityEventsListener[] feedConnections = 
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
-            for (IActiveEntityEventsListener conn : feedConnections) {
-                if 
(conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && ((FeedEventsListener) 
conn).isConnectedToDataset(datasetName)) {
-                    FeedConnectionId connectionId = new 
FeedConnectionId(conn.getEntityId(), datasetName);
-                    Pair<JobSpecification, Boolean> p = 
FeedOperations.buildDisconnectFeedJobSpec(metadataProvider,
-                            connectionId);
-                    disconnectJobList.put(connectionId, p);
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Disconnecting feed " + 
connectionId.getFeedId().getEntityName() + " from dataset "
-                                + datasetName + " as dataset is being 
dropped");
-                    }
-                    // prepare job to remove feed log storage
-                    
jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed(
-                            mdTxnCtx.getValue(), 
connectionId.getFeedId().getDataverse(),
-                            connectionId.getFeedId().getEntityName())));
+            IActiveEntityEventsListener[] activeListeners = 
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
+            for (IActiveEntityEventsListener listener : activeListeners) {
+                if (listener.connectedToDataset(dataverseName, datasetName)) {
+                    throw new AsterixException(
+                            "Can't drop dataset since it is connected to 
active entity: " + listener.getEntityId());
                 }
             }
 
@@ -1547,8 +1535,7 @@
             IActiveEntityEventsListener[] listeners = 
ActiveJobNotificationHandler.INSTANCE.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
-                if 
(listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME)
-                        && ((FeedEventsListener) 
listener).isConnectedToDataset(datasetName)) {
+                if (listener.connectedToDataset(dataverseName, datasetName)) {
                     if (builder == null) {
                         builder = new StringBuilder();
                     }
@@ -1557,8 +1544,8 @@
             }
             if (builder != null) {
                 throw new AsterixException(
-                        "Dataset" + datasetName + " is currently being fed 
into by the following feeds " + "."
-                                + builder.toString() + "\nOperation not 
supported.");
+                        "Dataset" + datasetName + " is currently being fed 
into by the following active entities" + "."
+                                + builder.toString());
             }
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -1709,7 +1696,7 @@
                     e.addSuppressed(e2);
                     abort(e, e2, mdTxnCtx);
                     throw new IllegalStateException("System is inconsistent 
state: pending index("
-                    + dataverseName + "." + datasetName + "."
+                            + dataverseName + "." + datasetName + "."
                             + indexName + ") couldn't be removed from the 
metadata", e);
                 }
             }
@@ -2008,7 +1995,8 @@
         }
     }
 
-    protected void handleCreateFeedPolicyStatement(AqlMetadataProvider 
metadataProvider, Statement stmt) throws Exception {
+    protected void handleCreateFeedPolicyStatement(AqlMetadataProvider 
metadataProvider, Statement stmt)
+            throws Exception {
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         String dataverse;
@@ -2350,7 +2338,7 @@
         IFeedLifecycleEventSubscriber eventSubscriber = new 
FeedLifecycleEventSubscriber();
         FeedEventsListener listener = (FeedEventsListener) 
ActiveJobNotificationHandler.INSTANCE
                 .getActiveEntityListener(entityId);
-        if (listener == null || !listener.isConnectedToDataset(datasetName)) {
+        if (listener == null || !listener.connectedToDataset(dataverseName, 
datasetName)) {
             throw new AsterixException("Feed " + 
feed.getFeedId().getEntityName() + " is currently not connected to "
                     + cfs.getDatasetName().getValue() + ". Invalid 
operation!");
         }
@@ -2868,7 +2856,7 @@
             default:
                 throw new AlgebricksException(
                         "The system \"" + runStmt.getSystem() +
-                        "\" specified in your run statement is not 
supported.");
+                                "\" specified in your run statement is not 
supported.");
         }
 
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
index c40fed6..c3c738e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
@@ -611,7 +611,7 @@
         cInfo.setState(ActivityState.ACTIVE);
     }
 
-    public synchronized boolean isConnectedToDataset(String datasetName) {
+    private synchronized boolean isConnectedToDataset(String datasetName) {
         for (FeedConnectionId connection : connectJobInfos.keySet()) {
             if (connection.getDatasetName().equals(datasetName)) {
                 return true;
@@ -641,4 +641,9 @@
     public IFeedJoint getSourceFeedJoint() {
         return sourceFeedJoint;
     }
+
+    @Override
+    public boolean connectedToDataset(String dataverseName, String 
datasetName) {
+        return isConnectedToDataset(datasetName);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1118
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to