Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2398

Change subject: [NO ISSUE][ING] Allow external UDF to use runtime parallelism
......................................................................

[NO ISSUE][ING] Allow external UDF to use runtime parallelism

- user model changes: no
- storage format changes: no
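- interface changes: yes (AssignPOperator.setCardinalityConstraint is
  replaced by setLocationConstraint; FeedDataSource.getComputeCardinality
  is replaced by getComputationNodeDomain)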

Details:
1. Allow UDFs applied to a feed to use the runtime parallelism.
2. Fix DefaultNodeGroupDomain to store its nodes in a multiset rather
than a list, so that comparing two domains does not depend on node order.

Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
5 files changed, 29 insertions(+), 30 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/98/2398/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index dfb73ee..c41601b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -35,6 +35,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -66,19 +67,9 @@
         }
 
         ExchangeOperator exchangeOp = new ExchangeOperator();
-        INodeDomain domain = new INodeDomain() {
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
+        INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain();
 
-            @Override
-            public Integer cardinality() {
-                return feedDataSource.getComputeCardinality();
-            }
-        };
-
-        exchangeOp.setPhysicalOperator(new 
RandomPartitionExchangePOperator(domain));
+        exchangeOp.setPhysicalOperator(new 
RandomPartitionExchangePOperator(runtimeDomain));
         op.getInputs().get(0).setValue(exchangeOp);
         exchangeOp.getInputs().add(new 
MutableObject<ILogicalOperator>(scanOp));
         ExecutionMode em = ((AbstractLogicalOperator) 
scanOp).getExecutionMode();
@@ -88,8 +79,9 @@
 
         AssignOperator assignOp = (AssignOperator) opRef.getValue();
         AssignPOperator assignPhyOp = (AssignPOperator) 
assignOp.getPhysicalOperator();
-        assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
+        DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain) 
runtimeDomain;
+        String[] nodes = computationNode.getNodes();
+        assignPhyOp.setLocationConstraint(nodes);
         return true;
     }
 
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 883cd7a..0d46387 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -23,6 +23,8 @@
  */
 use externallibtest;
 
+SET `compiler.parallelism` "5";
+
 connect feed TweetFeed to dataset TweetsFeedIngest apply function 
`testlib#parseTweet`;
 
 start feed TweetFeed;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 325d23b..8115c2e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -56,7 +56,7 @@
     private final FeedRuntimeType location;
     private final String targetDataset;
     private final String[] locations;
-    private final int computeCardinality;
+    private final INodeDomain computationNodeDomain;
     private final List<IAType> pkTypes;
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
     private final FeedConnection feedConnection;
@@ -74,7 +74,7 @@
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
-        this.computeCardinality = 
appCtx.getClusterStateManager().getParticipantNodes().size();
+        this.computationNodeDomain = domain;
         this.feedConnection = feedConnection;
         initFeedDataSource();
     }
@@ -117,10 +117,6 @@
                 schemaTypes[i++] = type;
             }
         }
-    }
-
-    public int getComputeCardinality() {
-        return computeCardinality;
     }
 
     public List<IAType> getPkTypes() {
@@ -208,4 +204,8 @@
     public FeedConnection getFeedConnection() {
         return feedConnection;
     }
+
+    public INodeDomain getComputationNodeDomain() {
+        return computationNodeDomain;
+    }
 }
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 995f6e0..c2cc267 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -22,7 +22,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -44,7 +44,7 @@
 public class AssignPOperator extends AbstractPhysicalOperator {
 
     private boolean flushFramesRapidly;
-    private int cardinalityConstraint = 0;
+    private String[] locations;
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -93,10 +93,10 @@
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
-        if (cardinalityConstraint > 0) {
-            AlgebricksCountPartitionConstraint countConstraint =
-                    new 
AlgebricksCountPartitionConstraint(cardinalityConstraint);
-            builder.contributeMicroOperator(assign, runtime, recDesc, 
countConstraint);
+        if (locations != null && locations.length > 0) {
+            AlgebricksAbsolutePartitionConstraint locationConstraint = new 
AlgebricksAbsolutePartitionConstraint(
+                    locations);
+            builder.contributeMicroOperator(assign, runtime, recDesc, 
locationConstraint);
         } else {
             builder.contributeMicroOperator(assign, runtime, recDesc);
         }
@@ -115,8 +115,8 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
-    public void setCardinalityConstraint(int cardinality) {
-        this.cardinalityConstraint = cardinality;
+    public void setLocationConstraint(String[] locations) {
+        this.locations = locations;
     }
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 719c70e..a0ef64e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,16 +18,17 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 
 public class DefaultNodeGroupDomain implements INodeDomain {
 
-    private List<String> nodes = new ArrayList<>();
+    private MultiSet<String> nodes = new HashMultiSet<>();
 
     public DefaultNodeGroupDomain(List<String> nodes) {
         this.nodes.addAll(nodes);
@@ -67,4 +68,8 @@
     public Integer cardinality() {
         return nodes.size();
     }
+
+    public String[] getNodes() {
+        return nodes.toArray(new String[0]);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to