abdullah alamoudi has submitted this change and it was merged.

Change subject: Refactor Active Listeners
......................................................................

Refactor Active Listeners

Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1118
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java
A asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
M hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
M hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
26 files changed, 331 insertions(+), 265 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified

diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
index 30a2eb6..5ff02c7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveJobNotificationHandler.java
@@ -198,4 +198,17 @@
             LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
         }
     }
+
+    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
+        if (DEBUG) {
+            LOGGER.log(Level.WARNING,
+                    "unregisterListener(IActiveEntityEventsListener listener) was called for the entity "
+                            + listener.getEntityId());
+        }
+        IActiveEntityEventsListener registeredListener = entityEventListener.remove(listener.getEntityId());
+        if (registeredListener == null) {
+            throw new HyracksDataException(
+                    "Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
+        }
+    }
 }
diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
index 156576c..2dd9fe7 100644
--- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
+++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/IActiveEntityEventsListener.java
@@ -31,4 +31,6 @@
 
     public EntityId getEntityId();
 
+    public boolean isEntityConnectedToDataset(String dataverseName, String datasetName);
+
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
index 916355d..a574711 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceMaterializationForInsertWithSelfScanRule.java
@@ -18,14 +18,13 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-
 import org.apache.asterix.metadata.declared.AqlDataSource;
 import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -45,7 +44,8 @@
 public class IntroduceMaterializationForInsertWithSelfScanRule implements IAlgebraicRewriteRule {
 
     @Override
-    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         return false;
     }
 
@@ -105,11 +105,10 @@
             } else if (descendantOp.getOperatorTag() ==
LogicalOperatorTag.DATASOURCESCAN) { DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp; AqlDataSource ds = (AqlDataSource) dataSourceScanOp.getDataSource(); - if (ds.getDatasourceType() != AqlDataSourceType.FEED - && ds.getDatasourceType() != AqlDataSourceType.LOADABLE) { - if (((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) { - return true; - } + if ((ds.getDatasourceType() == AqlDataSourceType.INTERNAL_DATASET + || ds.getDatasourceType() == AqlDataSourceType.EXTERNAL_DATASET) + && ((DatasetDataSource) ds).getDataset().getDatasetName().compareTo(insertDatasetName) == 0) { + return true; } } sameDataset = checkIfInsertAndScanDatasetsSame(descendantOp, insertDatasetName); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java index 4f5a848..815bd93 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java @@ -18,12 +18,12 @@ */ package org.apache.asterix.optimizer.rules; -import org.apache.commons.lang3.mutable.Mutable; -import org.apache.commons.lang3.mutable.MutableObject; import org.apache.asterix.metadata.declared.AqlDataSource; import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; -import org.apache.asterix.metadata.entities.Feed; import org.apache.asterix.metadata.declared.FeedDataSource; +import org.apache.asterix.metadata.entities.Feed; +import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -41,7 +41,8 @@ public class IntroduceRandomPartitioningFeedComputationRule implements IAlgebraicRewriteRule { @Override - public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { ILogicalOperator op = opRef.getValue(); if (!op.getOperatorTag().equals(LogicalOperatorTag.ASSIGN)) { return false; @@ -54,7 +55,7 @@ DataSourceScanOperator scanOp = (DataSourceScanOperator) opChild; AqlDataSource dataSource = (AqlDataSource) scanOp.getDataSource(); - if (!dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) { + if (dataSource.getDatasourceType() != AqlDataSourceType.FEED) { return false; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java index 1fa7730..06a6f37 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java @@ -85,7 +85,10 @@ LogicalVariable dataVar = dataSource.getDataRecordVariable(allVars); LogicalVariable metaVar = 
dataSource.getMetaVariable(allVars); LogicalExpressionReferenceTransform currentTransformer = null; - if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) { + // https://issues.apache.org/jira/browse/ASTERIXDB-1618 + if (dataSource.getDatasourceType() != AqlDataSourceType.EXTERNAL_DATASET + && dataSource.getDatasourceType() != AqlDataSourceType.INTERNAL_DATASET + && dataSource.getDatasourceType() != AqlDataSourceType.LOADABLE) { IMutationDataSource mds = (IMutationDataSource) dataSource; if (mds.isChange()) { transformers = new ArrayList<>(); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java index 079f61a..7af3c1a 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessRule.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.asterix.algebra.base.AsterixOperatorAnnotations; -import org.apache.asterix.om.util.ConstantExpressionUtil; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.metadata.declared.AqlDataSource; import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; @@ -40,6 +39,7 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.util.ConstantExpressionUtil; import org.apache.asterix.optimizer.base.AnalysisUtil; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -85,7 +85,7 @@ } AssignOperator access = (AssignOperator) op; ILogicalExpression expr = getFirstExpr(access); - String finalAnnot = null; + String finalAnnot; if (AnalysisUtil.isAccessToFieldRecord(expr)) { finalAnnot = AsterixOperatorAnnotations.PUSHED_FIELD_ACCESS; } else if (AnalysisUtil.isRunnableAccessToFieldRecord(expr)) { @@ -195,17 +195,17 @@ propagateFieldAccessRec(opRef2, context, finalAnnot); return true; } - List<LogicalVariable> usedInAccess = new LinkedList<LogicalVariable>(); + List<LogicalVariable> usedInAccess = new LinkedList<>(); VariableUtilities.getUsedVariables(access, usedInAccess); - List<LogicalVariable> produced2 = new LinkedList<LogicalVariable>(); + List<LogicalVariable> produced2 = new LinkedList<>(); if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) { VariableUtilities.getLiveVariables(op2, produced2); } else { VariableUtilities.getProducedVariables(op2, produced2); } boolean pushItDown = false; - List<LogicalVariable> inter = new ArrayList<LogicalVariable>(usedInAccess); + List<LogicalVariable> inter = new ArrayList<>(usedInAccess); if (inter.isEmpty()) { // ground value return false; } @@ -214,7 +214,8 @@ pushItDown = true; } else if (op2.getOperatorTag() == LogicalOperatorTag.GROUP) { GroupByOperator g = (GroupByOperator) op2; - List<Pair<LogicalVariable, LogicalVariable>> varMappings = new ArrayList<Pair<LogicalVariable, LogicalVariable>>(); + List<Pair<LogicalVariable, LogicalVariable>> varMappings = + new ArrayList<>(); for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) { ILogicalExpression e = p.second.getValue(); if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) { @@ -222,7 +223,7 @@ if (inter.contains(decorVar)) { inter.remove(decorVar); LogicalVariable v1 = 
((VariableReferenceExpression) e).getVariableReference(); - varMappings.add(new Pair<LogicalVariable, LogicalVariable>(decorVar, v1)); + varMappings.add(new Pair<>(decorVar, v1)); } } } @@ -257,7 +258,7 @@ return true; } else { for (Mutable<ILogicalOperator> inp : op2.getInputs()) { - HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>(); + HashSet<LogicalVariable> v2 = new HashSet<>(); VariableUtilities.getLiveVariables(inp.getValue(), v2); if (v2.containsAll(usedInAccess)) { pushAccessDown(opRef, op2, inp, context, finalAnnot); @@ -269,7 +270,7 @@ AbstractOperatorWithNestedPlans nestedOp = (AbstractOperatorWithNestedPlans) op2; for (ILogicalPlan plan : nestedOp.getNestedPlans()) { for (Mutable<ILogicalOperator> root : plan.getRoots()) { - HashSet<LogicalVariable> v2 = new HashSet<LogicalVariable>(); + HashSet<LogicalVariable> v2 = new HashSet<>(); VariableUtilities.getLiveVariables(root.getValue(), v2); if (v2.containsAll(usedInAccess)) { pushAccessDown(opRef, op2, root, context, finalAnnot); @@ -297,7 +298,7 @@ ILogicalExpression e1 = accessFun.getArguments().get(1).getValue(); if (e1.getExpressionTag() == LogicalExpressionTag.CONSTANT) { IDataSource<AqlSourceId> dataSource = (IDataSource<AqlSourceId>) scan.getDataSource(); - AqlDataSourceType dsType = ((AqlDataSource) dataSource).getDatasourceType(); + byte dsType = ((AqlDataSource) dataSource).getDatasourceType(); if (dsType == AqlDataSourceType.FEED || dsType == AqlDataSourceType.LOADABLE) { return false; } @@ -368,7 +369,7 @@ // indirect recursivity with propagateFieldAccessRec private void pushAccessDown(Mutable<ILogicalOperator> fldAccessOpRef, ILogicalOperator op2, Mutable<ILogicalOperator> inputOfOp2, IOptimizationContext context, String finalAnnot) - throws AlgebricksException { + throws AlgebricksException { ILogicalOperator fieldAccessOp = fldAccessOpRef.getValue(); fldAccessOpRef.setValue(op2); List<Mutable<ILogicalOperator>> faInpList = fieldAccessOp.getInputs(); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java index b088607..25dcf35 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/RemoveSortInFeedIngestionRule.java @@ -18,10 +18,9 @@ */ package org.apache.asterix.optimizer.rules; -import org.apache.commons.lang3.mutable.Mutable; - import org.apache.asterix.metadata.declared.AqlDataSource; import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; +import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; @@ -33,7 +32,8 @@ public class RemoveSortInFeedIngestionRule implements IAlgebraicRewriteRule { @Override - public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { + public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) + throws AlgebricksException { return false; } @@ -51,7 +51,7 @@ while (descendantOp != null) { if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) { AqlDataSource dataSource = (AqlDataSource) ((DataSourceScanOperator) 
descendantOp).getDataSource(); - if (dataSource.getDatasourceType().equals(AqlDataSourceType.FEED)) { + if (dataSource.getDatasourceType() == AqlDataSourceType.FEED) { isSourceAFeed = true; } break; diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java index f483d70..01476e7 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java @@ -228,7 +228,7 @@ DataSourceScanOperator dataSourceScan = (DataSourceScanOperator) sourceOpRefs.get(i).getValue(); IDataSource<?> datasource = dataSourceScan.getDataSource(); if (datasource instanceof AqlDataSource) { - AqlDataSourceType dsType = ((AqlDataSource) datasource).getDatasourceType(); + byte dsType = ((AqlDataSource) datasource).getDatasourceType(); if (dsType != AqlDataSourceType.INTERNAL_DATASET && dsType != AqlDataSourceType.EXTERNAL_DATASET) { return false; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java index 5b91f2f..015044c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/QueryServiceServlet.java @@ -37,6 +37,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.asterix.app.result.ResultReader; +import org.apache.asterix.app.result.ResultUtil; import org.apache.asterix.app.translator.QueryTranslator; import org.apache.asterix.common.app.SessionConfig; import org.apache.asterix.common.config.GlobalConfig; @@ -51,7 +52,6 @@ import org.apache.asterix.translator.IStatementExecutor.Stats; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataset.IHyracksDataset; @@ -119,7 +119,7 @@ } } - private enum ResultFields { + public enum ResultFields { REQUEST_ID("requestID"), SIGNATURE("signature"), TYPE("type"), @@ -139,7 +139,7 @@ } } - private enum ResultStatus { + public enum ResultStatus { SUCCESS("success"), TIMEOUT("timeout"), ERRORS("errors"), @@ -331,7 +331,7 @@ } private static void printError(PrintWriter pw, Throwable e) { - Throwable rootCause = ExceptionUtils.getRootCause(e); + Throwable rootCause = ResultUtil.getRootCause(e); if (rootCause == null) { rootCause = e; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java index b5fd96e..979f7f4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultUtil.java @@ -158,12 +158,12 @@ return errorMessage.toString(); } - private static Throwable getRootCause(Throwable cause) { + public static Throwable getRootCause(Throwable cause) { Throwable currentCause = cause; Throwable nextCause = cause.getCause(); while 
(nextCause != null && nextCause != currentCause) { currentCause = nextCause; - nextCause = cause.getCause(); + nextCause = nextCause.getCause(); } return currentCause; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 6390b9d..727a1f9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -190,6 +190,7 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; @@ -696,8 +697,7 @@ StringBuilder builder = null; IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { - if (listener instanceof FeedEventsListener - && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) { + if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { if (builder == null) { builder = new StringBuilder(); } @@ -706,7 +706,7 @@ } if (builder != null) { throw new AsterixException("Dataset " + dataverseName + "." + datasetName + " is currently being " - + "fed into by the following feed(s).\n" + builder.toString() + "\n" + "Operation not supported"); + + "fed into by the following active entities.\n" + builder.toString()); } } @@ -1411,22 +1411,11 @@ Map<FeedConnectionId, Pair<JobSpecification, Boolean>> disconnectJobList = new HashMap<>(); if (ds.getDatasetType() == DatasetType.INTERNAL) { // prepare job spec(s) that would disconnect any active feeds involving the dataset. 
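
Note on the ResultUtil.getRootCause hunk above: besides widening the visibility to public (so QueryServiceServlet can reuse it in place of commons-lang's ExceptionUtils), the change fixes the traversal itself. The old loop re-read cause.getCause() on every iteration, so it could never advance more than one level down the cause chain. A minimal standalone sketch of the corrected walk; the helper mirrors ResultUtil, but the demo class and main method around it are hypothetical:

    public final class RootCauseDemo {
        // Walks the cause chain to the deepest throwable, guarding against
        // self-referential chains where t.getCause() == t.
        public static Throwable getRootCause(Throwable cause) {
            Throwable currentCause = cause;
            Throwable nextCause = cause.getCause();
            while (nextCause != null && nextCause != currentCause) {
                currentCause = nextCause;
                nextCause = nextCause.getCause(); // advance from nextCause, not cause
            }
            return currentCause;
        }

        public static void main(String[] args) {
            Exception root = new IllegalStateException("root");
            Exception mid = new RuntimeException("mid", root);
            Exception top = new Exception("top", mid);
            // Prints "root"; the pre-fix loop would have stopped at "mid".
            System.out.println(getRootCause(top).getMessage());
        }
    }
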
- IActiveEntityEventsListener[] feedConnections = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); - for (IActiveEntityEventsListener conn : feedConnections) { - if (conn.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME) - && ((FeedEventsListener) conn).isConnectedToDataset(datasetName)) { - FeedConnectionId connectionId = new FeedConnectionId(conn.getEntityId(), datasetName); - Pair<JobSpecification, Boolean> p = FeedOperations.buildDisconnectFeedJobSpec(metadataProvider, - connectionId); - disconnectJobList.put(connectionId, p); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Disconnecting feed " + connectionId.getFeedId().getEntityName() + " from dataset " - + datasetName + " as dataset is being dropped"); - } - // prepare job to remove feed log storage - jobsToExecute.add(FeedOperations.buildRemoveFeedStorageJob(MetadataManager.INSTANCE.getFeed( - mdTxnCtx.getValue(), connectionId.getFeedId().getDataverse(), - connectionId.getFeedId().getEntityName()))); + IActiveEntityEventsListener[] activeListeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); + for (IActiveEntityEventsListener listener : activeListeners) { + if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { + throw new AsterixException( + "Can't drop dataset since it is connected to active entity: " + listener.getEntityId()); } } @@ -1547,8 +1536,7 @@ IActiveEntityEventsListener[] listeners = ActiveJobNotificationHandler.INSTANCE.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { - if (listener.getEntityId().getExtensionName().equals(Feed.EXTENSION_NAME) - && ((FeedEventsListener) listener).isConnectedToDataset(datasetName)) { + if (listener.isEntityConnectedToDataset(dataverseName, datasetName)) { if (builder == null) { builder = new StringBuilder(); } @@ -1557,8 +1545,8 @@ } if (builder != null) { throw new AsterixException( - "Dataset" + datasetName + " is currently being fed into by the following feeds " + "." - + builder.toString() + "\nOperation not supported."); + "Dataset" + datasetName + " is currently being fed into by the following active entities: " + + builder.toString()); } if (ds.getDatasetType() == DatasetType.INTERNAL) { @@ -1709,7 +1697,7 @@ e.addSuppressed(e2); abort(e, e2, mdTxnCtx); throw new IllegalStateException("System is inconsistent state: pending index(" - + dataverseName + "." + datasetName + "." + + dataverseName + "." + datasetName + "." + indexName + ") couldn't be removed from the metadata", e); } } @@ -2008,17 +1996,19 @@ } } - protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt) throws Exception { - MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); - metadataProvider.setMetadataTxnContext(mdTxnCtx); + protected void handleCreateFeedPolicyStatement(AqlMetadataProvider metadataProvider, Statement stmt) + throws AlgebricksException, HyracksDataException { String dataverse; String policy; FeedPolicyEntity newPolicy = null; + MetadataTransactionContext mdTxnCtx = null; CreateFeedPolicyStatement cfps = (CreateFeedPolicyStatement) stmt; dataverse = getActiveDataverse(null); policy = cfps.getPolicyName(); MetadataLockManager.INSTANCE.createFeedPolicyBegin(dataverse, dataverse + "." 
+ policy); try { + mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + metadataProvider.setMetadataTxnContext(mdTxnCtx); FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy); if (feedPolicy != null) { @@ -2036,8 +2026,9 @@ .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { - sourceFeedPolicy = MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), - MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); + sourceFeedPolicy = + MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), + MetadataConstants.METADATA_DATAVERSE_NAME, cfps.getSourcePolicyName()); if (sourceFeedPolicy == null) { throw new AlgebricksException("Unknown policy " + cfps.getSourcePolicyName()); } @@ -2061,9 +2052,9 @@ } MetadataManager.INSTANCE.addFeedPolicy(mdTxnCtx, newPolicy); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); - } catch (Exception e) { + } catch (RemoteException | ACIDException e) { abort(e, e, mdTxnCtx); - throw e; + throw new HyracksDataException(e); } finally { MetadataLockManager.INSTANCE.createFeedPolicyEnd(dataverse, dataverse + "." + policy); } @@ -2350,7 +2341,7 @@ IFeedLifecycleEventSubscriber eventSubscriber = new FeedLifecycleEventSubscriber(); FeedEventsListener listener = (FeedEventsListener) ActiveJobNotificationHandler.INSTANCE .getActiveEntityListener(entityId); - if (listener == null || !listener.isConnectedToDataset(datasetName)) { + if (listener == null || !listener.isEntityConnectedToDataset(dataverseName, datasetName)) { throw new AsterixException("Feed " + feed.getFeedId().getEntityName() + " is currently not connected to " + cfs.getDatasetName().getValue() + ". Invalid operation!"); } @@ -2873,7 +2864,7 @@ default: throw new AlgebricksException( "The system \"" + runStmt.getSystem() + - "\" specified in your run statement is not supported."); + "\" specified in your run statement is not supported."); } } @@ -3107,12 +3098,21 @@ return getActiveDataverseName(dataverse != null ? 
dataverse.getValue() : null); } + /** + * Abort the ongoing metadata transaction logging the error cause + * + * @param rootE + * @param parentE + * @param mdTxnCtx + */ public static void abort(Exception rootE, Exception parentE, MetadataTransactionContext mdTxnCtx) { try { if (IS_DEBUG_MODE) { LOGGER.log(Level.SEVERE, rootE.getMessage(), rootE); } - MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); + if (mdTxnCtx != null) { + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); + } } catch (Exception e2) { parentE.addSuppressed(e2); throw new IllegalStateException(rootE); diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java index 47290e7..4dc206c 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java @@ -43,10 +43,10 @@ import org.apache.asterix.test.server.ITestServer; import org.apache.asterix.test.server.TestServerProvider; import org.apache.asterix.testframework.context.TestCaseContext; -import org.apache.asterix.testframework.context.TestFileContext; import org.apache.asterix.testframework.context.TestCaseContext.OutputFormat; -import org.apache.asterix.testframework.xml.TestGroup; +import org.apache.asterix.testframework.context.TestFileContext; import org.apache.asterix.testframework.xml.TestCase.CompilationUnit; +import org.apache.asterix.testframework.xml.TestGroup; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.http.HttpResponse; @@ -249,7 +249,7 @@ } } - private HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { + protected HttpResponse executeHttpRequest(HttpUriRequest method) throws Exception { HttpClient client = HttpClients.custom() .setRetryHandler(StandardHttpRequestRetryHandler.INSTANCE) .build(); @@ -270,8 +270,8 @@ String errorBody = EntityUtils.toString(httpResponse.getEntity()); try { JSONObject result = new JSONObject(errorBody); - String[] errors = {result.getJSONArray("error-code").getString(0), result.getString("summary"), - result.getString("stacktrace")}; + String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"), + result.getString("stacktrace") }; GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]); String exceptionMsg = "HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + httpResponse.getStatusLine() @@ -307,7 +307,7 @@ return response.getEntity().getContent(); } - private void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) { + protected void setFormatParam(List<CompilationUnit.Parameter> params, OutputFormat fmt) { boolean formatSet = false; for (CompilationUnit.Parameter param : params) { if ("format".equals(param.getName())) { @@ -344,7 +344,7 @@ return builder.build(); } - private HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam, + protected HttpUriRequest constructPostMethod(String statement, String endpoint, String stmtParam, boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) { RequestBuilder builder = RequestBuilder.post(endpoint); if (postStmtAsParam) { @@ -513,7 +513,6 @@ boolean isDmlRecoveryTest) throws Exception { executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null); } - public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String 
statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, @@ -699,7 +698,7 @@ } break; case "server": // (start <test server name> <port> - // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all)) + // [<arg1>][<arg2>][<arg3>]...|stop (<port>|all)) try { lines = statement.trim().split("\n"); String[] command = lines[lines.length - 1].trim().split(" "); @@ -747,7 +746,7 @@ } break; case "lib": // expected format <dataverse-name> <library-name> - // <library-directory> + // <library-directory> // TODO: make this case work well with entity names containing spaces by // looking for \" lines = statement.split("\n"); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java index f4e67f3..c7bed8d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java @@ -180,4 +180,8 @@ public void setMetaType(ARecordType metaType) { this.metaType = metaType; } + + public IExternalDataSourceFactory getDataSourceFactory() { + return dataSourceFactory; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java index c40fed6..7f2191c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedEventsListener.java @@ -611,7 +611,7 @@ cInfo.setState(ActivityState.ACTIVE); } - public synchronized boolean isConnectedToDataset(String datasetName) { + private synchronized boolean isConnectedToDataset(String datasetName) { for (FeedConnectionId connection : connectJobInfos.keySet()) { if (connection.getDatasetName().equals(datasetName)) { return true; @@ -641,4 +641,9 @@ public IFeedJoint getSourceFeedJoint() { return sourceFeedJoint; } + + @Override + public boolean isEntityConnectedToDataset(String dataverseName, String datasetName) { + return isConnectedToDataset(datasetName); + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java index 1272d03..d406108 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSource.java @@ -24,29 +24,19 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.common.utils.ListSet; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; import 
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency; import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; -import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; -import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; -import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; -import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.job.JobSpecification; @@ -56,19 +46,24 @@ protected final AqlSourceId id; protected final IAType itemType; protected final IAType metaItemType; - protected final AqlDataSourceType datasourceType; + protected final byte datasourceType; protected IAType[] schemaTypes; protected INodeDomain domain; protected Map<String, Serializable> properties = new HashMap<>(); - public enum AqlDataSourceType { - INTERNAL_DATASET, - EXTERNAL_DATASET, - FEED, - LOADABLE + public static class AqlDataSourceType { + // positive range is reserved for core datasource types + public static final byte INTERNAL_DATASET = 0x00; + public static final byte EXTERNAL_DATASET = 0x01; + public static final byte FEED = 0x02; + public static final byte LOADABLE = 0x03; + + // Hide implicit public constructor + private AqlDataSourceType() { + } } - public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, AqlDataSourceType datasourceType, + public AqlDataSource(AqlSourceId id, IAType itemType, IAType metaItemType, byte datasourceType, INodeDomain domain) throws AlgebricksException { this.id = id; this.itemType = itemType; @@ -118,72 +113,7 @@ } } - private static class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider { - - private final AqlDataSource ds; - - private final INodeDomain domain; - - public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) { - this.ds = dataSource; - this.domain = domain; - } - - @Override - public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) { - IPhysicalPropertiesVector propsVector = null; - IPartitioningProperty pp; - List<ILocalStructuralProperty> propsLocal; - int n; - switch (ds.getDatasourceType()) { - case LOADABLE: - case EXTERNAL_DATASET: - pp = new RandomPartitioningProperty(domain); - propsLocal = new ArrayList<ILocalStructuralProperty>(); - ds.computeLocalStructuralProperties(propsLocal, scanVariables); - propsVector = new StructuralPropertiesVector(pp, propsLocal); - break; - - case FEED: - n = scanVariables.size(); - if (n < 2) { - pp = new RandomPartitioningProperty(domain); - } else { - Set<LogicalVariable> pvars = new ListSet<LogicalVariable>(); 
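
Note on the AqlDataSource hunk above: AqlDataSourceType changes from an enum to a holder of byte constants, with the comment reserving the positive range for core datasource types. The apparent intent is to let extensions introduce their own datasource types without modifying the core class; a hypothetical extension (the class name and CHANGE_FEED value below are illustrative, not part of this change) could claim values from the negative range:

    // Hypothetical extension-side type constants, assuming the negative
    // byte range is left free for non-core datasource types.
    public class ExtensionDataSourceType {
        public static final byte CHANGE_FEED = -0x01;

        private ExtensionDataSourceType() {
            // constants holder, not instantiable
        }
    }

This also explains the mechanical edits in the rewrite rules above: byte values are compared with == and !=, so calls like dataSource.getDatasourceType().equals(AqlDataSourceType.FEED) become dataSource.getDatasourceType() == AqlDataSourceType.FEED.
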
- pvars.addAll(ds.getPrimaryKeyVariables(scanVariables)); - pp = new UnorderedPartitionedProperty(pvars, domain); - } - propsLocal = new ArrayList<ILocalStructuralProperty>(); - propsVector = new StructuralPropertiesVector(pp, propsLocal); - break; - - case INTERNAL_DATASET: - n = scanVariables.size(); - Set<LogicalVariable> pvars = new ListSet<LogicalVariable>(); - if (n < 2) { - pp = new RandomPartitioningProperty(domain); - } else { - pvars.addAll(ds.getPrimaryKeyVariables(scanVariables)); - pp = new UnorderedPartitionedProperty(pvars, domain); - } - propsLocal = new ArrayList<ILocalStructuralProperty>(); - List<OrderColumn> orderColumns = new ArrayList<OrderColumn>(); - for (LogicalVariable pkVar : pvars) { - orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC)); - } - propsLocal.add(new LocalOrderProperty(orderColumns)); - propsVector = new StructuralPropertiesVector(pp, propsLocal); - break; - - default: - throw new IllegalArgumentException(); - } - return propsVector; - } - - } - - public AqlDataSourceType getDatasourceType() { + public byte getDatasourceType() { return datasourceType; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java new file mode 100644 index 0000000..14a6e68 --- /dev/null +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlDataSourcePartitioningProvider.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.metadata.declared; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.asterix.metadata.declared.AqlDataSource.AqlDataSourceType; +import org.apache.hyracks.algebricks.common.utils.ListSet; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; +import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind; +import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; +import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; +import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty; +import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector; +import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty; + +public class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider { + + private final AqlDataSource ds; + private final INodeDomain domain; + + public AqlDataSourcePartitioningProvider(AqlDataSource dataSource, INodeDomain domain) { + this.ds = dataSource; + this.domain = domain; + } + + @Override + public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) { + IPhysicalPropertiesVector propsVector; + IPartitioningProperty pp; + List<ILocalStructuralProperty> propsLocal = new ArrayList<>(); + switch (ds.getDatasourceType()) { + case AqlDataSourceType.LOADABLE: + case AqlDataSourceType.EXTERNAL_DATASET: + pp = new RandomPartitioningProperty(domain); + ds.computeLocalStructuralProperties(propsLocal, scanVariables); + break; + case AqlDataSourceType.FEED: + pp = getFeedPartitioningProperty(ds, domain, scanVariables); + break; + case AqlDataSourceType.INTERNAL_DATASET: + Set<LogicalVariable> pvars = new ListSet<>(); + pp = getInternalDatasetPartitioningProperty(ds, domain, scanVariables, pvars); + propsLocal.add(new LocalOrderProperty(getOrderColumns(pvars))); + break; + default: + throw new IllegalArgumentException(); + } + propsVector = new StructuralPropertiesVector(pp, propsLocal); + return propsVector; + } + + private static List<OrderColumn> getOrderColumns(Set<LogicalVariable> pvars) { + List<OrderColumn> orderColumns = new ArrayList<>(); + for (LogicalVariable pkVar : pvars) { + orderColumns.add(new OrderColumn(pkVar, OrderKind.ASC)); + } + return orderColumns; + } + + private static IPartitioningProperty getInternalDatasetPartitioningProperty(AqlDataSource ds, INodeDomain domain, + List<LogicalVariable> scanVariables, Set<LogicalVariable> pvars) { + IPartitioningProperty pp; + if (scanVariables.size() < 2) { + pp = new RandomPartitioningProperty(domain); + } else { + pvars.addAll(ds.getPrimaryKeyVariables(scanVariables)); + pp = new UnorderedPartitionedProperty(pvars, domain); + } + return pp; + } + + public static IPartitioningProperty getFeedPartitioningProperty(AqlDataSource ds, INodeDomain domain, + List<LogicalVariable> scanVariables) { + IPartitioningProperty pp; + if (scanVariables.size() < 2) { + pp = new RandomPartitioningProperty(domain); + 
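        // Reviewer note, not part of the patch: with fewer than two scan
        // variables the feed scan presumably exposes only the record and no
        // primary-key variables, so random partitioning is the only safe
        // choice; the else-branch below hash-partitions on the primary keys
        // instead. For example, a feed scan producing {$pk, $record} would
        // yield UnorderedPartitionedProperty({$pk}, domain).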
} else { + Set<LogicalVariable> pvars = new ListSet<>(); + pvars.addAll(ds.getPrimaryKeyVariables(scanVariables)); + pp = new UnorderedPartitionedProperty(pvars, domain); + } + return pp; + } + +} \ No newline at end of file diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java index da54e32..72360b6 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java @@ -481,8 +481,7 @@ } ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, itemType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; int[] btreeFields = null; @@ -497,10 +496,10 @@ } Pair<IBinaryComparatorFactory[], ITypeTraits[]> comparatorFactoriesAndTypeTraits = getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex( - secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(), - secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType, - dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators, - secondaryIndex.getKeyFieldSourceIndicators(), metaType); + secondaryIndex.getIndexType(), secondaryIndex.getKeyFieldNames(), + secondaryIndex.getKeyFieldTypes(), DatasetUtils.getPartitioningKeys(dataset), itemType, + dataset.getDatasetType(), dataset.hasMetaPart(), primaryKeyIndicators, + secondaryIndex.getKeyFieldSourceIndicators(), metaType); comparatorFactories = comparatorFactoriesAndTypeTraits.first; typeTraits = comparatorFactoriesAndTypeTraits.second; if (filterTypeTraits != null) { @@ -581,12 +580,12 @@ int[] buddyBreeFields = new int[] { numSecondaryKeys }; ExternalBTreeWithBuddyDataflowHelperFactory indexDataflowHelperFactory = new ExternalBTreeWithBuddyDataflowHelperFactory( - compactionInfo.first, compactionInfo.second, - new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), - AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, - LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, - getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields, - ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp); + compactionInfo.first, compactionInfo.second, + new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()), + AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, + LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, + getStorageProperties().getBloomFilterFalsePositiveRate(), buddyBreeFields, + ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp); btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider, rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput, @@ -703,7 +702,7 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForDataset( - dataset.getDataverseName(), 
dataset.getDatasetName(), indexName, temp); + dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp); ARecordType metaType = null; if (dataset.hasMetaPart()) { metaType = (ARecordType) findType(dataset.getMetaItemTypeDataverseName(), @@ -718,8 +717,7 @@ } ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; int[] rtreeFields = null; @@ -864,30 +862,10 @@ IAType itemType = findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); IAType metaItemType = findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); INodeDomain domain = findNodeDomain(dataset.getNodeGroupName()); - AqlDataSourceType datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) + byte datasourceType = dataset.getDatasetType().equals(DatasetType.EXTERNAL) ? AqlDataSourceType.EXTERNAL_DATASET : AqlDataSourceType.INTERNAL_DATASET; return new DatasetDataSource(aqlId, dataset, itemType, metaItemType, datasourceType, dataset.getDatasetDetails(), domain); - } - - @Override - public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) { - boolean result = false; - switch (((AqlDataSource) dataSource).getDatasourceType()) { - case INTERNAL_DATASET: - case EXTERNAL_DATASET: - result = ((DatasetDataSource) dataSource).getDataset().getDatasetType() == DatasetType.EXTERNAL; - break; - case FEED: - result = true; - break; - case LOADABLE: - result = true; - break; - default: - break; - } - return result; } @Override @@ -946,14 +924,13 @@ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataSource.getId().getDataverseName(), datasetName, indexName, temp); + dataSource.getId().getDataverseName(), datasetName, indexName, temp); IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); long numElementsHint = getCardinalityPerPartitionHint(dataset); ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. 
- computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, itemType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = DatasetUtils.createFilterFields(dataset); int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); @@ -1037,7 +1014,7 @@ itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataSource.getId().getDataverseName(), datasetName, indexName, temp); + dataSource.getId().getDataverseName(), datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -1359,7 +1336,7 @@ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp()); + dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp()); // Generate Output Record format ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields]; @@ -1491,8 +1468,7 @@ dataset.getDatasetName(), indexName); ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; int[] btreeFields = null; @@ -1529,7 +1505,7 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, temp); + dataverseName, datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -1734,7 +1710,7 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, temp); + dataverseName, datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -1869,15 +1845,14 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, temp); + dataverseName, datasetName, indexName, temp); int[] btreeFields = new int[primaryComparatorFactories.length]; for (int k = 0; k < btreeFields.length; k++) { btreeFields[k] = k + numSecondaryKeys; } ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. 
- computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; int[] rtreeFields = null; @@ -2216,7 +2191,7 @@ itemType, metaItemType, context.getBinaryComparatorFactoryProvider()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataSource.getId().getDataverseName(), datasetName, indexName, temp); + dataSource.getId().getDataverseName(), datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -2227,8 +2202,7 @@ } ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, itemType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, itemType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = DatasetUtils.createFilterFields(dataset); int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset); @@ -2512,8 +2486,7 @@ secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength()); ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; @@ -2539,7 +2512,7 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, temp); + dataverseName, datasetName, indexName, temp); // prepare callback JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId(); @@ -2692,15 +2665,14 @@ IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext(); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset( - dataverseName, datasetName, indexName, temp); + dataverseName, datasetName, indexName, temp); int[] btreeFields = new int[primaryComparatorFactories.length]; for (int k = 0; k < btreeFields.length; k++) { btreeFields[k] = k + numSecondaryKeys; } ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. - computeFilterBinaryComparatorFactories(dataset, + IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset, recType, context.getBinaryComparatorFactoryProvider()); int[] filterFields = null; int[] rtreeFields = null; @@ -2827,8 +2799,7 @@ dataset.getDatasetName(), indexName); ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType); - IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils. 
@@ -2827,8 +2799,7 @@
                 dataset.getDatasetName(), indexName);
         ITypeTraits[] filterTypeTraits = DatasetUtils.computeFilterTypeTraits(dataset, recType);
-        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.
-                computeFilterBinaryComparatorFactories(dataset,
+        IBinaryComparatorFactory[] filterCmpFactories = DatasetUtils.computeFilterBinaryComparatorFactories(dataset,
                 recType, context.getBinaryComparatorFactoryProvider());
         int[] filterFields = null;
         int[] btreeFields = null;
@@ -2849,15 +2820,15 @@
             Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
                     secondaryKeyNames.get(i), recType);
             IAType keyType = keyPairType.first;
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
-                    getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] =
+                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
         }
         List<List<String>> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
         for (List<String> partitioningKey : partitioningKeys) {
             IAType keyType = recType.getSubFieldType(partitioningKey);
-            comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.
-                    getBinaryComparatorFactory(keyType, true);
+            comparatorFactories[i] =
+                    AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true);
             typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
             ++i;
         }
@@ -2865,7 +2836,7 @@
         IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
                 splitProviderAndPartitionConstraintsForDataset(
-                dataverseName, datasetName, indexName, temp);
+                        dataverseName, datasetName, indexName, temp);
         // prepare callback
         JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 5a601bc..9729c77 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -20,6 +20,7 @@
 import java.util.List;
 
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataManager;
@@ -48,7 +49,7 @@
     private Dataset dataset;
 
     public DatasetDataSource(AqlSourceId id, Dataset dataset, IAType itemType, IAType metaItemType,
-            AqlDataSourceType datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
+            byte datasourceType, IDatasetDetails datasetDetails, INodeDomain datasetDomain)
             throws AlgebricksException {
         super(id, itemType, metaItemType, datasourceType, datasetDomain);
         this.dataset = dataset;
@@ -141,4 +142,9 @@
         }
     }
 
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL;
+    }
+
 }
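The DatasetDataSource hunk above carries the semantic core of this refactor (tracked as ASTERIXDB-1619, per the comment added to IDataSource below): whether a scan is a leaf for job generation is now answered by the data source itself, and for datasets the answer is true exactly when the dataset is external. A self-contained sketch of that contract, with hypothetical names (LeafAwareSource, ExternalAwareDatasetSource) standing in for the real Algebricks and AsterixDB types:

    // Mirrors the shape of IDataSource.isScanAccessPathALeaf(); everything else is illustrative.
    interface LeafAwareSource {
        boolean isScanAccessPathALeaf();
    }

    final class ExternalAwareDatasetSource implements LeafAwareSource {
        enum Kind { INTERNAL, EXTERNAL }

        private final Kind kind;

        ExternalAwareDatasetSource(Kind kind) {
            this.kind = kind;
        }

        @Override
        public boolean isScanAccessPathALeaf() {
            // As in DatasetDataSource: only external dataset scans terminate job generation.
            return kind == Kind.EXTERNAL;
        }

        public static void main(String[] args) {
            System.out.println(new ExternalAwareDatasetSource(Kind.EXTERNAL).isScanAccessPathALeaf()); // true
            System.out.println(new ExternalAwareDatasetSource(Kind.INTERNAL).isScanAccessPathALeaf()); // false
        }
    }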
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 0a81f03..d74bf9f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -204,4 +204,9 @@
             throw new AlgebricksException(e);
         }
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index 2ffaded..200a5a9 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -144,4 +144,9 @@
         RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
         return aqlMetadataProvider.buildLoadableDatasetScan(jobSpec, adapterFactory, rDesc);
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
index 300033f..63ce5fa 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IDataSource.java
@@ -31,4 +31,7 @@
     public IDataSourcePropertiesProvider getPropertiesProvider();
 
     public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList);
+
+    // https://issues.apache.org/jira/browse/ASTERIXDB-1619
+    public boolean isScanAccessPathALeaf();
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 8466ef9..68710a5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,11 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-                    throws AlgebricksException;
-
-    public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
+            throws AlgebricksException;
 
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-                    throws AlgebricksException;
+            throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +61,7 @@
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
             List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
-                    throws AlgebricksException;
+            throws AlgebricksException;
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
index d395f3c..5b60022 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletFileDataSource.java
@@ -77,4 +77,9 @@
     @Override
     public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
     }
+
+    @Override
+    public boolean isScanAccessPathALeaf() {
+        return true;
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index f586af7..7bcb1d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -83,7 +83,7 @@
             List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
             List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
             IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSource ds = (PigletFileDataSource) dataSource;
 
         FileSplit[] fileSplits = ds.getFileSplits();
@@ -133,14 +133,9 @@
     }
 
     @Override
-    public boolean scannerOperatorIsLeaf(IDataSource<String> dataSource) {
-        return true;
-    }
-
-    @Override
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
             int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         PigletFileDataSink ds = (PigletFileDataSink) sink;
         FileSplit[] fileSplits = ds.getFileSplits();
         String[] locations = new String[fileSplits.length];
@@ -192,7 +187,7 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;
     }
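The two Piglet hunks show the full migration recipe for any IMetadataProvider implementation: delete the scannerOperatorIsLeaf(IDataSource) override and move its body into an isScanAccessPathALeaf() override on the corresponding IDataSource implementation. A hedged sketch of the before and after, with names ending in Sketch marking everything hypothetical:

    // Before: the provider answered on behalf of every source.
    interface MetadataProviderSketch {
        // removed by this change:
        // boolean scannerOperatorIsLeaf(IDataSource<String> dataSource);
    }

    // After: each source answers for itself, as PigletFileDataSource now does.
    final class PigletLikeSourceSketch {
        public boolean isScanAccessPathALeaf() {
            // A flat-file scan has nothing beneath it, so it is always a leaf.
            return true;
        }

        public static void main(String[] args) {
            System.out.println(new PigletLikeSourceSketch().isScanAccessPathALeaf()); // true
        }
    }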
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 8701851..ddfb331 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -37,7 +37,6 @@
 import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
@@ -269,8 +268,7 @@
                 DataSourceScanOperator scan = (DataSourceScanOperator) op;
                 IDataSource dataSource = scan.getDataSource();
                 DataSourceScanPOperator dss = new DataSourceScanPOperator(dataSource);
-                IMetadataProvider mp = context.getMetadataProvider();
-                if (mp.scannerOperatorIsLeaf(dataSource)) {
+                if (dataSource.isScanAccessPathALeaf()) {
                     dss.disableJobGenBelowMe();
                 }
                 op.setPhysicalOperator(dss);
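With the hunk above, the rewriter no longer detours through the metadata provider: it asks the scanned IDataSource directly and, if the scan is a leaf, disables job generation below the new physical operator. A minimal sketch of that control flow, with ScanPOperatorSketch and assignPhysicalScan as hypothetical stand-ins for DataSourceScanPOperator and the rule body:

    final class ScanPOperatorSketch {
        private boolean jobGenBelowDisabled;

        void disableJobGenBelowMe() {
            jobGenBelowDisabled = true;
        }

        boolean isJobGenBelowDisabled() {
            return jobGenBelowDisabled;
        }
    }

    final class RewriteStepSketch {
        static ScanPOperatorSketch assignPhysicalScan(boolean sourceIsLeaf) {
            ScanPOperatorSketch dss = new ScanPOperatorSketch();
            // One virtual call on the source replaces the old
            // context.getMetadataProvider().scannerOperatorIsLeaf(dataSource) detour.
            if (sourceIsLeaf) {
                dss.disableJobGenBelowMe();
            }
            return dss;
        }

        public static void main(String[] args) {
            System.out.println(assignPhysicalScan(true).isJobGenBelowDisabled());  // true
            System.out.println(assignPhysicalScan(false).isJobGenBelowDisabled()); // false
        }
    }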
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 4cddef1..1272562 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -34,10 +34,7 @@
 import java.util.concurrent.Executors;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.apache.hyracks.control.cc.work.TriggerNCWork;
-import org.apache.hyracks.control.common.controllers.IniUtils;
-import org.ini4j.Ini;
-import org.xml.sax.InputSource;
+
 import org.apache.hyracks.api.application.ICCApplicationEntryPoint;
 import org.apache.hyracks.api.client.ClusterControllerInfo;
 import org.apache.hyracks.api.client.HyracksClientInterfaceFunctions;
@@ -86,10 +83,12 @@
 import org.apache.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import org.apache.hyracks.control.cc.work.TaskCompleteWork;
 import org.apache.hyracks.control.cc.work.TaskFailureWork;
+import org.apache.hyracks.control.cc.work.TriggerNCWork;
 import org.apache.hyracks.control.cc.work.UnregisterNodeWork;
 import org.apache.hyracks.control.cc.work.WaitForJobCompletionWork;
 import org.apache.hyracks.control.common.context.ServerContext;
 import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.IniUtils;
 import org.apache.hyracks.control.common.deployment.DeploymentRun;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.Function;
@@ -105,6 +104,7 @@
 import org.apache.hyracks.ipc.exceptions.IPCException;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 import org.apache.hyracks.ipc.impl.JavaSerializationBasedPayloadSerializerDeserializer;
+import org.ini4j.Ini;
 import org.xml.sax.InputSource;
 
 public class ClusterControllerService implements IControllerService {
@@ -351,6 +351,7 @@
     public Map<String, NodeControllerState> getNodeMap() {
         return nodeRegistry;
     }
+
     public CCConfig getConfig() {
         return ccConfig;
     }
@@ -406,21 +407,24 @@
                 }
 
                 case GET_JOB_STATUS: {
-                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf = (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobStatusFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobStatusFunction) fn;
                     workQueue.schedule(new GetJobStatusWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobStatus>(handle, mid)));
                     return;
                 }
 
                 case GET_JOB_INFO: {
-                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf = (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
+                    HyracksClientInterfaceFunctions.GetJobInfoFunction gjsf =
+                            (HyracksClientInterfaceFunctions.GetJobInfoFunction) fn;
                    workQueue.schedule(new GetJobInfoWork(ClusterControllerService.this, gjsf.getJobId(),
                             new IPCResponder<JobInfo>(handle, mid)));
                     return;
                 }
 
                 case START_JOB: {
-                    HyracksClientInterfaceFunctions.StartJobFunction sjf = (HyracksClientInterfaceFunctions.StartJobFunction) fn;
+                    HyracksClientInterfaceFunctions.StartJobFunction sjf =
+                            (HyracksClientInterfaceFunctions.StartJobFunction) fn;
                     JobId jobId = createJobId();
                     workQueue.schedule(new JobStartWork(ClusterControllerService.this, sjf.getDeploymentId(),
                             sjf.getACGGFBytes(), sjf.getJobFlags(), jobId, new IPCResponder<JobId>(handle, mid)));
@@ -434,14 +438,16 @@
                 }
 
                 case GET_DATASET_RESULT_STATUS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
                     workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(),
                             gdrlf.getResultSetId(), new IPCResponder<Status>(handle, mid)));
                     return;
                 }
 
                 case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
+                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf =
+                            (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
                     workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this,
                             gdrlf.getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
                             new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
@@ -449,7 +455,8 @@
                 }
 
                 case WAIT_FOR_COMPLETION: {
-                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
+                    HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf =
+                            (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
                     workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
                             new IPCResponder<Object>(handle, mid)));
                     return;
@@ -471,14 +478,16 @@
                 }
 
                 case CLI_DEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf = (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliDeployBinaryFunction dbf =
+                            (HyracksClientInterfaceFunctions.CliDeployBinaryFunction) fn;
                     workQueue.schedule(new CliDeployBinaryWork(ClusterControllerService.this, dbf.getBinaryURLs(),
                             dbf.getDeploymentId(), new IPCResponder<DeploymentId>(handle, mid)));
                     return;
                 }
 
                 case CLI_UNDEPLOY_BINARY: {
-                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf = (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
+                    HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction udbf =
+                            (HyracksClientInterfaceFunctions.CliUnDeployBinaryFunction) fn;
                     workQueue.schedule(new CliUnDeployBinaryWork(ClusterControllerService.this, udbf.getDeploymentId(),
                             new IPCResponder<DeploymentId>(handle, mid)));
                     return;
@@ -556,21 +565,24 @@
                 }
 
                 case REGISTER_PARTITION_PROVIDER: {
-                    CCNCFunctions.RegisterPartitionProviderFunction rppf = (CCNCFunctions.RegisterPartitionProviderFunction) fn;
+                    CCNCFunctions.RegisterPartitionProviderFunction rppf =
+                            (CCNCFunctions.RegisterPartitionProviderFunction) fn;
                     workQueue.schedule(new RegisterPartitionAvailibilityWork(ClusterControllerService.this,
                             rppf.getPartitionDescriptor()));
                     return;
                 }
 
                 case REGISTER_PARTITION_REQUEST: {
-                    CCNCFunctions.RegisterPartitionRequestFunction rprf = (CCNCFunctions.RegisterPartitionRequestFunction) fn;
+                    CCNCFunctions.RegisterPartitionRequestFunction rprf =
+                            (CCNCFunctions.RegisterPartitionRequestFunction) fn;
                     workQueue.schedule(new RegisterPartitionRequestWork(ClusterControllerService.this,
                             rprf.getPartitionRequest()));
                     return;
                 }
 
                 case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
+                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf =
+                            (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
                     workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getEmptyResult(),
                             rrplf.getPartition(), rrplf.getNPartitions(), rrplf.getNetworkAddress()));
@@ -578,21 +590,24 @@
                 }
 
                 case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
+                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
                     workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
+                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf =
+                            (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
                     workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this,
                             rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
                     return;
                 }
 
                 case SEND_APPLICATION_MESSAGE: {
-                    CCNCFunctions.SendApplicationMessageFunction rsf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+                    CCNCFunctions.SendApplicationMessageFunction rsf =
+                            (CCNCFunctions.SendApplicationMessageFunction) fn;
                     workQueue.schedule(new ApplicationMessageWork(ClusterControllerService.this, rsf.getMessage(),
                             rsf.getDeploymentId(), rsf.getNodeId()));
                     return;
@@ -609,7 +624,6 @@
                     @Override
                     public void setException(Exception e) {
-
                     }
                 }));
                 return;
--
To view, visit https://asterix-gerrit.ics.uci.edu/1118
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I260c8608329523f56dc54780d87d796f838505cf
Gerrit-PatchSet: 6
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
