Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2249
Change subject: [ASTERIXDB-2214][FAIL] Prevent dropping of Functions Used By
Active Entities
......................................................................
[ASTERIXDB-2214][FAIL] Prevent dropping of Functions Used By Active Entities
Remove Specialized Feed checks for functional dependency
Add Active Entity checks for functional dependency
Change-Id: I62393b65eddc4c2520fc8a0f3f80960551f4a159
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
11 files changed, 69 insertions(+), 91 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/49/2249/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 37120e4..d4d1ef6 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.active;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -52,6 +53,8 @@
*/
boolean isEntityUsingDataset(IDataset dataset);
+ boolean dependsOnFunction(FunctionSignature function);
+
/**
* subscribe to events. subscription ends when subscriber.done() returns
true
*
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 8cbc109..475ffa6 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -41,6 +41,7 @@
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.common.messaging.api.INcAddressedMessage;
import org.apache.asterix.common.metadata.IDataset;
@@ -77,6 +78,7 @@
protected final IHyracksClientConnection hcc;
protected final EntityId entityId;
private final List<Dataset> datasets;
+ private final List<FunctionSignature> functions;
protected final ActiveEvent statsUpdatedEvent;
protected final String runtimeName;
protected final IRetryPolicyFactory retryPolicyFactory;
@@ -100,7 +102,7 @@
protected Exception recoverFailure;
public ActiveEntityEventsListener(IStatementExecutor statementExecutor,
ICcApplicationContext appCtx,
- IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets,
+ IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets, List<FunctionSignature> functions,
AlgebricksAbsolutePartitionConstraint locations, String
runtimeName, IRetryPolicyFactory retryPolicyFactory)
throws HyracksDataException {
this.statementExecutor = statementExecutor;
@@ -110,6 +112,7 @@
this.hcc = hcc;
this.entityId = entityId;
this.datasets = datasets;
+ this.functions = functions;
this.retryPolicyFactory = retryPolicyFactory;
this.state = ActivityState.STOPPED;
this.statsTimestamp = -1;
@@ -178,7 +181,6 @@
}
}
- @SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
LOGGER.log(level, "the job " + jobId + " finished");
if (numRegistered != numDeRegistered) {
@@ -233,6 +235,11 @@
}
@Override
+ public synchronized boolean dependsOnFunction(FunctionSignature function) {
+ return getFunctions().contains(function);
+ }
+
+ @Override
public synchronized void remove(Dataset dataset) throws
HyracksDataException {
if (isActive()) {
throw new
RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY,
entityId, state);
@@ -272,7 +279,6 @@
return strBuilder.toString();
}
- @SuppressWarnings("unchecked")
@Override
public void refreshStats(long timeout) throws HyracksDataException {
LOGGER.log(level, "refreshStats called");
@@ -540,6 +546,11 @@
}
@Override
+ public List<FunctionSignature> getFunctions() {
+ return functions;
+ }
+
+ @Override
public synchronized void replace(Dataset dataset) {
if (getDatasets().contains(dataset)) {
getDatasets().remove(dataset);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
index c0ce6ec..64bc12a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java
@@ -25,17 +25,12 @@
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventSubscriber;
import org.apache.asterix.active.IRetryPolicyFactory;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.app.translator.QueryTranslator;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.utils.JobUtils;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
-import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.lang.common.statement.StartFeedStatement;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -45,7 +40,6 @@
import org.apache.asterix.utils.FeedOperations;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobSpecification;
@@ -56,10 +50,11 @@
private final List<FeedConnection> feedConnections;
public FeedEventsListener(IStatementExecutor statementExecutor,
ICcApplicationContext appCtx,
- IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets,
+ IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets, List<FunctionSignature> functions,
AlgebricksAbsolutePartitionConstraint locations, String
runtimeName, IRetryPolicyFactory retryPolicyFactory,
Feed feed, final List<FeedConnection> feedConnections) throws
HyracksDataException {
- super(statementExecutor, appCtx, hcc, entityId, datasets, locations,
runtimeName, retryPolicyFactory);
+ super(statementExecutor, appCtx, hcc, entityId, datasets, functions,
locations, runtimeName,
+ retryPolicyFactory);
this.feed = feed;
this.feedConnections = feedConnections;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index c69f5dc..13ab95f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -123,6 +123,7 @@
import org.apache.asterix.lang.common.statement.TypeDropStatement;
import org.apache.asterix.lang.common.statement.WriteStatement;
import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.util.MergePolicyUtils;
import org.apache.asterix.lang.sqlpp.rewrites.SqlppRewriterFactory;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
@@ -157,7 +158,6 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeSignature;
-import org.apache.asterix.lang.common.util.MergePolicyUtils;
import
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.asterix.translator.AbstractLangTranslator;
import
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
@@ -1221,8 +1221,9 @@
List<Function> functionsInDataverse =
MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx,
dataverseName);
for (Function function : functionsInDataverse) {
- if (checkWhetherFunctionIsBeingUsed(mdTxnCtx,
function.getDataverseName(), function.getName(),
- function.getArity(), dataverseName)) {
+ if (checkWhetherFunctionIsBeingUsed(
+ new FunctionSignature(function.getDataverseName(),
function.getName(), function.getArity()),
+ false)) {
throw new
MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE,
function.getDataverseName() + "." +
function.getName() + "@" + function.getArity());
}
@@ -1704,22 +1705,17 @@
}
}
- protected boolean
checkWhetherFunctionIsBeingUsed(MetadataTransactionContext ctx, String
dataverseName,
- String functionName, int arity, String currentDataverse) throws
AlgebricksException {
- List<Dataverse> allDataverses =
MetadataManager.INSTANCE.getDataverses(ctx);
- for (Dataverse dataverse : allDataverses) {
- if (currentDataverse != null &&
dataverse.getDataverseName().equals(currentDataverse)) {
+ protected boolean checkWhetherFunctionIsBeingUsed(FunctionSignature
function, boolean checkLocalDataverse)
+ throws AlgebricksException {
+ ActiveNotificationHandler activeEventHandler =
+ (ActiveNotificationHandler)
appCtx.getActiveNotificationHandler();
+ IActiveEntityEventsListener[] listeners =
activeEventHandler.getEventListeners();
+ for (IActiveEntityEventsListener listener : listeners) {
+ if (!checkLocalDataverse &&
listener.getEntityId().getDataverse().equals(function.getNamespace())) {
continue;
}
- List<Feed> feeds = MetadataManager.INSTANCE.getFeeds(ctx,
dataverse.getDataverseName());
- for (Feed feed : feeds) {
- List<FeedConnection> feedConnections =
MetadataManager.INSTANCE.getFeedConections(ctx,
- dataverse.getDataverseName(), feed.getFeedName());
- for (FeedConnection conn : feedConnections) {
- if (conn.containsFunction(dataverseName, functionName,
arity)) {
- return true;
- }
- }
+ if (listener.dependsOnFunction(function)) {
+ return true;
}
}
return false;
@@ -1737,8 +1733,7 @@
Function function = MetadataManager.INSTANCE.getFunction(mdTxnCtx,
signature);
if (function == null && !stmtDropFunction.getIfExists()) {
throw new AlgebricksException("Unknonw function " + signature);
- } else if (checkWhetherFunctionIsBeingUsed(mdTxnCtx,
signature.getNamespace(), signature.getName(),
- signature.getArity(), null)) {
+ } else if (checkWhetherFunctionIsBeingUsed(signature, true)) {
throw new
MetadataException(ErrorCode.METADATA_DROP_FUCTION_IN_USE, signature);
} else {
MetadataManager.INSTANCE.dropFunction(mdTxnCtx, signature);
@@ -2126,13 +2121,15 @@
if (listener == null) {
// Prepare policy
List<Dataset> datasets = new ArrayList<>();
+ List<FunctionSignature> functions = new ArrayList<>();
for (FeedConnection connection : feedConnections) {
Dataset ds =
metadataProvider.findDataset(connection.getDataverseName(),
connection.getDatasetName());
datasets.add(ds);
+ functions.addAll(connection.getAppliedFunctions());
}
listener = new FeedEventsListener(this,
metadataProvider.getApplicationContext(), hcc, entityId,
- datasets, null,
FeedIntakeOperatorNodePushable.class.getSimpleName(),
+ datasets, functions, null,
FeedIntakeOperatorNodePushable.class.getSimpleName(),
NoRetryPolicyFactory.INSTANCE, feed, feedConnections);
}
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
index 40d4f6a..17c21ae 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveEventsListenerTest.java
@@ -135,7 +135,7 @@
nodeControllers[0] = new TestNodeControllerActor(nodes[0],
clusterController);
nodeControllers[1] = new TestNodeControllerActor(nodes[1],
clusterController);
listener = new TestEventsListener(clusterController, nodeControllers,
jobIdFactory, entityId,
- new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc,
locations,
+ new ArrayList<>(allDatasets), new ArrayList<>(),
statementExecutor, appCtx, hcc, locations,
new InfiniteRetryPolicyFactory());
users = new TestUserActor[3];
users[0] = newUser("Till", appCtx);
@@ -346,7 +346,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testRecovery() throws Exception {
testStartWhenStartSucceed();
@@ -459,12 +458,11 @@
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testRecoveryFailureAfterOneAttemptCompilationFailure() throws
Exception {
handler.unregisterListener(listener);
listener = new TestEventsListener(clusterController, nodeControllers,
jobIdFactory, entityId,
- new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc,
locations,
+ new ArrayList<>(allDatasets), new ArrayList<>(),
statementExecutor, appCtx, hcc, locations,
new CountRetryPolicyFactory(1));
testStartWhenStartSucceed();
WaitForStateSubscriber tempFailSubscriber =
@@ -503,12 +501,11 @@
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testRecoveryFailureAfterOneAttemptRuntimeFailure() throws
Exception {
handler.unregisterListener(listener);
listener = new TestEventsListener(clusterController, nodeControllers,
jobIdFactory, entityId,
- new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc,
locations,
+ new ArrayList<>(allDatasets), new ArrayList<>(),
statementExecutor, appCtx, hcc, locations,
new CountRetryPolicyFactory(1));
testStartWhenStartSucceed();
WaitForStateSubscriber tempFailSubscriber =
@@ -523,12 +520,12 @@
Assert.assertEquals(ActivityState.PERMANENTLY_FAILED,
listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testRecoveryFailure() throws Exception {
handler.unregisterListener(listener);
listener = new TestEventsListener(clusterController, nodeControllers,
jobIdFactory, entityId,
- new ArrayList<>(allDatasets), statementExecutor, appCtx, hcc,
locations, NoRetryPolicyFactory.INSTANCE);
+ new ArrayList<>(allDatasets), new ArrayList<>(),
statementExecutor, appCtx, hcc, locations,
+ NoRetryPolicyFactory.INSTANCE);
testStartWhenStartSucceed();
WaitForStateSubscriber tempFailSubscriber =
new WaitForStateSubscriber(listener,
EnumSet.of(ActivityState.TEMPORARILY_FAILED));
@@ -541,7 +538,6 @@
Assert.assertEquals(ActivityState.PERMANENTLY_FAILED,
listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testStopDuringRecoveryAttemptThatSucceeds() throws Exception {
testStartWhenStartSucceed();
@@ -569,7 +565,6 @@
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testStopDuringRecoveryAttemptThatFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -597,7 +592,6 @@
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testStopDuringRecoveryAttemptThatFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -625,7 +619,6 @@
Assert.assertEquals(ActivityState.STOPPED, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testStartDuringRecoveryAttemptThatSucceeds() throws Exception {
testStartWhenStartSucceed();
@@ -651,7 +644,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testStartDuringRecoveryAttemptThatFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -676,7 +668,6 @@
assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED);
}
- @SuppressWarnings("deprecation")
@Test
public void testStartDuringRecoveryAttemptThatFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -700,7 +691,6 @@
assertFailure(action, ErrorCode.ACTIVE_ENTITY_ALREADY_STARTED);
}
- @SuppressWarnings("deprecation")
@Test
public void
testSuspendDuringRecoveryAttemptThatSucceedsThenResumeSucceeds() throws
Exception {
testStartWhenStartSucceed();
@@ -731,7 +721,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void
testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -774,7 +763,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void
testSuspendDuringRecoveryAttemptThatSucceedsThenResumeFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -817,7 +805,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testSuspendDuringRecoveryAttemptThatFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -859,7 +846,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testSuspendDuringRecoveryAttemptThatFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -901,7 +887,6 @@
Assert.assertEquals(ActivityState.RUNNING, listener.getState());
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewShadowDuringRecoveryAttemptThatSucceeds() throws
Exception {
testStartWhenStartSucceed();
@@ -929,7 +914,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewShadowDuringRecoveryAttemptThatFailsCompile()
throws Exception {
testStartWhenStartSucceed();
@@ -956,7 +940,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewShadowDuringRecoveryAttemptThatFailsRuntime()
throws Exception {
testStartWhenStartSucceed();
@@ -1056,7 +1039,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteShadowDuringRecoveryAttemptThatSucceeds() throws
Exception {
testStartWhenStartSucceed();
@@ -1082,7 +1064,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteShadowDuringRecoveryAttemptThatFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -1107,7 +1088,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteShadowDuringRecoveryAttemptThatFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -1197,7 +1177,6 @@
Assert.assertEquals(clusterController.getAllDatasets().size(),
listener.getDatasets().size());
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewIndexDuringRecoveryAttemptThatSucceeds() throws
Exception {
testStartWhenStartSucceed();
@@ -1221,7 +1200,6 @@
assertFailure(add,
ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY);
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewIndexDuringRecoveryAttemptThatFailsCompile()
throws Exception {
testStartWhenStartSucceed();
@@ -1244,7 +1222,6 @@
assertFailure(add,
ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY);
}
- @SuppressWarnings("deprecation")
@Test
public void testCreateNewIndexDuringRecoveryAttemptThatFailsRuntime()
throws Exception {
testStartWhenStartSucceed();
@@ -1321,7 +1298,6 @@
assertFailure(add,
ErrorCode.CANNOT_ADD_INDEX_TO_DATASET_CONNECTED_TO_ACTIVE_ENTITY);
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteIndexDuringRecoveryAttemptThatSucceeds() throws
Exception {
testStartWhenStartSucceed();
@@ -1345,7 +1321,6 @@
assertFailure(drop,
ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY);
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteIndexDuringRecoveryAttemptThatFailsCompile() throws
Exception {
testStartWhenStartSucceed();
@@ -1368,7 +1343,6 @@
assertFailure(drop,
ErrorCode.CANNOT_REMOVE_INDEX_FROM_DATASET_CONNECTED_TO_ACTIVE_ENTITY);
}
- @SuppressWarnings("deprecation")
@Test
public void testDeleteIndexDuringRecoveryAttemptThatFailsRuntime() throws
Exception {
testStartWhenStartSucceed();
@@ -1470,8 +1444,8 @@
Mockito.when(ccAppCtx.getStorageComponentProvider()).thenReturn(componentProvider);
AlgebricksAbsolutePartitionConstraint locations = new
AlgebricksAbsolutePartitionConstraint(nodes);
additionalListeners[i] = listener = new
TestEventsListener(clusterController, nodeControllers, jobIdFactory,
- entityId, new ArrayList<>(allDatasets), statementExecutor,
ccAppCtx, hcc, locations,
- new InfiniteRetryPolicyFactory());
+ entityId, new ArrayList<>(allDatasets), new ArrayList<>(),
statementExecutor, ccAppCtx, hcc,
+ locations, new InfiniteRetryPolicyFactory());
}
Action suspension = users[0].suspendAllActivities(handler);
suspension.sync();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index df7756b..5f9a2f7 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -37,6 +37,7 @@
import org.apache.asterix.app.cc.CCExtensionManager;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -74,6 +75,7 @@
ActiveRuntimeId activeRuntimeId = new ActiveRuntimeId(entityId,
FeedIntakeOperatorNodePushable.class.getSimpleName(), 0);
List<Dataset> datasetList = new ArrayList<>();
+ List<FunctionSignature> functionList = new ArrayList<>();
AlgebricksAbsolutePartitionConstraint partitionConstraint = new
AlgebricksAbsolutePartitionConstraint(
new String[] { "asterix_nc1" });
String requestedStats;
@@ -101,7 +103,8 @@
MetadataProvider mdProvider = new MetadataProvider(appCtx, null);
// Add event listener
ActiveEntityEventsListener eventsListener = new
DummyFeedEventsListener(statementExecutor, appCtx, null,
- entityId, datasetList, partitionConstraint,
FeedIntakeOperatorNodePushable.class.getSimpleName(),
+ entityId, datasetList, functionList, partitionConstraint,
+ FeedIntakeOperatorNodePushable.class.getSimpleName(),
NoRetryPolicyFactory.INSTANCE, null, Collections.emptyList());
// Register mock runtime
NCAppRuntimeContext nc1AppCtx = (NCAppRuntimeContext)
ExecutionTestUtil.integrationUtil.ncs[0]
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
index c269803..bf86658 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/DummyFeedEventsListener.java
@@ -28,6 +28,7 @@
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.app.active.FeedEventsListener;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -35,18 +36,17 @@
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.translator.IStatementExecutor;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class DummyFeedEventsListener extends FeedEventsListener {
public DummyFeedEventsListener(IStatementExecutor statementExecutor,
ICcApplicationContext appCtx,
- IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets,
+ IHyracksClientConnection hcc, EntityId entityId, List<Dataset>
datasets, List<FunctionSignature> functions,
AlgebricksAbsolutePartitionConstraint locations, String
runtimeName, IRetryPolicyFactory retryPolicyFactory,
Feed feed, List<FeedConnection> feedConnections) throws
HyracksDataException {
- super(statementExecutor, appCtx, hcc, entityId, datasets, locations,
runtimeName, retryPolicyFactory, feed,
- feedConnections);
+ super(statementExecutor, appCtx, hcc, entityId, datasets, functions,
locations, runtimeName, retryPolicyFactory,
+ feed, feedConnections);
}
@Override
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
index 1cedc96..fbbde6b 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestEventsListener.java
@@ -30,6 +30,7 @@
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.LockList;
import org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -63,11 +64,12 @@
private Behavior onStop = Behavior.FAIL_COMPILE;
public TestEventsListener(TestClusterControllerActor clusterController,
TestNodeControllerActor[] nodeControllers,
- JobIdFactory jobIdFactory, EntityId entityId, List<Dataset>
datasets, IStatementExecutor statementExecutor,
- ICcApplicationContext appCtx, IHyracksClientConnection hcc,
AlgebricksAbsolutePartitionConstraint locations,
- IRetryPolicyFactory retryPolicyFactory) throws
HyracksDataException {
- super(statementExecutor, appCtx, hcc, entityId, datasets, locations,
TestEventsListener.class.getSimpleName(),
- retryPolicyFactory);
+ JobIdFactory jobIdFactory, EntityId entityId, List<Dataset>
datasets, List<FunctionSignature> functions,
+ IStatementExecutor statementExecutor, ICcApplicationContext
appCtx, IHyracksClientConnection hcc,
+ AlgebricksAbsolutePartitionConstraint locations,
IRetryPolicyFactory retryPolicyFactory)
+ throws HyracksDataException {
+ super(statementExecutor, appCtx, hcc, entityId, datasets, functions,
locations,
+ TestEventsListener.class.getSimpleName(), retryPolicyFactory);
this.clusterController = clusterController;
this.nodeControllers = nodeControllers;
this.jobIdFactory = jobIdFactory;
@@ -84,7 +86,6 @@
}
}
- @SuppressWarnings("deprecation")
private void failCompile(Behavior behavior) throws HyracksDataException {
if (behavior == Behavior.FAIL_COMPILE || behavior ==
Behavior.STEP_FAIL_COMPILE) {
throw new HyracksDataException("Compilation Failure");
@@ -103,7 +104,6 @@
}
}
- @SuppressWarnings("deprecation")
@Override
protected void doStart(MetadataProvider metadataProvider) throws
HyracksDataException {
step(onStart);
@@ -142,7 +142,6 @@
}
}
- @SuppressWarnings("deprecation")
@Override
protected Void doStop(MetadataProvider metadataProvider) throws
HyracksDataException {
step(onStop);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
index d4b4215..6331c9f 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/functions/FunctionSignature.java
@@ -57,7 +57,7 @@
@Override
public int hashCode() {
- return (namespace + "." + name).hashCode();
+ return (namespace + "." + name + "@" + arity).hashCode();
}
public String getNamespace() {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
index c73a433..f9333f3 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java
@@ -21,6 +21,7 @@
import java.util.List;
import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -98,6 +99,11 @@
List<Dataset> getDatasets();
/**
+ * @return the list of associated functions
+ */
+ List<FunctionSignature> getFunctions();
+
+ /**
* replace the dataset object with the passed updated object
*
* @param target
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
index 8e14ee5..6818cc5 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/FeedConnection.java
@@ -19,13 +19,13 @@
package org.apache.asterix.metadata.entities;
+import java.util.List;
+
import org.apache.asterix.active.EntityId;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataCache;
import org.apache.asterix.metadata.api.IMetadataEntity;
-
-import java.util.List;
/**
* Feed connection records the feed --> dataset mapping.
@@ -111,15 +111,5 @@
public EntityId getFeedId() {
return feedId;
- }
-
- public boolean containsFunction(String dataverseName, String functionName,
int arity) {
- for (FunctionSignature signature : this.appliedFunctions) {
- if (signature.getNamespace().equals(dataverseName) &&
signature.getName().equals(functionName)
- && signature.getArity() == arity) {
- return true;
- }
- }
- return false;
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2249
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I62393b65eddc4c2520fc8a0f3f80960551f4a159
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>