Xikui Wang has submitted this change and it was merged.

Change subject: [NO ISSUE][ING] Allow external UDF to use runtime parallelism
......................................................................


[NO ISSUE][ING] Allow external UDF to use runtime parallelism

- user model changes: no
- storage format changes: no
- interface changes:

Details:
1. Allow external UDFs applied in a feed to use the runtime parallelism.
2. Fix DefaultNodeGroupDomain so that its nodes are stored in a multiset
rather than a list, so that comparing two domains does not depend on node order.

Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2398
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
M 
asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
M 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
M hyracks-fullstack/algebricks/algebricks-core/pom.xml
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
8 files changed, 42 insertions(+), 40 deletions(-)

Approvals:
  Anon. E. Moose #1000171: 
  Jenkins: Verified; No violations found; 
  Michael Blow: Looks good to me, approved



diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index dfb73ee..c41601b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -35,6 +35,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
@@ -66,19 +67,9 @@
         }
 
         ExchangeOperator exchangeOp = new ExchangeOperator();
-        INodeDomain domain = new INodeDomain() {
-            @Override
-            public boolean sameAs(INodeDomain domain) {
-                return domain == this;
-            }
+        INodeDomain runtimeDomain = feedDataSource.getComputationNodeDomain();
 
-            @Override
-            public Integer cardinality() {
-                return feedDataSource.getComputeCardinality();
-            }
-        };
-
-        exchangeOp.setPhysicalOperator(new 
RandomPartitionExchangePOperator(domain));
+        exchangeOp.setPhysicalOperator(new 
RandomPartitionExchangePOperator(runtimeDomain));
         op.getInputs().get(0).setValue(exchangeOp);
         exchangeOp.getInputs().add(new 
MutableObject<ILogicalOperator>(scanOp));
         ExecutionMode em = ((AbstractLogicalOperator) 
scanOp).getExecutionMode();
@@ -88,8 +79,9 @@
 
         AssignOperator assignOp = (AssignOperator) opRef.getValue();
         AssignPOperator assignPhyOp = (AssignPOperator) 
assignOp.getPhysicalOperator();
-        assignPhyOp.setCardinalityConstraint(domain.cardinality());
-
+        DefaultNodeGroupDomain computationNode = (DefaultNodeGroupDomain) 
runtimeDomain;
+        String[] nodes = computationNode.getNodes();
+        assignPhyOp.setLocationConstraint(nodes);
         return true;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
index 05dd53e..51aca5d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -163,10 +163,10 @@
         } else {
             keyAccessScalarFunctionCallExpression = null;
         }
-        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) 
context.getMetadataProvider(), sourceFeed,
-                aqlId, targetDataset, feedOutputType, metaType, pkTypes, 
keyAccessScalarFunctionCallExpression,
-                sourceFeed.getFeedId(), 
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain(), feedConnection);
+        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, aqlId, 
targetDataset, feedOutputType, metaType,
+                pkTypes, keyAccessScalarFunctionCallExpression, 
sourceFeed.getFeedId(),
+                FeedRuntimeType.valueOf(subscriptionLocation), 
locations.split(","), context.getComputationNodeDomain(),
+                feedConnection);
         
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy);
         return feedDataSource;
     }
diff --git 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
index caa317d..5141e09 100644
--- 
a/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
+++ 
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1343-2.plan
@@ -18,7 +18,7 @@
                   -- STREAM_PROJECT  |PARTITIONED|
                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                       -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
-                        -- HASH_PARTITION_EXCHANGE [$$30]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                           -- STREAM_PROJECT  |PARTITIONED|
                             -- STREAM_SELECT  |PARTITIONED|
                               -- STREAM_PROJECT  |PARTITIONED|
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
index 883cd7a..0d46387 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/feeds/feed-with-external-function/feed-with-external-function.3.update.sqlpp
@@ -23,6 +23,8 @@
  */
 use externallibtest;
 
+SET `compiler.parallelism` "5";
+
 connect feed TweetFeed to dataset TweetsFeedIngest apply function 
`testlib#parseTweet`;
 
 start feed TweetFeed;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
index 325d23b..5c3ed56 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FeedDataSource.java
@@ -56,17 +56,16 @@
     private final FeedRuntimeType location;
     private final String targetDataset;
     private final String[] locations;
-    private final int computeCardinality;
+    private final INodeDomain computationNodeDomain;
     private final List<IAType> pkTypes;
     private final List<ScalarFunctionCallExpression> keyAccessExpression;
     private final FeedConnection feedConnection;
 
-    public FeedDataSource(MetadataProvider metadataProvider, Feed feed, 
DataSourceId id, String targetDataset,
-            IAType itemType, IAType metaType, List<IAType> pkTypes,
-            List<ScalarFunctionCallExpression> keyAccessExpression, EntityId 
sourceFeedId, FeedRuntimeType location,
-            String[] locations, INodeDomain domain, FeedConnection 
feedConnection) throws AlgebricksException {
+    public FeedDataSource(Feed feed, DataSourceId id, String targetDataset, 
IAType itemType, IAType metaType,
+            List<IAType> pkTypes, List<ScalarFunctionCallExpression> 
keyAccessExpression, EntityId sourceFeedId,
+            FeedRuntimeType location, String[] locations, INodeDomain domain, 
FeedConnection feedConnection)
+            throws AlgebricksException {
         super(id, itemType, metaType, Type.FEED, domain);
-        ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
         this.feed = feed;
         this.targetDataset = targetDataset;
         this.sourceFeedId = sourceFeedId;
@@ -74,7 +73,7 @@
         this.locations = locations;
         this.pkTypes = pkTypes;
         this.keyAccessExpression = keyAccessExpression;
-        this.computeCardinality = 
appCtx.getClusterStateManager().getParticipantNodes().size();
+        this.computationNodeDomain = domain;
         this.feedConnection = feedConnection;
         initFeedDataSource();
     }
@@ -117,10 +116,6 @@
                 schemaTypes[i++] = type;
             }
         }
-    }
-
-    public int getComputeCardinality() {
-        return computeCardinality;
     }
 
     public List<IAType> getPkTypes() {
@@ -208,4 +203,8 @@
     public FeedConnection getFeedConnection() {
         return feedConnection;
     }
+
+    public INodeDomain getComputationNodeDomain() {
+        return computationNodeDomain;
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-core/pom.xml 
b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
index 9f7d2bd..669988c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-core/pom.xml
@@ -85,5 +85,9 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
index 995f6e0..ccd27f4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AssignPOperator.java
@@ -22,7 +22,7 @@
 
 import org.apache.commons.lang3.mutable.Mutable;
 
-import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -44,7 +44,7 @@
 public class AssignPOperator extends AbstractPhysicalOperator {
 
     private boolean flushFramesRapidly;
-    private int cardinalityConstraint = 0;
+    private String[] locations;
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -93,10 +93,10 @@
 
         // contribute one Asterix framewriter
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
-        if (cardinalityConstraint > 0) {
-            AlgebricksCountPartitionConstraint countConstraint =
-                    new 
AlgebricksCountPartitionConstraint(cardinalityConstraint);
-            builder.contributeMicroOperator(assign, runtime, recDesc, 
countConstraint);
+        if (locations != null && locations.length > 0) {
+            AlgebricksAbsolutePartitionConstraint locationConstraint =
+                    new AlgebricksAbsolutePartitionConstraint(locations);
+            builder.contributeMicroOperator(assign, runtime, recDesc, 
locationConstraint);
         } else {
             builder.contributeMicroOperator(assign, runtime, recDesc);
         }
@@ -115,8 +115,8 @@
         this.flushFramesRapidly = flushFramesRapidly;
     }
 
-    public void setCardinalityConstraint(int cardinality) {
-        this.cardinalityConstraint = cardinality;
+    public void setLocationConstraint(String[] locations) {
+        this.locations = locations;
     }
 
     @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
index 719c70e..a0ef64e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/DefaultNodeGroupDomain.java
@@ -18,16 +18,17 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.properties;
 
-import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.collections4.MultiSet;
+import org.apache.commons.collections4.multiset.HashMultiSet;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint.PartitionConstraintType;
 
 public class DefaultNodeGroupDomain implements INodeDomain {
 
-    private List<String> nodes = new ArrayList<>();
+    private MultiSet<String> nodes = new HashMultiSet<>();
 
     public DefaultNodeGroupDomain(List<String> nodes) {
         this.nodes.addAll(nodes);
@@ -67,4 +68,8 @@
     public Integer cardinality() {
         return nodes.size();
     }
+
+    public String[] getNodes() {
+        return nodes.toArray(new String[0]);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2398
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic3b54617be115f51b6a48b9a61581c26b5be8d9d
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to