Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1652

Change subject: WIP - Fix feed connection
......................................................................

WIP - Fix feed connection

Resubmitting this patch because of an unknown compilation error.

Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
M asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
M asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
7 files changed, 33 insertions(+), 27 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/52/1652/1

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 1a0ecd9..dfb73ee 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -61,7 +61,7 @@
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedConnection feedConnection = feedDataSource.getFeedConnection();
-        if (feedConnection.getAppliedFunctions() == null) {
+        if (feedConnection.getAppliedFunctions() == null || 
feedConnection.getAppliedFunctions().size() == 0) {
             return false;
         }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..7e54395 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -80,6 +80,7 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.Constraint;
@@ -207,12 +208,15 @@
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations 
= new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
         List<JobId> jobIds = new ArrayList<>();
+        FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
             FeedConnection curFeedConnection = feedConnections.get(iter1);
             JobSpecification subJob = jobsList.get(iter1);
             operatorIdMapping.clear();
             Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = 
subJob.getOperatorMap();
+            FeedConnectionId feedConnectionId = new 
FeedConnectionId(ingestionOp.getEntityId(),
+                    feedConnections.get(iter1).getDatasetName());
 
             FeedPolicyEntity feedPolicyEntity =
                     
FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -221,26 +225,41 @@
             for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : 
operatorsMap.entrySet()) {
                 IOperatorDescriptor opDesc = entry.getValue();
                 OperatorDescriptorId oldId = opDesc.getOperatorId();
-                OperatorDescriptorId opId;
+                OperatorDescriptorId opId = null;
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) 
opDesc).isPrimary()) {
                     String operandId = 
((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                    FeedMetaOperatorDescriptor metaOp = new 
FeedMetaOperatorDescriptor(jobSpec,
-                            new FeedConnectionId(ingestionOp.getEntityId(),
-                                    
feedConnections.get(iter1).getDatasetName()),
-                            opDesc, feedPolicyEntity.getProperties(), 
FeedRuntimeType.STORE, false, operandId);
+                    metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                            feedConnectionId, opDesc, 
feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
+                            operandId);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
                     if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                         AlgebricksMetaOperatorDescriptor algOp = 
(AlgebricksMetaOperatorDescriptor) opDesc;
-                        for (IPushRuntimeFactory runtimeFactory : 
algOp.getPipeline().getRuntimeFactories()) {
+                        IPushRuntimeFactory[] runtimeFactories = 
algOp.getPipeline().getRuntimeFactories();
+                        for (IPushRuntimeFactory runtimeFactory : 
runtimeFactories) {
                             if (runtimeFactory instanceof 
StreamSelectRuntimeFactory) {
                                 ((StreamSelectRuntimeFactory) 
runtimeFactory).retainMissing(true, 0);
                             }
                         }
+                        // Tweak AssignOp to work with messages
+                        if (runtimeFactories[0] instanceof 
AssignRuntimeFactory && runtimeFactories.length > 1) {
+                            IConnectorDescriptor connectorDesc = 
subJob.getOperatorInputMap()
+                                    .get(opDesc.getOperatorId()).get(0);
+                            // anything on the network interface needs to be 
message compatible
+                            if (connectorDesc instanceof 
MToNPartitioningConnectorDescriptor) {
+                                metaOp = new 
FeedMetaOperatorDescriptor(jobSpec,
+                                        feedConnectionId, opDesc, 
feedPolicyEntity.getProperties(),
+                                        FeedRuntimeType.COMPUTE, null);
+                                opId = metaOp.getOperatorId();
+                                opDesc.setOperatorId(opId);
+                            }
+                        }
                     }
-                    opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    if (opId == null) {
+                        opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    }
                 }
                 operatorIdMapping.put(oldId, opId);
             }
@@ -250,9 +269,6 @@
             for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : 
subJob.getConnectorMap().entrySet()) {
                 IConnectorDescriptor connDesc = entry.getValue();
                 ConnectorDescriptorId newConnId;
-                if (entry.getKey().getId() == 0) {
-                    continue;
-                }
                 if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                     MToNPartitioningConnectorDescriptor m2nConn = 
(MToNPartitioningConnectorDescriptor) connDesc;
                     connDesc = new 
MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +293,8 @@
                 if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) 
{
                     jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
replicateOp, iter1, leftOpDesc,
                             leftOp.getRight());
-                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
leftOpDesc, leftOp.getRight(),
-                            rightOpDesc, rightOp.getRight());
-                } else {
-                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
                 }
+                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
             }
 
             // prepare for setting partition constraints
@@ -295,16 +308,10 @@
                 switch (lexpr.getTag()) {
                     case PARTITION_COUNT:
                         opId = ((PartitionCountExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         operatorCounts.put(operatorIdMapping.get(opId), (int) 
((ConstantExpression) cexpr).getValue());
                         break;
                     case PARTITION_LOCATION:
                         opId = ((PartitionLocationExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         IOperatorDescriptor opDesc = 
jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                         List<LocationConstraint> locations = 
operatorLocations.get(opDesc.getOperatorId());
                         if (locations == null) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@
     private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, 
FeedConnectionId feedConnectionId, ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType subscriptionLocation) {
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, 
FeedRuntimeType subscriptionLocation) {
         super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..cffd303 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -76,7 +76,7 @@
 
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final 
FeedConnectionId feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final 
Map<String, String> feedPolicyProperties,
-            final FeedRuntimeType runtimeType, final boolean 
enableSubscriptionMode, final String operandId) {
+            final FeedRuntimeType runtimeType, final String operandId) {
         super(spec, coreOperatorDescriptor.getInputArity(), 
coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index 2975e63..c31da8b 100644
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++ 
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-788
\ No newline at end of file
+804
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1652
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>

Reply via email to