Steven Jacobs has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1473
Change subject: Added Procedures to BAD
......................................................................
Added Procedures to BAD
Change-Id: I03550a74e2c90179e72345103b3d2c4f98148631
---
M asterix-bad/pom.xml
M asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
R asterix-bad/src/main/java/org/apache/asterix/bad/DistributedJobInfo.java
M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
A
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
A
asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
A
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
R
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
M
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
D
asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
D
asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
M asterix-bad/src/main/resources/lang-extension/lang.txt
A
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.query.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
A
asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
A
asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
A
asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
A
asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.5.adm
A
asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
A
asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
A
asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
M asterix-bad/src/test/resources/runtimets/testsuite.xml
46 files changed, 870 insertions(+), 362 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad
refs/changes/73/1473/1
diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml
index 9065680..0d32652 100644
--- a/asterix-bad/pom.xml
+++ b/asterix-bad/pom.xml
@@ -247,11 +247,6 @@
<version>${hyracks.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>${hyracks.version}</version>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
index d1df438..eba8ca1 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobService.java
@@ -70,7 +70,7 @@
if (jobId == null) {
hcc.startJob(jobSpec, jobFlags);
} else {
- hcc.startJob(jobSpec, jobFlags, jobId);
+ hcc.startJob(jobId);
}
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/DistributedJobInfo.java
similarity index 90%
rename from asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
rename to
asterix-bad/src/main/java/org/apache/asterix/bad/DistributedJobInfo.java
index da0c43b..b937c59 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/ChannelJobInfo.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/DistributedJobInfo.java
@@ -27,12 +27,12 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
-public class ChannelJobInfo extends ActiveJob {
+public class DistributedJobInfo extends ActiveJob {
private static final long serialVersionUID = 1L;
private List<String> locations;
- public ChannelJobInfo(EntityId entityId, JobId jobId, ActivityState state,
JobSpecification spec) {
+ public DistributedJobInfo(EntityId entityId, JobId jobId, ActivityState
state, JobSpecification spec) {
super(entityId, jobId, state, ChannelJobType.REPETITIVE, spec);
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
index 959600f..1d46bc4 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADLangExtension.java
@@ -27,6 +27,7 @@
import org.apache.asterix.bad.metadata.ChannelSearchKey;
import org.apache.asterix.bad.metadata.DataverseBrokersSearchKey;
import org.apache.asterix.bad.metadata.DataverseChannelsSearchKey;
+import org.apache.asterix.bad.metadata.DataverseProceduresSearchKey;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.bad.metadata.ProcedureSearchKey;
import org.apache.asterix.common.api.ExtensionId;
@@ -119,4 +120,10 @@
return MetadataManager.INSTANCE.getEntities(mdTxnCtx,
channelSearchKey);
}
+ public static List<Procedure> getProcedures(MetadataTransactionContext
mdTxnCtx, String dataverseName)
+ throws AlgebricksException {
+ DataverseProceduresSearchKey proceduresSearchKey = new
DataverseProceduresSearchKey(dataverseName);
+ return MetadataManager.INSTANCE.getEntities(mdTxnCtx,
proceduresSearchKey);
+ }
+
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
index fa18867..202fee8 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java
@@ -23,8 +23,11 @@
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.lang.statement.BrokerDropStatement;
import org.apache.asterix.bad.lang.statement.ChannelDropStatement;
+import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
import org.apache.asterix.bad.metadata.Broker;
import org.apache.asterix.bad.metadata.Channel;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.compiler.provider.ILangCompilationProvider;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.statement.DataverseDropStatement;
@@ -63,6 +66,12 @@
new Identifier(channel.getChannelId().getEntityName()),
false);
drop.handle(this, metadataProvider, hcc, null, null, null, 0);
}
+ List<Procedure> procedures = BADLangExtension.getProcedures(mdTxnCtx,
dvId.getValue());
+ for (Procedure procedure : procedures) {
+ ProcedureDropStatement drop = new ProcedureDropStatement(new
FunctionSignature(dvId.getValue(),
+ procedure.getEntityId().getEntityName(),
procedure.getArity()), false);
+ drop.handle(this, metadataProvider, hcc, null, null, null, 0);
+ }
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
index 94206df..7eb72f8 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelDropStatement.java
@@ -18,26 +18,17 @@
*/
package org.apache.asterix.bad.lang.statement;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.asterix.active.ActiveJobNotificationHandler;
-import org.apache.asterix.active.ActiveRuntimeId;
+import org.apache.asterix.active.ActiveLifecycleListener;
import org.apache.asterix.active.EntityId;
-import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorNodePushable;
-import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.common.messaging.api.ICCMessageBroker;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
-import
org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import
org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
import org.apache.asterix.lang.common.statement.DropDatasetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
@@ -45,15 +36,14 @@
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
public class ChannelDropStatement implements IExtensionStatement {
@@ -102,7 +92,7 @@
String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(dataverseName);
boolean txnActive = false;
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
- ChannelEventsListener listener = (ChannelEventsListener)
ActiveJobNotificationHandler.INSTANCE
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
IActiveLifecycleEventSubscriber eventSubscriber = new
ActiveLifecycleEventSubscriber();
boolean subscriberRegistered = false;
@@ -122,29 +112,14 @@
throw new AlgebricksException("There is no channel with
this name " + channelName + ".");
}
}
- if (listener != null) {
- subscriberRegistered = listener.isChannelActive(entityId,
eventSubscriber);
- }
- if (!subscriberRegistered) {
- throw new AsterixException("Channel " + channelName + " is not
running");
- }
- ICCMessageBroker messageBroker =
- (ICCMessageBroker)
AppContextInfo.INSTANCE.getCCApplicationContext().getMessageBroker();
-
- ChannelJobInfo cInfo =
listener.getJobInfo(channel.getChannelId());;
- Set<String> ncs = new HashSet<>(cInfo.getLocations());
- AlgebricksAbsolutePartitionConstraint locations = new
AlgebricksAbsolutePartitionConstraint(
- ncs.toArray(new String[ncs.size()]));
- int partition = 0;
- for (String location : locations.getLocations()) {
- messageBroker.sendApplicationMessageToNC(
- new
ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, "cc",
- new ActiveRuntimeId(channel.getChannelId(),
-
RepetitiveChannelOperatorNodePushable.class.getSimpleName(), partition++)),
- location);
+ listener.getExecutorService().shutdownNow();
+ JobId hyracksJobId = listener.getHyracksJobId();
+ if (hyracksJobId != null) {
+ hcc.destroyJob(hyracksJobId);
}
- eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_ENDED);
+ listener.deActivate();
+ ActiveLifecycleListener.INSTANCE.notifyJobFinish(hyracksJobId);
//Drop the Channel Datasets
//TODO: Need to find some way to handle if this fails.
@@ -157,9 +132,6 @@
new Identifier(channel.getSubscriptionsDataset()), true);
((QueryTranslator)
statementExecutor).handleDatasetDropStatement(metadataProvider, dropStmt, hcc);
- if (subscriberRegistered) {
- listener.deregisterEventSubscriber(eventSubscriber);
- }
//Remove the Channel Metadata
MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, channel);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
index fa57503..1558508 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelUnsubscribeStatement.java
@@ -156,7 +156,7 @@
AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
delete.accept(visitor, null);
- ((QueryTranslator)
statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc);
+ ((QueryTranslator)
statementExecutor).handleDeleteStatement(metadataProvider, delete, hcc, false);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
QueryTranslator.abort(e, e, mdTxnCtx);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
index 671fff1..391cbc8 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java
@@ -22,12 +22,10 @@
import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -37,12 +35,12 @@
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.DistributedJobInfo;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.metadata.Channel;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -50,7 +48,6 @@
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import
org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import
org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
-import org.apache.asterix.file.JobSpecificationUtils;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
@@ -69,21 +66,11 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
-import org.apache.asterix.runtime.util.AppContextInfo;
-import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.util.JobUtils;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.api.application.ICCApplicationContext;
-import org.apache.hyracks.api.client.ClusterControllerInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
@@ -183,36 +170,6 @@
return Kind.EXTENSION;
}
- public Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint>
buildChannelJobSpec(String dataverse,
- String channelName, String duration, MetadataProvider
metadataProvider, JobSpecification channeljobSpec,
- String strIP, int port) throws Exception {
- JobSpecification spec = JobSpecificationUtils.createJobSpecification();
- IOperatorDescriptor channelQueryExecuter;
- AlgebricksPartitionConstraint executerPc;
-
- Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint> p =
buildChannelRuntime(spec, dataverse,
- channelName, duration, channeljobSpec, strIP, port);
- channelQueryExecuter = p.first;
- executerPc = p.second;
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
channelQueryExecuter, executerPc);
- spec.addRoot(channelQueryExecuter);
- return new Pair<>(spec, p.second);
-
- }
-
- public Pair<IOperatorDescriptor, AlgebricksAbsolutePartitionConstraint>
buildChannelRuntime(
- JobSpecification jobSpec, String dataverse, String channelName,
String duration,
- JobSpecification channeljobSpec, String strIP, int port) throws
Exception {
- RepetitiveChannelOperatorDescriptor channelOp = new
RepetitiveChannelOperatorDescriptor(jobSpec, dataverse,
- channelName, duration, channeljobSpec, strIP, port);
-
- String partition =
ClusterStateManager.INSTANCE.getClusterLocations().getLocations()[0];
- Set<String> ncs = new HashSet<>(Arrays.asList(partition));
- AlgebricksAbsolutePartitionConstraint partitionConstraint = new
AlgebricksAbsolutePartitionConstraint(
- ncs.toArray(new String[ncs.size()]));
- return new Pair<IOperatorDescriptor,
AlgebricksAbsolutePartitionConstraint>(channelOp, partitionConstraint);
- }
-
private void createDatasets(IStatementExecutor statementExecutor,
Identifier subscriptionsName,
Identifier resultsName, MetadataProvider metadataProvider,
IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats, String dataverse) throws
AsterixException, Exception {
@@ -281,28 +238,19 @@
hcc, hdc, ResultDelivery.ASYNC, stats, true);
}
- private void setupCompiledJob(MetadataProvider metadataProvider, String
dataverse, EntityId entityId,
- JobSpecification channeljobSpec, IHyracksClientConnection hcc)
throws Exception {
- ICCApplicationContext iCCApp =
AppContextInfo.INSTANCE.getCCApplicationContext();
- ClusterControllerInfo ccInfo =
iCCApp.getCCContext().getClusterControllerInfo();
- String strIP = ccInfo.getClientNetAddress();
- int port = ccInfo.getClientNetPort();
- //Create Channel Operator
- Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint>
alteredJobSpec = buildChannelJobSpec(dataverse,
- channelName.getValue(), duration, metadataProvider,
channeljobSpec, strIP, port);
-
- ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null,
ActivityState.ACTIVE, alteredJobSpec.first);
-
alteredJobSpec.first.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
channelJobInfo);
- JobUtils.runJob(hcc, alteredJobSpec.first, false);
- }
-
- private void setupDistributedJob(EntityId entityId, JobSpecification
channeljobSpec, IHyracksClientConnection hcc)
+ private void setupExecutorJob(EntityId entityId, JobSpecification
channeljobSpec, IHyracksClientConnection hcc,
+ PrecompiledJobEventListener listener, boolean predistributed)
throws Exception {
- ChannelJobInfo channelJobInfo = new ChannelJobInfo(entityId, null,
ActivityState.ACTIVE, channeljobSpec);
+ DistributedJobInfo channelJobInfo = new DistributedJobInfo(entityId,
null, ActivityState.ACTIVE, channeljobSpec);
channeljobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
channelJobInfo);
- JobId jobId = hcc.startJob(channeljobSpec,
EnumSet.of(JobFlag.STORE_JOB));
- ChannelJobService.startJob(channeljobSpec,
EnumSet.of(JobFlag.STORE_JOB), jobId, hcc,
- ChannelJobService.findPeriod(duration));
+ JobId jobId = null;
+ if (predistributed) {
+ jobId = hcc.distributeJob(channeljobSpec);
+ }
+ ScheduledExecutorService ses =
ChannelJobService.startJob(channeljobSpec, EnumSet.noneOf(JobFlag.class), jobId,
+ hcc, ChannelJobService.findPeriod(duration));
+ listener.storeDistributedInfo(jobId, ses, null, null);
+ ActiveJobNotificationHandler.INSTANCE.monitorJob(jobId,
channelJobInfo);
}
@Override
@@ -324,7 +272,7 @@
Identifier subscriptionsName = new Identifier(channelName +
BADConstants.subscriptionEnding);
Identifier resultsName = new Identifier(channelName +
BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverse, channelName.getValue());
- ChannelEventsListener listener = (ChannelEventsListener)
ActiveJobNotificationHandler.INSTANCE
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
.getActiveEntityListener(entityId);
IActiveLifecycleEventSubscriber eventSubscriber = new
ActiveLifecycleEventSubscriber();
boolean subscriberRegistered = false;
@@ -358,7 +306,7 @@
// Now we subscribe
if (listener == null) {
- listener = new ChannelEventsListener(entityId);
+ listener = new PrecompiledJobEventListener(entityId,
PrecompiledType.CHANNEL);
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
}
listener.registerEventSubscriber(eventSubscriber);
@@ -373,9 +321,9 @@
metadataProvider, hcc, hdc, stats, dataverse);
if (distributed) {
- setupDistributedJob(entityId, channeljobSpec, hcc);
+ setupExecutorJob(entityId, channeljobSpec, hcc, listener,
true);
} else {
- setupCompiledJob(metadataProvider, dataverse, entityId,
channeljobSpec, hcc);
+ setupExecutorJob(entityId, channeljobSpec, hcc, listener,
false);
}
eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
index 66dd2ae..79afa99 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.bad.lang.statement;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
@@ -25,12 +27,16 @@
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.DistributedJobInfo;
import org.apache.asterix.bad.lang.BADLangExtension;
-import org.apache.asterix.bad.metadata.ChannelEventsListener;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.bad.metadata.Procedure;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
@@ -39,22 +45,35 @@
import
org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import
org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
import org.apache.asterix.lang.aql.parser.AQLParserFactory;
+import org.apache.asterix.lang.aql.visitor.AqlDeleteRewriteVisitor;
+import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
+import org.apache.asterix.lang.common.expression.CallExpr;
+import org.apache.asterix.lang.common.expression.LiteralExpr;
+import org.apache.asterix.lang.common.literal.StringLiteral;
+import org.apache.asterix.lang.common.statement.Query;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.asterix.translator.SessionConfig;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
public class CreateProcedureStatement implements IExtensionStatement {
@@ -63,6 +82,8 @@
private final FunctionSignature signature;
private final String functionBody;
private final List<String> paramList;
+ private final CallExpr period;
+ private String duration = "";
public FunctionSignature getaAterixFunction() {
return signature;
@@ -72,14 +93,15 @@
return functionBody;
}
- public CreateProcedureStatement(FunctionSignature signature,
List<VarIdentifier> parameterList,
- String functionBody) {
+ public CreateProcedureStatement(FunctionSignature signature,
List<VarIdentifier> parameterList, String functionBody,
+ Expression period) {
this.signature = signature;
this.functionBody = functionBody;
this.paramList = new ArrayList<String>();
for (VarIdentifier varId : parameterList) {
this.paramList.add(varId.getValue());
}
+ this.period = (CallExpr) period;
}
@Override
@@ -100,12 +122,32 @@
return Category.DDL;
}
+ public Expression getPeriod() {
+ return period;
+ }
+
@Override
public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
return null;
}
- private JobSpecification createProcedureJob(String body,
IStatementExecutor statementExecutor,
+ private void initialize() throws MetadataException, HyracksDataException {
+ if (period == null) {
+ return;
+ }
+ if (!period.getFunctionSignature().getName().equals("duration")) {
+ throw new MetadataException(
+ "Expected argument period as a duration, but got " +
period.getFunctionSignature().getName() + ".");
+ }
+ duration = ((StringLiteral) ((LiteralExpr)
period.getExprList().get(0)).getValue()).getValue();
+ IValueParser durationParser =
ADurationParserFactory.INSTANCE.createValueParser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ durationParser.parse(duration.toCharArray(), 0,
duration.toCharArray().length, outputStream);
+ }
+
+ private Pair<JobSpecification, PrecompiledType> createProcedureJob(String
body,
+ IStatementExecutor statementExecutor,
MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats)
throws Exception {
StringBuilder builder = new StringBuilder();
@@ -114,10 +156,32 @@
AQLParserFactory aqlFact = new AQLParserFactory();
List<Statement> fStatements = aqlFact.createParser(new
StringReader(builder.toString())).parse();
if (fStatements.size() > 1) {
- throw new Exception("Procedure can only execute a single
statement");
+ throw new CompilationException("Procedure can only execute a
single statement");
}
- return ((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
fStatements.get(0),
- hcc, hdc, ResultDelivery.ASYNC, stats, true);
+ if (fStatements.get(0).getKind() == Statement.Kind.INSERT) {
+ return new Pair<>(((QueryTranslator)
statementExecutor).handleInsertUpsertStatement(metadataProvider,
+ fStatements.get(0), hcc, hdc, ResultDelivery.ASYNC, stats,
true), PrecompiledType.INSERT);
+ } else if (fStatements.get(0).getKind() == Statement.Kind.QUERY) {
+ return new Pair<>(((QueryTranslator)
statementExecutor).rewriteCompileQuery(hcc, metadataProvider,
+ (Query) fStatements.get(0), null), PrecompiledType.QUERY);
+ } else if (fStatements.get(0).getKind() == Statement.Kind.DELETE) {
+ AqlDeleteRewriteVisitor visitor = new AqlDeleteRewriteVisitor();
+ fStatements.get(0).accept(visitor, null);
+ return new Pair<>(((QueryTranslator)
statementExecutor).handleDeleteStatement(metadataProvider,
+ fStatements.get(0), hcc, true), PrecompiledType.DELETE);
+ }else{
+ throw new CompilationException("Procedure can only execute a
single delete, insert, or query");
+ }
+ }
+
+ private void setupDistributedJob(EntityId entityId, JobSpecification
jobSpec, IHyracksClientConnection hcc,
+ PrecompiledJobEventListener listener, MetadataProvider
metadataProvider,
+ IHyracksDataset hdc, SessionConfig sessionConfig, Stats stats)
throws Exception {
+ DistributedJobInfo distributedJobInfo = new
DistributedJobInfo(entityId, null, ActivityState.ACTIVE, jobSpec);
+
jobSpec.setProperty(ActiveJobNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME,
distributedJobInfo);
+ JobId jobId = hcc.distributeJob(jobSpec);
+ listener.storeDistributedInfo(jobId, null, new ResultReader(hdc),
metadataProvider.getResultSetId());
+ ActiveJobNotificationHandler.INSTANCE.monitorJob(jobId,
distributedJobInfo);
}
@Override
@@ -125,12 +189,14 @@
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ initialize();
+
String dataverse =
((QueryTranslator) statementExecutor).getActiveDataverse(new
Identifier(signature.getNamespace()));
EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, signature.getName());
- ChannelEventsListener listener =
- (ChannelEventsListener)
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
+ PrecompiledJobEventListener listener =
+ (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE.getActiveEntityListener(entityId);
IActiveLifecycleEventSubscriber eventSubscriber = new
ActiveLifecycleEventSubscriber();
boolean subscriberRegistered = false;
Procedure procedure = null;
@@ -152,22 +218,25 @@
}
procedure = new Procedure(dataverse, signature.getName(),
signature.getArity(), getParamList(),
- Function.RETURNTYPE_VOID, getFunctionBody(),
Function.LANGUAGE_AQL);
+ Function.RETURNTYPE_VOID, getFunctionBody(),
Function.LANGUAGE_AQL, duration);
+
+ metadataProvider.setResultSetId(new ResultSetId(0));
+ metadataProvider.setResultSetId(new
ResultSetId(resultSetIdCounter++));
+
+ //Create Procedure Internal Job
+ Pair<JobSpecification, PrecompiledType> procedureJobSpec =
+ createProcedureJob(getFunctionBody(), statementExecutor,
metadataProvider, hcc, hdc, stats);
// Now we subscribe
if (listener == null) {
- listener = new ChannelEventsListener(entityId);
+ listener = new PrecompiledJobEventListener(entityId,
procedureJobSpec.second);
ActiveJobNotificationHandler.INSTANCE.registerListener(listener);
}
listener.registerEventSubscriber(eventSubscriber);
subscriberRegistered = true;
-
- //Create Procedure Internal Job
- JobSpecification channeljobSpec =
- createProcedureJob(getFunctionBody(), statementExecutor,
metadataProvider, hcc, hdc, stats);
-
- // setupDistributedJob(entityId, channeljobSpec, hcc);
+ setupDistributedJob(entityId, procedureJobSpec.first, hcc,
listener, metadataProvider,
+ hdc, ((QueryTranslator)
statementExecutor).getSessionConfig(), stats);
eventSubscriber.assertEvent(ActiveLifecycleEvent.ACTIVE_JOB_STARTED);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
new file mode 100644
index 0000000..b2d84cc
--- /dev/null
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import java.util.EnumSet;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.result.ResultReader;
+import org.apache.asterix.app.result.ResultUtil;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.ChannelJobService;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import
org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
+import
org.apache.asterix.external.feed.management.ActiveLifecycleEventSubscriber;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobFlag;
+import org.apache.hyracks.api.job.JobId;
+
+public class ExecuteProcedureStatement implements IExtensionStatement {
+
+ private final String dataverseName;
+ private final String procedureName;
+ private final int arity;
+
+ public ExecuteProcedureStatement(String dataverseName, String
procedureName, int arity) {
+ this.dataverseName = dataverseName;
+ this.procedureName = procedureName;
+ this.arity = arity;
+ }
+
+ public String getDataverseName() {
+ return dataverseName;
+ }
+
+ public String getProcedureName() {
+ return procedureName;
+ }
+
+ public int getArity() {
+ return arity;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.UPDATE;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+
+
+ String dataverse = ((QueryTranslator)
statementExecutor).getActiveDataverse(new Identifier(dataverseName));
+ boolean txnActive = false;
+ EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, procedureName);
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
+ .getActiveEntityListener(entityId);
+ IActiveLifecycleEventSubscriber eventSubscriber = new
ActiveLifecycleEventSubscriber();
+ boolean subscriberRegistered = false;
+ Procedure procedure = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ txnActive = true;
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse,
procedureName,
+ Integer.toString(getArity()));
+ if (procedure == null) {
+ throw new AlgebricksException("There is no procedure with this
name " + procedureName + ".");
+ }
+
+ JobId hyracksJobId = listener.getHyracksJobId();
+ if (procedure.getDuration().equals("")) {
+ hcc.startJob(hyracksJobId);
+
+ if (listener.getType() == PrecompiledType.QUERY) {
+ hcc.waitForCompletion(hyracksJobId);
+ ResultReader resultReader = listener.resultReader;
+ resultReader.open(hyracksJobId, listener.resultSetId);
+ ResultUtil.printResults(resultReader, ((QueryTranslator)
statementExecutor).getSessionConfig(),
+ new Stats(), null);
+ }
+
+ } else {
+ ScheduledExecutorService ses =
ChannelJobService.startJob(null, EnumSet.noneOf(JobFlag.class),
+ hyracksJobId, hcc,
ChannelJobService.findPeriod(procedure.getDuration()));
+ listener.storeDistributedInfo(hyracksJobId, ses,
listener.resultReader, listener.resultSetId);
+ }
+
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ txnActive = false;
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (txnActive) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
new file mode 100644
index 0000000..2e9f26d
--- /dev/null
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ProcedureDropStatement.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.lang.statement;
+
+import org.apache.asterix.active.ActiveJobNotificationHandler;
+import org.apache.asterix.active.ActiveLifecycleListener;
+import org.apache.asterix.active.EntityId;
+import org.apache.asterix.algebra.extension.IExtensionStatement;
+import org.apache.asterix.app.translator.QueryTranslator;
+import org.apache.asterix.bad.BADConstants;
+import org.apache.asterix.bad.lang.BADLangExtension;
+import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
+import org.apache.asterix.bad.metadata.Procedure;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.functions.FunctionSignature;
+import org.apache.asterix.lang.common.struct.Identifier;
+import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
+import org.apache.asterix.translator.IStatementExecutor.Stats;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+public class ProcedureDropStatement implements IExtensionStatement {
+
+ private final FunctionSignature signature;
+ private boolean ifExists;
+
+ public ProcedureDropStatement(FunctionSignature signature, boolean
ifExists) {
+ this.signature = signature;
+ this.ifExists = ifExists;
+ }
+
+ public FunctionSignature getFunctionSignature() {
+ return signature;
+ }
+
+ public boolean getIfExists() {
+ return ifExists;
+ }
+
+ @Override
+ public byte getKind() {
+ return Kind.EXTENSION;
+ }
+
+ @Override
+ public byte getCategory() {
+ return Category.DDL;
+ }
+
+ @Override
+ public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws
CompilationException {
+ return null;
+ }
+
+ @Override
+ public void handle(IStatementExecutor statementExecutor, MetadataProvider
metadataProvider,
+ IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery
resultDelivery, Stats stats,
+ int resultSetIdCounter) throws HyracksDataException,
AlgebricksException {
+ FunctionSignature signature = getFunctionSignature();
+ String dataverse =
+ ((QueryTranslator) statementExecutor).getActiveDataverse(new
Identifier(signature.getNamespace()));
+ signature.setNamespace(dataverse);
+
+ boolean txnActive = false;
+ EntityId entityId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverse, signature.getName());
+ PrecompiledJobEventListener listener = (PrecompiledJobEventListener)
ActiveJobNotificationHandler.INSTANCE
+ .getActiveEntityListener(entityId);
+ Procedure procedure = null;
+
+ MetadataTransactionContext mdTxnCtx = null;
+ try {
+ mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+ txnActive = true;
+ procedure = BADLangExtension.getProcedure(mdTxnCtx, dataverse,
signature.getName(),
+ Integer.toString(signature.getArity()));
+ txnActive = false;
+ if (procedure == null) {
+ if (ifExists) {
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ return;
+ } else {
+ throw new AlgebricksException("There is no procedure with
this name " + signature.getName() + ".");
+ }
+ }
+
+ if (listener.getExecutorService() != null) {
+ listener.getExecutorService().shutdownNow();
+ }
+ JobId hyracksJobId = listener.getHyracksJobId();
+ if (hyracksJobId != null) {
+ hcc.destroyJob(hyracksJobId);
+ }
+ listener.deActivate();
+ ActiveLifecycleListener.INSTANCE.notifyJobFinish(hyracksJobId);
+
+ //Remove the Channel Metadata
+ MetadataManager.INSTANCE.deleteEntity(mdTxnCtx, procedure);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ } catch (Exception e) {
+ e.printStackTrace();
+ if (txnActive) {
+ QueryTranslator.abort(e, e, mdTxnCtx);
+ }
+ throw new HyracksDataException(e);
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
index 7222b1a..9148274 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataExtension.java
@@ -79,7 +79,8 @@
@Override
public List<ExtensionMetadataDataset> getExtensionIndexes() {
try {
- return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET,
BADMetadataIndexes.BROKER_DATASET);
+ return Arrays.asList(BADMetadataIndexes.CHANNEL_DATASET,
BADMetadataIndexes.BROKER_DATASET,
+ BADMetadataIndexes.PROCEDURE_DATASET);
} catch (Throwable th) {
th.printStackTrace();
throw th;
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
index 6ee5735..0430118 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java
@@ -83,17 +83,18 @@
public static final int
PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4;
public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX
= 5;
public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX =
6;
+ public static final int PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX =
7;
public static final ARecordType PROCEDURE_RECORDTYPE =
MetadataRecordTypes.createRecordType(
// RecordTypeName
BADConstants.RECORD_TYPENAME_PROCEDURE,
// FieldNames
new String[] { BADConstants.DataverseName,
BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY,
BADConstants.FIELD_NAME_PARAMS,
BADConstants.FIELD_NAME_RETURN_TYPE,
- BADConstants.FIELD_NAME_DEFINITION,
BADConstants.FIELD_NAME_LANGUAGE },
+ BADConstants.FIELD_NAME_DEFINITION,
BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration },
// FieldTypes
new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING,
BuiltinType.ASTRING,
new AOrderedListType(BuiltinType.ASTRING, null),
BuiltinType.ASTRING, BuiltinType.ASTRING,
- BuiltinType.ASTRING },
+ BuiltinType.ASTRING, BuiltinType.ASTRING },
//IsOpen?
true);
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
new file mode 100644
index 0000000..9699e21
--- /dev/null
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DataverseProceduresSearchKey.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.bad.metadata;
+
+import org.apache.asterix.metadata.MetadataNode;
+import org.apache.asterix.metadata.api.ExtensionMetadataDatasetId;
+import org.apache.asterix.metadata.api.IExtensionMetadataSearchKey;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class DataverseProceduresSearchKey implements
IExtensionMetadataSearchKey {
+ private static final long serialVersionUID = 1L;
+ private final String dataverse;
+
+ public DataverseProceduresSearchKey(String dataverse) {
+ this.dataverse = dataverse;
+ }
+
+ @Override
+ public ExtensionMetadataDatasetId getDatasetId() {
+ return BADMetadataIndexes.BAD_PROCEDURE_INDEX_ID;
+ }
+
+ @Override
+ public ITupleReference getSearchKey() {
+ return MetadataNode.createTuple(dataverse);
+ }
+}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
similarity index 65%
rename from
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
rename to
asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
index 631acf7..3e4cc39 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ChannelEventsListener.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/PrecompiledJobEventListener.java
@@ -22,41 +22,60 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveJob;
import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
+import org.apache.asterix.app.result.ResultReader;
import org.apache.asterix.bad.BADConstants;
-import org.apache.asterix.bad.ChannelJobInfo;
-import org.apache.asterix.bad.runtime.RepetitiveChannelOperatorDescriptor;
+import org.apache.asterix.bad.DistributedJobInfo;
import org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber;
import
org.apache.asterix.external.feed.api.IActiveLifecycleEventSubscriber.ActiveLifecycleEvent;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.runtime.util.AppContextInfo;
import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
+import org.apache.hyracks.api.dataset.ResultSetId;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.log4j.Logger;
-public class ChannelEventsListener implements IActiveEntityEventsListener {
- private static final Logger LOGGER =
Logger.getLogger(ChannelEventsListener.class);
+public class PrecompiledJobEventListener implements
IActiveEntityEventsListener {
+ private static final Logger LOGGER =
Logger.getLogger(PrecompiledJobEventListener.class);
private final List<IActiveLifecycleEventSubscriber> subscribers;
private final Map<Long, ActiveJob> jobs;
- private final Map<EntityId, ChannelJobInfo> jobInfos;
+ private final Map<EntityId, DistributedJobInfo> jobInfos;
private EntityId entityId;
+ private JobId hyracksJobId;
+ private ScheduledExecutorService executorService = null;
+ private boolean active;
+ public ResultReader resultReader;
+ public ResultSetId resultSetId;
- public ChannelEventsListener(EntityId entityId) {
+ public enum PrecompiledType {
+ CHANNEL,
+ QUERY,
+ INSERT,
+ DELETE
+ }
+
+ private final PrecompiledType type;
+
+ public PrecompiledJobEventListener(EntityId entityId, PrecompiledType
type) {
this.entityId = entityId;
subscribers = new ArrayList<>();
jobs = new HashMap<>();
jobInfos = new HashMap<>();
+ active = false;
+ this.type = type;
+ }
+
+ public PrecompiledType getType() {
+ return type;
}
@Override
@@ -81,9 +100,26 @@
}
}
+ public void storeDistributedInfo(JobId jobId, ScheduledExecutorService
ses, ResultReader resultReader,
+ ResultSetId resultSetId) {
+ this.hyracksJobId = jobId;
+ this.executorService = ses;
+ this.resultReader = resultReader;
+ this.resultSetId = resultSetId;
+ }
+
+ public ScheduledExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public JobId getHyracksJobId() {
+ return hyracksJobId;
+ }
+
private synchronized void handleJobStartEvent(ActiveEvent message) throws
Exception {
ActiveJob jobInfo = jobs.get(message.getJobId().getId());
- handleJobStartMessage((ChannelJobInfo) jobInfo);
+ handleJobStartMessage((DistributedJobInfo) jobInfo);
+ active = true;
}
private synchronized void handleJobFinishEvent(ActiveEvent message) throws
Exception {
@@ -91,23 +127,25 @@
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Channel Job finished for " + jobInfo);
}
- handleJobFinishMessage((ChannelJobInfo) jobInfo);
+ handleJobFinishMessage((DistributedJobInfo) jobInfo);
}
- private synchronized void handleJobFinishMessage(ChannelJobInfo cInfo)
throws Exception {
- EntityId channelJobId = cInfo.getEntityId();
+ private synchronized void handleJobFinishMessage(DistributedJobInfo cInfo)
throws Exception {
+ if (!isEntityActive()) {
+ EntityId channelJobId = cInfo.getEntityId();
- IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- JobStatus status = info.getStatus();
- boolean failure = status != null && status.equals(JobStatus.FAILURE);
+ IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
+ JobInfo info = hcc.getJobInfo(cInfo.getJobId());
+ JobStatus status = info.getStatus();
+ boolean failure = status != null &&
status.equals(JobStatus.FAILURE);
- jobInfos.remove(channelJobId);
- jobs.remove(cInfo.getJobId().getId());
- // notify event listeners
- ActiveLifecycleEvent event = failure ?
ActiveLifecycleEvent.ACTIVE_JOB_FAILED
- : ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
- notifyEventSubscribers(event);
+ jobInfos.remove(channelJobId);
+ jobs.remove(cInfo.getJobId().getId());
+ // notify event listeners
+ ActiveLifecycleEvent event =
+ failure ? ActiveLifecycleEvent.ACTIVE_JOB_FAILED :
ActiveLifecycleEvent.ACTIVE_JOB_ENDED;
+ notifyEventSubscribers(event);
+ }
}
private void notifyEventSubscribers(ActiveLifecycleEvent event) {
@@ -118,27 +156,7 @@
}
}
- private static synchronized void handleJobStartMessage(ChannelJobInfo
cInfo) throws Exception {
- List<OperatorDescriptorId> channelOperatorIds = new ArrayList<>();
- Map<OperatorDescriptorId, IOperatorDescriptor> operators =
cInfo.getSpec().getOperatorMap();
- for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry :
operators.entrySet()) {
- IOperatorDescriptor opDesc = entry.getValue();
- if (opDesc instanceof RepetitiveChannelOperatorDescriptor) {
- channelOperatorIds.add(opDesc.getOperatorId());
- }
- }
-
- IHyracksClientConnection hcc = AppContextInfo.INSTANCE.getHcc();
- JobInfo info = hcc.getJobInfo(cInfo.getJobId());
- List<String> locations = new ArrayList<>();
- for (OperatorDescriptorId channelOperatorId : channelOperatorIds) {
- Map<Integer, String> operatorLocations =
info.getOperatorLocations().get(channelOperatorId);
- int nOperatorInstances = operatorLocations.size();
- for (int i = 0; i < nOperatorInstances; i++) {
- locations.add(operatorLocations.get(i));
- }
- }
- cInfo.setLocations(locations);
+ private static synchronized void handleJobStartMessage(DistributedJobInfo
cInfo) throws Exception {
cInfo.setState(ActivityState.ACTIVE);
}
@@ -161,7 +179,7 @@
throw new IllegalStateException("Channel job already registered");
}
- ChannelJobInfo cInfo = new ChannelJobInfo(entityId, jobId,
ActivityState.CREATED, jobSpec);
+ DistributedJobInfo cInfo = new DistributedJobInfo(entityId, jobId,
ActivityState.CREATED, jobSpec);
jobs.put(jobId.getId(), cInfo);
jobInfos.put(entityId, cInfo);
@@ -177,7 +195,7 @@
return jobInfos.get(activeJobId).getSpec();
}
- public ChannelJobInfo getJobInfo(EntityId activeJobId) {
+ public DistributedJobInfo getJobInfo(EntityId activeJobId) {
return jobInfos.get(activeJobId);
}
@@ -190,15 +208,7 @@
}
public synchronized boolean isChannelActive(EntityId activeJobId,
IActiveLifecycleEventSubscriber eventSubscriber) {
- boolean active = false;
- ChannelJobInfo cInfo = jobInfos.get(activeJobId);
- if (cInfo != null) {
- active = cInfo.getState().equals(ActivityState.ACTIVE);
- }
- if (active) {
- registerEventSubscriber(eventSubscriber);
- }
- return active;
+ return isEntityActive();
}
public FeedConnectionId[] getConnections() {
@@ -207,7 +217,11 @@
@Override
public boolean isEntityActive() {
- return !jobs.isEmpty();
+ return active;
+ }
+
+ public void deActivate() {
+ active = false;
}
@Override
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
index b64bf1b..a77a14d 100644
--- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
+++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java
@@ -39,15 +39,17 @@
private final String body;
private final String returnType;
private final String language;
+ private final String duration;
public Procedure(String dataverseName, String functionName, int arity,
List<String> params, String returnType,
- String functionBody, String language) {
+ String functionBody, String language, String duration) {
this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD,
dataverseName, functionName);
this.params = params;
this.body = functionBody;
this.returnType = returnType == null ? RETURNTYPE_VOID : returnType;
this.language = language;
this.arity = arity;
+ this.duration = duration;
}
public EntityId getEntityId() {
@@ -74,6 +76,10 @@
return arity;
}
+ public String getDuration() {
+ return duration;
+ }
+
@Override
public boolean equals(Object other) {
if (this == other) {
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
index f2eab9b..e151aea 100644
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
+++
b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java
@@ -104,8 +104,12 @@
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX))
.getStringValue();
+ String duration = ((AString) procedureRecord
+
.getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX))
+ .getStringValue();
+
return new Procedure(dataverseName, procedureName,
Integer.parseInt(arity), params, returnType, definition,
- language);
+ language, duration);
}
@@ -178,6 +182,12 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX,
fieldValue);
+ // write field 7
+ fieldValue.reset();
+ aString.setValue(procedure.getDuration());
+ stringSerde.serialize(aString, fieldValue.getDataOutput());
+
recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX,
fieldValue);
+
// write record
recordBuilder.write(tupleBuilder.getDataOutput(), true);
tupleBuilder.addFieldEndOffset();
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
deleted file mode 100644
index 8093977..0000000
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorDescriptor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.bad.BADConstants;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobSpecification;
-import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-
-/**
- * A repetitive channel operator, which uses a Java timer to run a given query
periodically
- */
-public class RepetitiveChannelOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
-
- private static final long serialVersionUID = 1L;
-
- private static final Logger LOGGER =
Logger.getLogger(RepetitiveChannelOperatorDescriptor.class.getName());
-
- /** The unique identifier of the job. **/
- protected final EntityId entityId;
-
- protected final JobSpecification jobSpec;
-
- private final String duration;
-
- private String strIP;
- private int port;
-
- public RepetitiveChannelOperatorDescriptor(JobSpecification spec, String
dataverseName, String channelName,
- String duration, JobSpecification channeljobSpec, String strIP,
int port) {
- super(spec, 0, 0);
- this.entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME,
dataverseName, channelName);
- this.jobSpec = channeljobSpec;
- this.duration = duration;
- this.strIP = strIP;
- this.port = port;
- }
-
- @Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
- ActiveRuntimeId runtimeId = new ActiveRuntimeId(entityId,
- RepetitiveChannelOperatorNodePushable.class.getSimpleName(),
partition);
- return new RepetitiveChannelOperatorNodePushable(ctx, runtimeId,
jobSpec, duration, strIP, port);
- }
-
- public String getDuration() {
- return duration;
- }
-
- public EntityId getEntityId() {
- return entityId;
- }
-
- public JobSpecification getJobSpec() {
- return jobSpec;
- }
-
-}
diff --git
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
b/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
deleted file mode 100644
index 1bbe331..0000000
---
a/asterix-bad/src/main/java/org/apache/asterix/bad/runtime/RepetitiveChannelOperatorNodePushable.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.bad.runtime;
-
-import java.util.EnumSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.logging.Logger;
-
-import org.apache.asterix.active.ActiveRuntimeId;
-import org.apache.asterix.active.ActiveSourceOperatorNodePushable;
-import org.apache.asterix.bad.ChannelJobService;
-import org.apache.hyracks.api.client.HyracksConnection;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobFlag;
-import org.apache.hyracks.api.job.JobSpecification;
-
-public class RepetitiveChannelOperatorNodePushable extends
ActiveSourceOperatorNodePushable {
-
- private static final Logger LOGGER =
Logger.getLogger(RepetitiveChannelOperatorNodePushable.class.getName());
-
- private ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(1);
- private final JobSpecification jobSpec;
- private long duration;
- private final HyracksConnection hcc;
-
- public RepetitiveChannelOperatorNodePushable(IHyracksTaskContext ctx,
ActiveRuntimeId runtimeId,
- JobSpecification channeljobSpec, String duration, String strIP,
int port) throws HyracksDataException {
- super(ctx, runtimeId);
- this.jobSpec = channeljobSpec;
- this.duration = ChannelJobService.findPeriod(duration);
- try {
- hcc = new HyracksConnection(strIP, port);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- }
-
-
- @Override
- protected void start() throws HyracksDataException, InterruptedException {
- try {
- scheduledExecutorService =
- ChannelJobService.startJob(jobSpec,
EnumSet.noneOf(JobFlag.class), null, hcc, duration);
- } catch (Exception e) {
- throw new HyracksDataException(e);
- }
- while (!scheduledExecutorService.isTerminated()) {
-
- }
-
- }
-
- @Override
- protected void abort() throws HyracksDataException, InterruptedException {
- scheduledExecutorService.shutdown();
- }
-
-}
diff --git a/asterix-bad/src/main/resources/lang-extension/lang.txt
b/asterix-bad/src/main/resources/lang-extension/lang.txt
index 94b4c78..b433f5f 100644
--- a/asterix-bad/src/main/resources/lang-extension/lang.txt
+++ b/asterix-bad/src/main/resources/lang-extension/lang.txt
@@ -5,6 +5,8 @@
import org.apache.asterix.bad.lang.statement.CreateBrokerStatement;
import org.apache.asterix.bad.lang.statement.CreateChannelStatement;
import org.apache.asterix.bad.lang.statement.CreateProcedureStatement;
+import org.apache.asterix.bad.lang.statement.ExecuteProcedureStatement;
+import org.apache.asterix.bad.lang.statement.ProcedureDropStatement;
@merge
@@ -18,7 +20,7 @@
(
// merge area 2
before:
- after: | stmt = ChannelSubscriptionStatement())
+ after: | stmt = ChannelSubscriptionStatement() | stmt =
ProcedureExecution())
{
// merge area 3
}
@@ -60,6 +62,10 @@
{
stmt = new BrokerDropStatement(pairId.first, pairId.second, ifExists);
}
+ | "procedure" funcSig = FunctionSignature() ifExists = IfExists()
+ {
+ stmt = new ProcedureDropStatement(funcSig, ifExists);
+ }
)
{
// merge area 3
@@ -74,13 +80,13 @@
CreateChannelStatement ccs = null;
String fqFunctionName = null;
Expression period = null;
- boolean distributed = false;
+ boolean distributed = true;
}
{
(
"repetitive" "channel" nameComponents = QualifiedName()
<USING> appliedFunction = FunctionSignature()
- "period" period = FunctionCallExpr() ("distributed" { distributed = true;
})?
+ "period" period = FunctionCallExpr() ("nondistributed" { distributed =
false; })?
{
ccs = new CreateChannelStatement(nameComponents.first,
nameComponents.second, appliedFunction,
period, distributed);
@@ -91,37 +97,67 @@
}
}
-
@new
CreateProcedureStatement ProcedureSpecification() throws ParseException:
{
- Pair<Identifier,Identifier> nameComponents = null;
+ FunctionName fctName = null;
FunctionSignature signature;
List<VarIdentifier> paramList = new ArrayList<VarIdentifier>();
String functionBody;
Token beginPos;
Token endPos;
- Expression functionBodyExpr;
+ Statement functionBodyExpr;
+ Expression period = null;
}
{
- "procedure" nameComponents = QualifiedName()
+ "procedure" fctName = FunctionName()
paramList = ParameterList()
<LEFTBRACE>
{
beginPos = token;
}
- functionBodyExpr = Expression() <RIGHTBRACE>
+ functionBodyExpr = SingleStatement() <RIGHTBRACE>
{
endPos = token;
functionBody = extractFragment(beginPos.beginLine, beginPos.beginColumn,
endPos.beginLine, endPos.beginColumn);
- signature = new FunctionSignature(nameComponents.first.toString(),
nameComponents.second.toString(), paramList.size());
+ signature = new FunctionSignature(fctName.dataverse, fctName.function,
paramList.size());
removeCurrentScope();
- return new CreateProcedureStatement(signature, paramList, functionBody);
}
+ ("period" period = FunctionCallExpr())?
+ {
+ return new CreateProcedureStatement(signature, paramList, functionBody,
period);
+ }
}
-
-
+@new
+ExecuteProcedureStatement ProcedureExecution() throws ParseException:
+{
+ ExecuteProcedureStatement callExpr;
+ List<Expression> argList = new ArrayList<Expression>();
+ Expression tmp;
+ int arity = 0;
+ FunctionName funcName = null;
+ String hint = null;
+}
+{
+ "execute"
+ funcName = FunctionName()
+ <LEFTPAREN> (tmp = Expression()
+ {
+ argList.add(tmp);
+ arity ++;
+ }
+ (<COMMA> tmp = Expression()
+ {
+ argList.add(tmp);
+ arity++;
+ }
+ )*)? <RIGHTPAREN>
+ {
+ String fqFunctionName = funcName.function;
+ return new ExecuteProcedureStatement(funcName.dataverse, fqFunctionName,
arity);
+ }
+}
@new
CreateBrokerStatement BrokerSpecification() throws ParseException:
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
new file mode 100644
index 0000000..be1fc86
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.1.ddl.aql
@@ -0,0 +1,27 @@
+/*
+* Description : Simple Delete Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+ timeStamp: datetime,
+ roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+insert into dataset UserLocations([
+ {"timeStamp":current-datetime(), "roomNumber":222}]
+);
+
+create procedure deleteAll() {
+delete $i from dataset UserLocations
+};
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
new file mode 100644
index 0000000..0a0c582
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.2.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description : Simple Delete Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
new file mode 100644
index 0000000..e9adaa8
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Delete Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute deleteAll();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
new file mode 100644
index 0000000..0a0c582
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/delete_procedure/delete_procedure.4.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description : Simple Delete Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
new file mode 100644
index 0000000..1110d94
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+ timeStamp: datetime,
+ roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+ insert into dataset UserLocations([
+ {"timeStamp":current-datetime(), "roomNumber":222}]
+ )
+};
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.2.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.4.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.query.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.query.aql
new file mode 100644
index 0000000..8a1872c
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/insert_procedure/insert_procedure.5.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : 3
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
new file mode 100644
index 0000000..eaebfbd
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description : Simple Query Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+ timeStamp: datetime,
+ roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+for $i in dataset UserLocations
+order by $i.timeStamp
+return $i.roomNumber
+};
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
new file mode 100644
index 0000000..2f0e968
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.2.update.aql
@@ -0,0 +1,12 @@
+/*
+* Description : Simple Query Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+
+insert into dataset UserLocations([
+ {"timeStamp":current-datetime(), "roomNumber":222}]
+)
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
new file mode 100644
index 0000000..824d026
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.3.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Query Procedure
+* Expected Res : 222
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
new file mode 100644
index 0000000..199d0fc
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.4.update.aql
@@ -0,0 +1,11 @@
+/*
+* Description : Simple Query Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+insert into dataset UserLocations([
+ {"timeStamp":current-datetime(), "roomNumber":225}]
+)
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
new file mode 100644
index 0000000..25e0ba5
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/query_procedure/query_procedure.5.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Query Procedure
+* Expected Res : 222,225
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
new file mode 100644
index 0000000..f4d45c9
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.1.ddl.aql
@@ -0,0 +1,25 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+drop dataverse channels if exists;
+create dataverse channels;
+use dataverse channels;
+
+create type myLocation as {
+ timeStamp: datetime,
+ roomNumber: int
+}
+
+
+create dataset UserLocations(myLocation)
+primary key timeStamp;
+
+create procedure findMe() {
+ insert into dataset UserLocations([
+ {"timeStamp":current-datetime(), "roomNumber":222}]
+ )
+} period duration("PT5S");
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
new file mode 100644
index 0000000..ddc2790
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.2.update.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+execute findMe();
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
new file mode 100644
index 0000000..8adb253
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.3.sleep.aql
@@ -0,0 +1,8 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+11000
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
new file mode 100644
index 0000000..3b6e6c4
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.4.ddl.aql
@@ -0,0 +1,9 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : Success
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+drop procedure findMe@0;
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
new file mode 100644
index 0000000..8a1872c
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/queries/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.query.aql
@@ -0,0 +1,10 @@
+/*
+* Description : Simple Insert Procedure
+* Expected Res : 3
+* Date : Jan 2017
+* Author : Steven Jacobs
+*/
+
+use dataverse channels;
+count(for $i in dataset UserLocations
+return $i);
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
new file mode 100644
index 0000000..56a6051
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.2.adm
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
new file mode 100644
index 0000000..c227083
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/delete_procedure/delete_procedure.4.adm
@@ -0,0 +1 @@
+0
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.5.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.5.adm
new file mode 100644
index 0000000..e440e5c
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/insert_procedure/insert_procedure.5.adm
@@ -0,0 +1 @@
+3
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
new file mode 100644
index 0000000..6dd90d2
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.3.adm
@@ -0,0 +1 @@
+222
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
new file mode 100644
index 0000000..b1d4fa9
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/query_procedure/query_procedure.5.adm
@@ -0,0 +1,2 @@
+222
+225
\ No newline at end of file
diff --git
a/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
b/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
new file mode 100644
index 0000000..d8263ee
--- /dev/null
+++
b/asterix-bad/src/test/resources/runtimets/results/procedure/repetitive_insert_procedure/repetitive_insert_procedure.5.adm
@@ -0,0 +1 @@
+2
\ No newline at end of file
diff --git a/asterix-bad/src/test/resources/runtimets/testsuite.xml
b/asterix-bad/src/test/resources/runtimets/testsuite.xml
index 997dc77..d9ade1f 100644
--- a/asterix-bad/src/test/resources/runtimets/testsuite.xml
+++ b/asterix-bad/src/test/resources/runtimets/testsuite.xml
@@ -8,6 +8,26 @@
<output-dir compare="Text">room_occupants</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="procedure">
+ <compilation-unit name="insert_procedure">
+ <output-dir compare="Text">insert_procedure</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="procedure">
+ <compilation-unit name="delete_procedure">
+ <output-dir compare="Text">delete_procedure</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="procedure">
+ <compilation-unit name="query_procedure">
+ <output-dir compare="Text">query_procedure</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="procedure">
+ <compilation-unit name="repetitive_insert_procedure">
+ <output-dir compare="Text">repetitive_insert_procedure</output-dir>
+ </compilation-unit>
+ </test-case>
<test-case FilePath="channel">
<compilation-unit name="create_channel_check_datasets">
<output-dir
compare="Text">create_channel_check_datasets</output-dir>
--
To view, visit https://asterix-gerrit.ics.uci.edu/1473
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I03550a74e2c90179e72345103b3d2c4f98148631
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb-bad
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>