Steven Jacobs has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/2641
Change subject: Redeploy channels and procedures during recovery ...................................................................... Redeploy channels and procedures during recovery Use the GlobalRecoveryManager extension to redeploy channels/procedures Restart execution of channels during recovery Some code cleanup Change-Id: I6897ccf9cddb9ec8d10256e252ee893afe6db145 --- M asterix-bad/pom.xml M asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java M asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java M asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java A asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java A asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java M asterix-bad/src/main/resources/cc.conf M asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java M asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm 16 files changed, 328 insertions(+), 138 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb-bad refs/changes/41/2641/1 diff --git a/asterix-bad/pom.xml b/asterix-bad/pom.xml index 6f5cd32..51aba74 100644 --- a/asterix-bad/pom.xml +++ b/asterix-bad/pom.xml @@ -271,6 +271,11 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-common</artifactId> + <version>${hyracks.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> <artifactId>algebricks-runtime</artifactId> </dependency> <dependency> diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java index d422663..0467f6e 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADConstants.java @@ -47,7 +47,7 @@ String FIELD_NAME_ARITY = "Arity"; String FIELD_NAME_DEPENDENCIES = "Dependencies"; String FIELD_NAME_PARAMS = "Params"; - String FIELD_NAME_RETURN_TYPE = "ReturnType"; + String FIELD_NAME_TYPE = "Type"; String FIELD_NAME_DEFINITION = "Definition"; String FIELD_NAME_LANGUAGE = "Language"; String FIELD_NAME_BODY = "Body"; diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java index e326ce6..8090fb8 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java @@ -21,6 +21,7 @@ import java.io.StringReader; import java.time.Instant; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; @@ -51,6 +52,7 @@ import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; +import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -66,6 +68,19 @@ private static final int POOL_SIZE = 1; private static final long millisecondTimeout = BADConstants.EXECUTOR_TIMEOUT * 1000; + + public static void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, + IHyracksClientConnection hcc, DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory, + String duration) throws Exception { + if (channeljobSpec != null) { + channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec); + ScheduledExecutorService ses = startRepetitiveDeployedJobSpec(destributedId, hcc, findPeriod(duration), + new HashMap<>(), entityId, txnIdFactory, listener); + listener.storeDistributedInfo(destributedId, ses); + } + + } //Starts running a deployed job specification periodically with an interval of "period" seconds public static ScheduledExecutorService startRepetitiveDeployedJobSpec(DeployedJobSpecId distributedId, @@ -93,7 +108,8 @@ Map<byte[], byte[]> jobParameters, long period, EntityId entityId, ITxnIdFactory txnIdFactory, DeployedJobSpecEventListener listener) throws Exception { long executionMilliseconds = - runDeployedJobSpec(distributedId, hcc, jobParameters, entityId, txnIdFactory, null, listener, null); + runDeployedJobSpec(distributedId, hcc, null, jobParameters, entityId, txnIdFactory, null, listener, + null); if (executionMilliseconds > period) { LOGGER.log(Level.SEVERE, "Periodic job for " + entityId.getExtensionName() + " " + entityId.getDataverse() + "." @@ -106,7 +122,7 @@ } public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, - Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory, + IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory, ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor) throws Exception { listener.waitWhileAtState(ActivityState.SUSPENDED); @@ -122,7 +138,7 @@ long executionMilliseconds = Instant.now().toEpochMilli() - startTime; if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) { - ResultReader resultReader = new ResultReader(listener.getResultDataset(), jobId, listener.getResultId()); + ResultReader resultReader = new ResultReader(hdc, jobId, new ResultSetId(0)); ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(), new IStatementExecutor.Stats(), null); @@ -217,11 +233,10 @@ } } else { //Procedures - metadataProvider.setResultSetId(listener.getResultId()); - final IStatementExecutor.ResultDelivery resultDelivery = - requestParameters.getResultProperties().getDelivery(); - final IHyracksDataset hdc = requestParameters.getHyracksDataset(); - final IStatementExecutor.Stats stats = requestParameters.getStats(); + metadataProvider.setResultSetId(new ResultSetId(0)); + IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); + IHyracksDataset hdc = requestParameters.getHyracksDataset(); + IStatementExecutor.Stats stats = requestParameters.getStats(); boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED; metadataProvider.setResultAsyncMode(resultsAsync); @@ -239,13 +254,11 @@ public static JobSpecification compileQueryJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, IHyracksClientConnection hcc, Query q) throws Exception { MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - boolean bActiveTxn = true; metadataProvider.setMetadataTxnContext(mdTxnCtx); - JobSpecification jobSpec = null; + JobSpecification jobSpec; try { jobSpec = statementExecutor.rewriteCompileQuery(hcc, metadataProvider, q, null); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - bActiveTxn = false; } catch (Exception e) { ((QueryTranslator) statementExecutor).abort(e, e, mdTxnCtx); throw e; diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java index 4ab7530..f6f7d25 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/BADStatementExecutor.java @@ -185,9 +185,6 @@ public void handleCreateIndexStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { - //TODO: Check whether a delete or insert procedure using the index. If so, we will need to - // disallow the procedure until after the newly distributed version is ready - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); //Allow channels to use the new index diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index 22767f2..ef906a9 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Level; import java.util.logging.Logger; @@ -46,7 +45,6 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.functions.FunctionSignature; -import org.apache.asterix.common.transactions.ITxnIdFactory; import org.apache.asterix.lang.common.base.Expression; import org.apache.asterix.lang.common.base.Statement; import org.apache.asterix.lang.common.expression.CallExpr; @@ -75,7 +73,6 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.parsers.IValueParser; @@ -176,42 +173,44 @@ new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true); - //Setup the results dataset - partitionFields = new ArrayList<>(); - fieldNames = new ArrayList<>(); - fieldNames.add(BADConstants.ResultId); - partitionFields.add(fieldNames); - idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null); - DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName), - new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, - new HashMap<String, String>(), DatasetType.INTERNAL, idd, null, true); - - //Create an index on timestamp for results - CreateIndexStatement createTimeIndex = new CreateIndexStatement(); - createTimeIndex.setDatasetName(new Identifier(resultsTableName)); - createTimeIndex.setDataverseName(dataverseName); - createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex")); - createTimeIndex.setIfNotExists(false); - createTimeIndex.setIndexType(IndexType.BTREE); - createTimeIndex.setEnforced(false); - createTimeIndex.setGramLength(0); - List<String> fNames = new ArrayList<>(); - fNames.add(BADConstants.ChannelExecutionTime); - Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null); - createTimeIndex.addFieldExprPair(fields); - createTimeIndex.addFieldIndexIndicator(0); - - - //Run both statements to create datasets ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset, hcc, null); - metadataProvider.getLocks().reset(); - ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc, - null); - metadataProvider.getLocks().reset(); - //Create a time index for the results - ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, null); + if (!push) { + //Setup the results dataset + partitionFields = new ArrayList<>(); + fieldNames = new ArrayList<>(); + fieldNames.add(BADConstants.ResultId); + partitionFields.add(fieldNames); + idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null); + DatasetDecl createResultsDataset = new DatasetDecl(dataverseName, new Identifier(resultsTableName), + new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, new HashMap<>(), + DatasetType.INTERNAL, idd, null, true); + + //Create an index on timestamp for results + CreateIndexStatement createTimeIndex = new CreateIndexStatement(); + createTimeIndex.setDatasetName(new Identifier(resultsTableName)); + createTimeIndex.setDataverseName(dataverseName); + createTimeIndex.setIndexName(new Identifier(resultsTableName + "TimeIndex")); + createTimeIndex.setIfNotExists(false); + createTimeIndex.setIndexType(IndexType.BTREE); + createTimeIndex.setEnforced(false); + createTimeIndex.setGramLength(0); + List<String> fNames = new ArrayList<>(); + fNames.add(BADConstants.ChannelExecutionTime); + Pair<List<String>, IndexedTypeExpression> fields = new Pair<>(fNames, null); + createTimeIndex.addFieldExprPair(fields); + createTimeIndex.addFieldIndexIndicator(0); + metadataProvider.getLocks().reset(); + ((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, + hcc, null); + metadataProvider.getLocks().reset(); + + //Create a time index for the results + ((QueryTranslator) statementExecutor).handleCreateIndexStatement(metadataProvider, createTimeIndex, hcc, + null); + + } } @@ -257,18 +256,6 @@ hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null); } - private void setupExecutorJob(EntityId entityId, JobSpecification channeljobSpec, IHyracksClientConnection hcc, - DeployedJobSpecEventListener listener, ITxnIdFactory txnIdFactory) throws Exception { - if (channeljobSpec != null) { - channeljobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); - DeployedJobSpecId destributedId = hcc.deployJobSpec(channeljobSpec); - ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(destributedId, hcc, - BADJobService.findPeriod(duration), new HashMap<>(), entityId, txnIdFactory, listener); - listener.storeDistributedInfo(destributedId, ses, null, null); - } - - } - @Override public void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, IRequestParameters requestContext, MetadataProvider metadataProvider, int resultSetId) @@ -283,7 +270,7 @@ dataverseName = new Identifier(((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName)); dataverse = dataverseName.getValue(); subscriptionsTableName = channelName + BADConstants.subscriptionEnding; - resultsTableName = channelName + BADConstants.resultsEnding; + resultsTableName = !push ? channelName + BADConstants.resultsEnding : ""; EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue()); ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); @@ -291,7 +278,7 @@ (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); DeployedJobSpecEventListener listener = (DeployedJobSpecEventListener) activeEventHandler.getListener(entityId); boolean alreadyActive = false; - Channel channel = null; + Channel channel; MetadataTransactionContext mdTxnCtx = null; try { @@ -313,7 +300,7 @@ if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsTableName) != null) { throw new AsterixException("The channel name:" + channelName + " is not available."); } - if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) { + if (!push && MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsTableName) != null) { throw new AsterixException("The channel name:" + channelName + " is not available."); } MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), @@ -330,12 +317,12 @@ // Now we subscribe if (listener == null) { listener = new DeployedJobSpecEventListener(appCtx, entityId, - push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL, null, - "BadListener"); + push ? PrecompiledType.PUSH_CHANNEL : PrecompiledType.CHANNEL); activeEventHandler.registerListener(listener); } - setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory()); + BADJobService.setupExecutorJob(entityId, channeljobSpec, hcc, listener, metadataProvider.getTxnIdFactory(), + duration); channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function, duration, null, body); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index 03db7bc..13a993d 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -72,7 +72,6 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.DeployedJobSpecId; @@ -176,7 +175,7 @@ } private Pair<JobSpecification, PrecompiledType> createProcedureJob(IStatementExecutor statementExecutor, - MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) + MetadataProvider metadataProvider, IHyracksClientConnection hcc, Stats stats) throws Exception { if (getProcedureBodyStatement().getKind() == Statement.Kind.INSERT) { if (!varList.isEmpty()) { @@ -188,7 +187,7 @@ insertStatement.getDatasetName().getValue())); return new Pair<>( ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, - getProcedureBodyStatement(), hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null), + getProcedureBodyStatement(), hcc, null, ResultDelivery.ASYNC, null, stats, true, null), PrecompiledType.INSERT); } else if (getProcedureBodyStatement().getKind() == Statement.Kind.QUERY) { SqlppRewriterFactory fact = new SqlppRewriterFactory(); @@ -218,11 +217,11 @@ } private void setupDeployedJobSpec(EntityId entityId, JobSpecification jobSpec, IHyracksClientConnection hcc, - DeployedJobSpecEventListener listener, ResultSetId resultSetId, IHyracksDataset hdc, Stats stats) + DeployedJobSpecEventListener listener, ResultSetId resultSetId, Stats stats) throws Exception { jobSpec.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); DeployedJobSpecId deployedJobSpecId = hcc.deployJobSpec(jobSpec); - listener.storeDistributedInfo(deployedJobSpecId, null, hdc, resultSetId); + listener.storeDistributedInfo(deployedJobSpecId, null); } @Override @@ -255,29 +254,25 @@ if (alreadyActive) { throw new AsterixException("Procedure " + signature.getName() + " is already running"); } - metadataProvider.setResultSetId(new ResultSetId(resultSetId++)); - final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); - final IHyracksDataset hdc = requestParameters.getHyracksDataset(); + metadataProvider.setResultSetId(new ResultSetId(0)); final Stats stats = requestParameters.getStats(); - boolean resultsAsync = resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED; - metadataProvider.setResultAsyncMode(resultsAsync); + metadataProvider.setResultAsyncMode(false); metadataProvider.setMaxResultReads(1); //Create Procedure Internal Job Pair<JobSpecification, PrecompiledType> procedureJobSpec = - createProcedureJob(statementExecutor, metadataProvider, hcc, hdc, stats); + createProcedureJob(statementExecutor, metadataProvider, hcc, stats); // Now we subscribe if (listener == null) { - listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second, null, - "BadListener"); + listener = new DeployedJobSpecEventListener(appCtx, entityId, procedureJobSpec.second); activeEventHandler.registerListener(listener); } setupDeployedJobSpec(entityId, procedureJobSpec.first, hcc, listener, metadataProvider.getResultSetId(), - hdc, stats); procedure = new Procedure(dataverse, signature.getName(), signature.getArity(), getParamList(), - Function.RETURNTYPE_VOID, getProcedureBody(), Function.LANGUAGE_AQL, duration, dependencies); + procedureJobSpec.second.toString(), getProcedureBody(), Function.LANGUAGE_AQL, duration, + dependencies); MetadataManager.INSTANCE.addEntity(mdTxnCtx, procedure); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index 025b9e6..60625a2 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -118,7 +118,8 @@ Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure); DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId(); if (procedure.getDuration().equals("")) { - BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, contextRuntimeVarMap, entityId, + BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(), + contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor); @@ -126,8 +127,7 @@ ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc, BADJobService.findPeriod(procedure.getDuration()), contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), listener); - listener.storeDistributedInfo(deployedJobSpecId, ses, listener.getResultDataset(), - listener.getResultId()); + listener.storeDistributedInfo(deployedJobSpecId, ses); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); txnActive = false; diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java index 1e5e627..a764a5a 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/BADMetadataRecordTypes.java @@ -28,19 +28,19 @@ public class BADMetadataRecordTypes { // -------------------------------------- Subscriptions --------------------------------------// - private static final String[] subTypeFieldNames = { BADConstants.DataverseName, BADConstants.BrokerName, - BADConstants.SubscriptionId }; + private static final String[] subTypeFieldNames = + { BADConstants.DataverseName, BADConstants.BrokerName, BADConstants.SubscriptionId }; private static final IAType[] subTypeFieldTypes = { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.AUUID }; - public static final ARecordType channelSubscriptionsType = new ARecordType(BADConstants.ChannelSubscriptionsType, - subTypeFieldNames, subTypeFieldTypes, true); + public static final ARecordType channelSubscriptionsType = + new ARecordType(BADConstants.ChannelSubscriptionsType, subTypeFieldNames, subTypeFieldTypes, true); // ---------------------------------------- Results --------------------------------------------// private static final String[] resultTypeFieldNames = { BADConstants.ResultId, BADConstants.ChannelExecutionTime, BADConstants.SubscriptionId, BADConstants.DeliveryTime }; - private static final IAType[] resultTypeFieldTypes = { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, - BuiltinType.ADATETIME }; - public static final ARecordType channelResultsType = new ARecordType(BADConstants.ChannelResultsType, - resultTypeFieldNames, resultTypeFieldTypes, true); + private static final IAType[] resultTypeFieldTypes = + { BuiltinType.AUUID, BuiltinType.ADATETIME, BuiltinType.AUUID, BuiltinType.ADATETIME }; + public static final ARecordType channelResultsType = + new ARecordType(BADConstants.ChannelResultsType, resultTypeFieldNames, resultTypeFieldTypes, true); //------------------------------------------ Channel ----------------------------------------// public static final int CHANNEL_ARECORD_DATAVERSE_NAME_FIELD_INDEX = 0; @@ -86,7 +86,7 @@ public static final int PROCEDURE_ARECORD_PROCEDURE_NAME_FIELD_INDEX = 1; public static final int PROCEDURE_ARECORD_PROCEDURE_ARITY_FIELD_INDEX = 2; public static final int PROCEDURE_ARECORD_PROCEDURE_PARAM_LIST_FIELD_INDEX = 3; - public static final int PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX = 4; + public static final int PROCEDURE_ARECORD_PROCEDURE_TYPE_FIELD_INDEX = 4; public static final int PROCEDURE_ARECORD_PROCEDURE_DEFINITION_FIELD_INDEX = 5; public static final int PROCEDURE_ARECORD_PROCEDURE_LANGUAGE_FIELD_INDEX = 6; public static final int PROCEDURE_ARECORD_PROCEDURE_DURATION_FIELD_INDEX = 7; @@ -96,9 +96,8 @@ BADConstants.RECORD_TYPENAME_PROCEDURE, // FieldNames new String[] { BADConstants.DataverseName, BADConstants.ProcedureName, BADConstants.FIELD_NAME_ARITY, - BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_RETURN_TYPE, - BADConstants.FIELD_NAME_DEFINITION, BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration, - BADConstants.FIELD_NAME_DEPENDENCIES }, + BADConstants.FIELD_NAME_PARAMS, BADConstants.FIELD_NAME_TYPE, BADConstants.FIELD_NAME_DEFINITION, + BADConstants.FIELD_NAME_LANGUAGE, BADConstants.Duration, BADConstants.FIELD_NAME_DEPENDENCIES }, // FieldTypes new IAType[] { BuiltinType.ASTRING, BuiltinType.ASTRING, BuiltinType.ASTRING, new AOrderedListType(BuiltinType.ASTRING, null), BuiltinType.ASTRING, BuiltinType.ASTRING, diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java index 78f7c95..fd918ba 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/DeployedJobSpecEventListener.java @@ -30,9 +30,6 @@ import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.metadata.IDataset; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.log4j.Logger; @@ -53,9 +50,6 @@ private ScheduledExecutorService executorService = null; private final PrecompiledType type; - private IHyracksDataset hdc; - private ResultSetId resultSetId; - // members protected volatile ActivityState state; protected final ICcApplicationContext appCtx; @@ -63,29 +57,16 @@ protected final ActiveEvent statsUpdatedEvent; protected long statsTimestamp; protected String stats; - protected final String runtimeName; - protected final AlgebricksAbsolutePartitionConstraint locations; private int runningInstance; - public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type, - AlgebricksAbsolutePartitionConstraint locations, String runtimeName) { + public DeployedJobSpecEventListener(ICcApplicationContext appCtx, EntityId entityId, PrecompiledType type) { this.appCtx = appCtx; this.entityId = entityId; setState(ActivityState.STOPPED); this.statsTimestamp = -1; this.statsUpdatedEvent = new ActiveEvent(null, Kind.STATS_UPDATED, entityId, null); this.stats = "{\"Stats\":\"N/A\"}"; - this.runtimeName = runtimeName; - this.locations = locations; this.type = type; - } - - public IHyracksDataset getResultDataset() { - return hdc; - } - - public ResultSetId getResultId() { - return resultSetId; } public DeployedJobSpecId getDeployedJobSpecId() { @@ -121,12 +102,9 @@ return type; } - public void storeDistributedInfo(DeployedJobSpecId deployedJobSpecId, ScheduledExecutorService ses, - IHyracksDataset hdc, ResultSetId resultSetId) { + public void storeDistributedInfo(DeployedJobSpecId deployedJobSpecId, ScheduledExecutorService ses) { this.deployedJobSpecId = deployedJobSpecId; this.executorService = ses; - this.hdc = hdc; - this.resultSetId = resultSetId; } public ScheduledExecutorService getExecutorService() { diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java index 50d506b..dff4577 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/Procedure.java @@ -34,7 +34,7 @@ private final int arity; private final List<String> params; private final String body; - private final String returnType; + private final String type; private final String language; private final String duration; /* @@ -46,12 +46,12 @@ */ private final List<List<List<String>>> dependencies; - public Procedure(String dataverseName, String functionName, int arity, List<String> params, String returnType, + public Procedure(String dataverseName, String functionName, int arity, List<String> params, String type, String functionBody, String language, String duration, List<List<List<String>>> dependencies) { this.procedureId = new EntityId(BADConstants.PROCEDURE_KEYWORD, dataverseName, functionName); this.params = params; this.body = functionBody; - this.returnType = returnType == null ? RETURNTYPE_VOID : returnType; + this.type = type; this.language = language; this.arity = arity; this.duration = duration; @@ -76,8 +76,8 @@ return body; } - public String getReturnType() { - return returnType; + public String getType() { + return type; } public String getLanguage() { diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java index 0a6acb9..a0e6657 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/metadata/ProcedureTupleTranslator.java @@ -96,7 +96,7 @@ } String returnType = ((AString) procedureRecord - .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX)) + .getValueByPos(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_TYPE_FIELD_INDEX)) .getStringValue(); String definition = ((AString) procedureRecord @@ -194,9 +194,9 @@ // write field 4 fieldValue.reset(); - aString.setValue(procedure.getReturnType()); + aString.setValue(procedure.getType()); stringSerde.serialize(aString, fieldValue.getDataOutput()); - recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_RETURN_TYPE_FIELD_INDEX, fieldValue); + recordBuilder.addField(BADMetadataRecordTypes.PROCEDURE_ARECORD_PROCEDURE_TYPE_FIELD_INDEX, fieldValue); // write field 5 fieldValue.reset(); diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java new file mode 100644 index 0000000..3e6c0f5 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.recovery; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.EntityId; +import org.apache.asterix.active.IActiveEntityEventsListener; +import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; +import org.apache.asterix.app.translator.RequestParameters; +import org.apache.asterix.bad.BADJobService; +import org.apache.asterix.bad.lang.BADCompilationProvider; +import org.apache.asterix.bad.lang.BADLangExtension; +import org.apache.asterix.bad.lang.BADStatementExecutor; +import org.apache.asterix.bad.metadata.Channel; +import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener; +import org.apache.asterix.bad.metadata.DeployedJobSpecEventListener.PrecompiledType; +import org.apache.asterix.bad.metadata.Procedure; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.hyracks.bootstrap.GlobalRecoveryManager; +import org.apache.asterix.metadata.MetadataManager; +import org.apache.asterix.metadata.MetadataTransactionContext; +import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.ResultProperties; +import org.apache.asterix.translator.SessionConfig; +import org.apache.asterix.translator.SessionOutput; +import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; +import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.DeployedJobSpecId; +import org.apache.hyracks.api.job.DeployedJobSpecIdFactory; +import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.control.common.utils.HyracksThreadFactory; + +public class BADGlobalRecoveryManager extends GlobalRecoveryManager { + + private static final Logger LOGGER = Logger.getLogger(BADGlobalRecoveryManager.class.getName()); + + @Override + protected void recover(ICcApplicationContext appCtx) throws HyracksDataException { + try { + LOGGER.info("Starting Global Recovery"); + MetadataManager.INSTANCE.init(); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + mdTxnCtx = doRecovery(appCtx, mdTxnCtx); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + deployJobs(appCtx); + recoveryCompleted = true; + recovering = false; + LOGGER.info("Global Recovery Completed. Refreshing cluster state..."); + appCtx.getClusterStateManager().refreshState(); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + private void deployJobs(ICcApplicationContext appCtx) throws Exception { + + MetadataProvider metadataProvider = new MetadataProvider(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); + + List<Channel> channels = BADLangExtension.getAllChannels(mdTxnCtx); + List<Procedure> procedures = BADLangExtension.getAllProcedures(mdTxnCtx); + + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + metadataProvider.getLocks().unlock(); + + SessionConfig sessionConfig = + new SessionConfig(SessionConfig.OutputFormat.ADM, true, true, true, SessionConfig.PlanFormat.STRING); + + BADStatementExecutor badStatementExecutor = new BADStatementExecutor(appCtx, new ArrayList<>(), + new SessionOutput(sessionConfig, null), new BADCompilationProvider(), Executors.newSingleThreadExecutor( + new HyracksThreadFactory(DefaultStatementExecutorFactory.class.getSimpleName()))); + + ActiveNotificationHandler activeEventHandler = + (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + + //Remove any lingering listeners + for (IActiveEntityEventsListener listener : activeEventHandler.getEventListeners()) { + if (listener instanceof DeployedJobSpecEventListener) { + activeEventHandler.unregisterListener(listener); + } + } + + DeployedJobSpecIdFactory deployedJobSpecIdFactory = new DeployedJobSpecIdFactory(); + + //Redeploy Jobs + for (Channel channel : channels) { + EntityId entityId = channel.getChannelId(); + DeployedJobSpecId deployedJobSpecId = deployedJobSpecIdFactory.create(); + metadataProvider = new MetadataProvider(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE); + DeployedJobSpecEventListener listener = new DeployedJobSpecEventListener(appCtx, entityId, + channel.getResultsDatasetName().equals("") ? PrecompiledType.PUSH_CHANNEL + : PrecompiledType.CHANNEL); + listener.storeDistributedInfo(deployedJobSpecId, null); + listener.suspend(); + activeEventHandler.registerListener(listener); + BADJobService.redeployJobSpec(entityId, channel.getChannelBody(), metadataProvider, badStatementExecutor, + hcc, new RequestParameters(null, null, null, null, null, null)); + + ScheduledExecutorService ses = BADJobService.startRepetitiveDeployedJobSpec(deployedJobSpecId, hcc, + BADJobService.findPeriod(channel.getDuration()), new HashMap<>(), entityId, + metadataProvider.getTxnIdFactory(), listener); + listener.storeDistributedInfo(deployedJobSpecId, ses); + + LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverse() + "." + + entityId.getEntityName() + " was stopped by cluster failure. It has restarted."); + + } + for (Procedure procedure : procedures) { + EntityId entityId = procedure.getEntityId(); + DeployedJobSpecId deployedJobSpecId = deployedJobSpecIdFactory.create(); + metadataProvider = new MetadataProvider(appCtx, MetadataBuiltinEntities.DEFAULT_DATAVERSE); + metadataProvider.setWriterFactory(PrinterBasedWriterFactory.INSTANCE); + metadataProvider.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE); + DeployedJobSpecEventListener listener = + new DeployedJobSpecEventListener(appCtx, entityId, PrecompiledType.valueOf(procedure.getType())); + listener.storeDistributedInfo(deployedJobSpecId, null); + listener.suspend(); + activeEventHandler.registerListener(listener); + BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor, hcc, + new RequestParameters( + new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(), + ResultReader.NUM_READERS), + new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), + new IStatementExecutor.Stats(), null, null, null)); + //Log that the procedure stopped by cluster restart. Procedure is available again now. + LOGGER.log(Level.SEVERE, entityId.getExtensionName() + " " + entityId.getDataverse() + "." + + entityId.getEntityName() + + " was lost with cluster failure and any repetitive instances have stopped. It is now available to run again."); + //TODO: allow repetitive procedures to restart execution automatically + //Issue: need to store in metadata the information for running instances + } + hcc.resetDeployedJobIdFactory(deployedJobSpecIdFactory.maxDeployedJobSpecId()); + } +} diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java new file mode 100644 index 0000000..b5f6465 --- /dev/null +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADRecoveryExtension.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.bad.recovery; + +import java.util.List; + +import org.apache.asterix.app.cc.IGlobalRecoveryExtension; +import org.apache.asterix.common.api.ExtensionId; +import org.apache.asterix.common.cluster.IGlobalRecoveryManager; +import org.apache.hyracks.algebricks.common.utils.Pair; + +public class BADRecoveryExtension implements IGlobalRecoveryExtension { + + public static final ExtensionId BAD_RECOVERY_EXTENSION_ID = + new ExtensionId(BADRecoveryExtension.class.getSimpleName(), 0); + + private static class LazyHolder { + private static final IGlobalRecoveryManager INSTANCE = new BADGlobalRecoveryManager(); + + } + + @Override + public ExtensionId getId() { + return BAD_RECOVERY_EXTENSION_ID; + } + + @Override + public void configure(List<Pair<String, String>> args) { + } + + @Override + public IGlobalRecoveryManager getGlobalRecoveryManager() { + return LazyHolder.INSTANCE; + } +} diff --git a/asterix-bad/src/main/resources/cc.conf b/asterix-bad/src/main/resources/cc.conf index 1153dcc..371cbe8 100644 --- a/asterix-bad/src/main/resources/cc.conf +++ b/asterix-bad/src/main/resources/cc.conf @@ -59,3 +59,5 @@ enabled = true [extension/org.apache.asterix.bad.metadata.BADMetadataExtension] enabled = true +[extension/org.apache.asterix.bad.recovery.BADRecoveryExtension] +enabled = true \ No newline at end of file diff --git a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java index 1cd49e3..8dbfc6d 100644 --- a/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java +++ b/asterix-bad/src/test/java/org/apache/asterix/bad/test/BADListenerTest.java @@ -68,7 +68,7 @@ public static void init() { djsel = new DeployedJobSpecEventListener(null, new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, "test", "test"), - DeployedJobSpecEventListener.PrecompiledType.CHANNEL, null, "BadListener"); + DeployedJobSpecEventListener.PrecompiledType.CHANNEL); } @Test diff --git a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm index c41aec1..79fa293 100644 --- a/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm +++ b/asterix-bad/src/test/resources/runtimets/results/procedure/create_procedure_check_metadata/create_procedure_check_metadata.1.adm @@ -1,6 +1,6 @@ -{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "use two;\ninsert into channels.UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ ] ] } -{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\ndelete from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] } -{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", "Params": [ ], "ReturnType": "VOID", "Definition": "use two;\ninsert into UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ ] ] } -{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] } -{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\nselect roomNumber from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] } -{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "ReturnType": "VOID", "Definition": "use two;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] } \ No newline at end of file +{ "DataverseName": "two", "ProcedureName": "addMe", "Arity": "0", "Params": [ ], "Type": "INSERT", "Definition": "use two;\ninsert into channels.UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ ] ] } +{ "DataverseName": "two", "ProcedureName": "deleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use two;\ndelete from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] } +{ "DataverseName": "two", "ProcedureName": "localAddMe", "Arity": "0", "Params": [ ], "Type": "INSERT", "Definition": "use two;\ninsert into UserLocations([\n {\"timeStamp\":current_datetime(), \"roomNumber\":222}]\n );", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ ] ] } +{ "DataverseName": "two", "ProcedureName": "localDeleteSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "DELETE", "Definition": "use two;\ndelete from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\");", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] } +{ "DataverseName": "two", "ProcedureName": "localSelectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use two;\nselect roomNumber from UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "two", "UserLocations" ] ], [ [ "two", "get_job_param", "1" ], [ "two", "really_contains", "2" ], [ "two", "get_job_param", "1" ] ] ] } +{ "DataverseName": "two", "ProcedureName": "selectSome", "Arity": "2", "Params": [ "r", "otherRoom" ], "Type": "QUERY", "Definition": "use two;\nselect roomNumber from channels.UserLocations\nwhere roomNumber = get_job_param(\"r\")\nor roomNumber = get_job_param(\"otherRoom\")\nand channels.really_contains(roomNumber,\"l\")\norder by id;", "Language": "AQL", "Duration": "", "Dependencies": [ [ [ "channels", "UserLocations" ] ], [ [ "channels", "really_contains", "2" ], [ "two", "get_job_param", "1" ], [ "two", "get_job_param", "1" ] ] ] } \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/2641 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I6897ccf9cddb9ec8d10256e252ee893afe6db145 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb-bad Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <sjaco...@ucr.edu>