Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2398
Change subject: [NO ISSUE][ING] Allow external UDF to use runtime parallelism
......................................................................
[NO ISSUE][ING] Allow external UDF to use runtime parallelism
- user model changes: no
- storage format changes: no
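- interface changes: yes (AssignPOperator.setCardinalityConstraint is replaced by setLocationConstraint; FeedDataSource.getComputeCardinality is replaced by getComputationNodeDomain)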
Details:
1. Enable UDFs applied in feeds to use the runtime parallelism.
2. Fix DefaultNodeGroupDomain so that its nodes are kept in a MultiSet
rather than a List, for comparison purposes.
Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
5 files changed, 29 insertions(+), 30 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/98/2398/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index dfb73ee..c41601b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -35,6 +35,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -66,19 +67,9 @@
}
ExchangeOperator exchangeOp = new ExchangeOperator();
- INodeDomain domain = new INodeDomain() {
- @Override
- public boolean sameAs(INodeDomain domain) {
- return domain == this;
- }
+ INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain();
- @Override
- public Integer cardinality() {
- return feedDataSource.getComputeCardinality();
- }
- };
-
- exchangeOp.setPhysicalOperator(new
RandomPartitionExchangePOperator(domain));
+ exchangeOp.setPhysicalOperator(new
RandomPartitionExchangePOperator(runtimeDomain));
op.getInputs().get(0).setValue(exchangeOp);
exchangeOp.getInputs().add(new
MutableObject<ILogicalOperator>(scanOp));
ExecutionMode em = ((AbstractLogicalOperator)
scanOp).getExecutionMode();
@@ -88,8 +79,9 @@
AssignOperator assignOp = (AssignOperator) opRef.getValue();
AssignPOperator assignPhyOp = (AssignPOperator)
assignOp.getPhysicalOperator();
- assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
+ DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain)
runtimeDomain;
+ String[] nodes = computationNode.getNodes();
+ assignPhyOp.setLocationConstraint(nodes);
return true;
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 883cd7a..0d46387 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -23,6 +23,8 @@
*/
use externallibtest;
+SET `compiler.parallelism` "5";
+
connect feed TweetFeed to dataset TweetsFeedIngest apply function
`testlib#parseTweet`;
start feed TweetFeed;
\ No newline at end of file
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 325d23b..8115c2e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -56,7 +56,7 @@
private final FeedRuntimeType location;
private final String targetDataset;
private final String[] locations;
- private final int computeCardinality;
+ private final INodeDomain computationNodeDomain;
private final List<IAType> pkTypes;
private final List<ScalarFunctionCallExpression> keyAccessExpression;
private final FeedConnection feedConnection;
@@ -74,7 +74,7 @@
this.locations = locations;
this.pkTypes = pkTypes;
this.keyAccessExpression = keyAccessExpression;
- this.computeCardinality =
appCtx.getClusterStateManager().getParticipantNodes().size();
+ this.computationNodeDomain = domain;
this.feedConnection = feedConnection;
initFeedDataSource();
}
@@ -117,10 +117,6 @@
schemaTypes[i++] = type;
}
}
- }
-
- public int getComputeCardinality() {
- return computeCardinality;
}
public List<IAType> getPkTypes() {
@@ -208,4 +204,8 @@
public FeedConnection getFeedConnection() {
return feedConnection;
}
+
+ public INodeDomain getComputationNodeDomain() {
+ return computationNodeDomain;
+ }
}
\ No newline at end of file
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 995f6e0..c2cc267 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -22,7 +22,7 @@
import org.apache.commons.lang3.mutable.Mutable;
-import
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -44,7 +44,7 @@
public class AssignPOperator extends AbstractPhysicalOperator {
private boolean flushFramesRapidly;
- private int cardinalityConstraint = 0;
+ private String[] locations;
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -93,10 +93,10 @@
// contribute one Asterix framewriter
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
- if (cardinalityConstraint > 0) {
- AlgebricksCountPartitionConstraint countConstraint =
- new
AlgebricksCountPartitionConstraint(cardinalityConstraint);
- builder.contributeMicroOperator(assign, runtime, recDesc,
countConstraint);
+ if (locations != null && locations.length > 0) {
+ AlgebricksAbsolutePartitionConstraint locationConstraint = new
AlgebricksAbsolutePartitionConstraint(
+ locations);
+ builder.contributeMicroOperator(assign, runtime, recDesc,
locationConstraint);
} else {
builder.contributeMicroOperator(assign, runtime, recDesc);
}
@@ -115,8 +115,8 @@
this.flushFramesRapidly = flushFramesRapidly;
}
- public void setCardinalityConstraint(int cardinality) {
- this.cardinalityConstraint = cardinality;
+ public void setLocationConstraint(String[] locations) {
+ this.locations = locations;
}
@Override
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 719c70e..a0ef64e 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,16 +18,17 @@
*/
package org.apache.hyracks.algebricks.core.algebra.properties;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
public class DefaultNodeGroupDomain implements INodeDomain {
- private List<String> nodes = new ArrayList<>();
+ private MultiSet<String> nodes = new HashMultiSet<>();
public DefaultNodeGroupDomain(List<String> nodes) {
this.nodes.addAll(nodes);
@@ -67,4 +68,8 @@
public Integer cardinality() {
return nodes.size();
}
+
+ public String[] getNodes() {
+ return nodes.toArray(new String[0]);
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>