Xikui Wang has submitted this change and it was merged. Change subject: [NO ISSUE][ING] Allow external UDF to use runtime parallelism ......................................................................
[NO ISSUE][ING] Allow external UDF to use runtime parallelism - user model changes: no - storage format changes: no - interface changes: Details: 1. Enable UDFs in feeds to use the runtime parallelism. 2. Fix DefaultNodeGroupDomain so that the nodes are stored as a multiset rather than a list, for comparison purposes. Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d Reviewed-on: https://asterix-gerrit.ics.uci.edu/2398 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java M asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java M hyracks-fullstack/algebricks/algebricks-core/pom.xml M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java 8 files changed, 42 insertions(+), 40 deletions(-) Approvals: Anon. E. 
Moose #1000171: Jenkins: Verified; No violations found; Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java index dfb73ee..c41601b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java @@ -35,6 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator; import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator; +import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; @@ -66,19 +67,9 @@ } ExchangeOperator exchangeOp = new ExchangeOperator(); - INodeDomain domain = new INodeDomain() { - @Override - public boolean sameAs(INodeDomain domain) { - return domain == this; - } + INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain(); - @Override - public Integer cardinality() { - return feedDataSource.getComputeCardinality(); - } - }; - - exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(domain)); + exchangeOp.setPhysicalOperator(new RandomPartitionExchangePOperator(runtimeDomain)); op.getInputs().get(0).setValue(exchangeOp); exchangeOp.getInputs().add(new MutableObject<ILogicalOperator>(scanOp)); ExecutionMode em = ((AbstractLogicalOperator) scanOp).getExecutionMode(); @@ -88,8 +79,9 @@ AssignOperator assignOp = (AssignOperator) 
opRef.getValue(); AssignPOperator assignPhyOp = (AssignPOperator) assignOp.getPhysicalOperator(); - assignPhyOp.setCardinalityConstraint(domain.cardinality()); - + DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain) runtimeDomain; + String[] nodes = computationNode.getNodes(); + assignPhyOp.setLocationConstraint(nodes); return true; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java index 05dd53e..51aca5d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java @@ -163,10 +163,10 @@ } else { keyAccessScalarFunctionCallExpression = null; } - FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed, - aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression, - sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), - context.getComputationNodeDomain(), feedConnection); + FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, targetDataset, feedOutputType, metaType, + pkTypes, keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(), + FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(), + feedConnection); feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy); return feedDataSource; } diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan index caa317d..5141e09 100644 --- a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan +++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan @@ -18,7 +18,7 @@ -- STREAM_PROJECT |PARTITIONED| -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- HYBRID_HASH_JOIN [$$30][$$31] |PARTITIONED| - -- HASH_PARTITION_EXCHANGE [$$30] |PARTITIONED| + -- ONE_TO_ONE_EXCHANGE |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| -- STREAM_SELECT |PARTITIONED| -- STREAM_PROJECT |PARTITIONED| diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp index 883cd7a..0d46387 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp @@ -23,6 +23,8 @@ */ use externallibtest; +SET `compiler.parallelism` "5"; + connect feed TweetFeed to dataset TweetsFeedIngest apply function `testlib#parseTweet`; start feed TweetFeed; \ No newline at end of file diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java index 325d23b..5c3ed56 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java @@ -56,17 +56,16 @@ private final FeedRuntimeType location; private final String targetDataset; private final String[] locations; - private final int computeCardinality; + private final INodeDomain computationNodeDomain; private final List<IAType> pkTypes; private final 
List<ScalarFunctionCallExpression> keyAccessExpression; private final FeedConnection feedConnection; - public FeedDataSource(MetadataProvider metadataProvider, Feed feed, DataSourceId id, String targetDataset, - IAType itemType, IAType metaType, List<IAType> pkTypes, - List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, FeedRuntimeType location, - String[] locations, INodeDomain domain, FeedConnection feedConnection) throws AlgebricksException { + public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, IAType itemType, IAType metaType, + List<IAType> pkTypes, List<ScalarFunctionCallExpression> keyAccessExpression, EntityId sourceFeedId, + FeedRuntimeType location, String[] locations, INodeDomain domain, FeedConnection feedConnection) + throws AlgebricksException { super(id, itemType, metaType, Type.FEED, domain); - ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); this.feed = feed; this.targetDataset = targetDataset; this.sourceFeedId = sourceFeedId; @@ -74,7 +73,7 @@ this.locations = locations; this.pkTypes = pkTypes; this.keyAccessExpression = keyAccessExpression; - this.computeCardinality = appCtx.getClusterStateManager().getParticipantNodes().size(); + this.computationNodeDomain = domain; this.feedConnection = feedConnection; initFeedDataSource(); } @@ -117,10 +116,6 @@ schemaTypes[i++] = type; } } - } - - public int getComputeCardinality() { - return computeCardinality; } public List<IAType> getPkTypes() { @@ -208,4 +203,8 @@ public FeedConnection getFeedConnection() { return feedConnection; } + + public INodeDomain getComputationNodeDomain() { + return computationNodeDomain; + } } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml b/hyracks-fullstack/algebricks/algebricks-core/pom.xml index 9f7d2bd..669988c 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml @@ -85,5 +85,9 @@ 
<groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java index 995f6e0..ccd27f4 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java @@ -22,7 +22,7 @@ import org.apache.commons.lang3.mutable.Mutable; -import org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; @@ -44,7 +44,7 @@ public class AssignPOperator extends AbstractPhysicalOperator { private boolean flushFramesRapidly; - private int cardinalityConstraint = 0; + private String[] locations; @Override public PhysicalOperatorTag getOperatorTag() { @@ -93,10 +93,10 @@ // contribute one Asterix framewriter RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context); - if (cardinalityConstraint > 0) { - AlgebricksCountPartitionConstraint countConstraint = - new AlgebricksCountPartitionConstraint(cardinalityConstraint); - builder.contributeMicroOperator(assign, runtime, recDesc, countConstraint); + if 
(locations != null && locations.length > 0) { + AlgebricksAbsolutePartitionConstraint locationConstraint = + new AlgebricksAbsolutePartitionConstraint(locations); + builder.contributeMicroOperator(assign, runtime, recDesc, locationConstraint); } else { builder.contributeMicroOperator(assign, runtime, recDesc); } @@ -115,8 +115,8 @@ this.flushFramesRapidly = flushFramesRapidly; } - public void setCardinalityConstraint(int cardinality) { - this.cardinalityConstraint = cardinality; + public void setLocationConstraint(String[] locations) { + this.locations = locations; } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java index 719c70e..a0ef64e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java @@ -18,16 +18,17 @@ */ package org.apache.hyracks.algebricks.core.algebra.properties; -import java.util.ArrayList; import java.util.List; +import org.apache.commons.collections4.MultiSet; +import org.apache.commons.collections4.multiset.HashMultiSet; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType; public class DefaultNodeGroupDomain implements INodeDomain { - private List<String> nodes = new ArrayList<>(); + private MultiSet<String> nodes = new HashMultiSet<>(); public DefaultNodeGroupDomain(List<String> nodes) { this.nodes.addAll(nodes); @@ 
-67,4 +68,8 @@ public Integer cardinality() { return nodes.size(); } + + public String[] getNodes() { + return nodes.toArray(new String[0]); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2398 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Xikui Wang <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
