Xikui Wang has submitted this change and it was merged.

Change subject: Fix record loss for certain certain feed type
......................................................................


Fix record loss for certain certain feed type

1. Fix blindly replace connector between FeedCollector and
   AssignOperator.
2. Wrap AssignOperator into the FeedMetaOperator to make sure the
   operators inside (udf, accessor, etc.) can handle messages in the
   feed workflow.
3. Revise feed connection job merge function.
4. Test case fix.

Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1652
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
M 
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
M 
asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
7 files changed, 30 insertions(+), 29 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; Verified

Objections:
  Jenkins: Violations found; Violations found



diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 1a0ecd9..dfb73ee 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -61,7 +61,7 @@
 
         final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
         FeedConnection feedConnection = feedDataSource.getFeedConnection();
-        if (feedConnection.getAppliedFunctions() == null) {
+        if (feedConnection.getAppliedFunctions() == null || 
feedConnection.getAppliedFunctions().size() == 0) {
             return false;
         }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..874cbb1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -80,6 +80,7 @@
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import 
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.constraints.Constraint;
@@ -207,12 +208,15 @@
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations 
= new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
         List<JobId> jobIds = new ArrayList<>();
+        FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
             FeedConnection curFeedConnection = feedConnections.get(iter1);
             JobSpecification subJob = jobsList.get(iter1);
             operatorIdMapping.clear();
             Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap = 
subJob.getOperatorMap();
+            FeedConnectionId feedConnectionId = new 
FeedConnectionId(ingestionOp.getEntityId(),
+                    feedConnections.get(iter1).getDatasetName());
 
             FeedPolicyEntity feedPolicyEntity =
                     
FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -221,26 +225,36 @@
             for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : 
operatorsMap.entrySet()) {
                 IOperatorDescriptor opDesc = entry.getValue();
                 OperatorDescriptorId oldId = opDesc.getOperatorId();
-                OperatorDescriptorId opId;
+                OperatorDescriptorId opId = null;
                 if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
                         && ((LSMTreeInsertDeleteOperatorDescriptor) 
opDesc).isPrimary()) {
                     String operandId = 
((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
-                    FeedMetaOperatorDescriptor metaOp = new 
FeedMetaOperatorDescriptor(jobSpec,
-                            new FeedConnectionId(ingestionOp.getEntityId(),
-                                    
feedConnections.get(iter1).getDatasetName()),
-                            opDesc, feedPolicyEntity.getProperties(), 
FeedRuntimeType.STORE, false, operandId);
+                    metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+                            feedConnectionId, opDesc, 
feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
+                            operandId);
                     opId = metaOp.getOperatorId();
                     opDesc.setOperatorId(opId);
                 } else {
                     if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
                         AlgebricksMetaOperatorDescriptor algOp = 
(AlgebricksMetaOperatorDescriptor) opDesc;
-                        for (IPushRuntimeFactory runtimeFactory : 
algOp.getPipeline().getRuntimeFactories()) {
-                            if (runtimeFactory instanceof 
StreamSelectRuntimeFactory) {
-                                ((StreamSelectRuntimeFactory) 
runtimeFactory).retainMissing(true, 0);
+                        IPushRuntimeFactory[] runtimeFactories = 
algOp.getPipeline().getRuntimeFactories();
+                        // Tweak AssignOp to work with messages
+                        if (runtimeFactories[0] instanceof 
AssignRuntimeFactory && runtimeFactories.length > 1) {
+                            IConnectorDescriptor connectorDesc = 
subJob.getOperatorInputMap()
+                                    .get(opDesc.getOperatorId()).get(0);
+                            // anything on the network interface needs to be 
message compatible
+                            if (connectorDesc instanceof 
MToNPartitioningConnectorDescriptor) {
+                                metaOp = new 
FeedMetaOperatorDescriptor(jobSpec,
+                                        feedConnectionId, opDesc, 
feedPolicyEntity.getProperties(),
+                                        FeedRuntimeType.COMPUTE, null);
+                                opId = metaOp.getOperatorId();
+                                opDesc.setOperatorId(opId);
                             }
                         }
                     }
-                    opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    if (opId == null) {
+                        opId = jobSpec.createOperatorDescriptorId(opDesc);
+                    }
                 }
                 operatorIdMapping.put(oldId, opId);
             }
@@ -250,9 +264,6 @@
             for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry : 
subJob.getConnectorMap().entrySet()) {
                 IConnectorDescriptor connDesc = entry.getValue();
                 ConnectorDescriptorId newConnId;
-                if (entry.getKey().getId() == 0) {
-                    continue;
-                }
                 if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
                     MToNPartitioningConnectorDescriptor m2nConn = 
(MToNPartitioningConnectorDescriptor) connDesc;
                     connDesc = new 
MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +288,8 @@
                 if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor) 
{
                     jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
replicateOp, iter1, leftOpDesc,
                             leftOp.getRight());
-                    jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec), 
leftOpDesc, leftOp.getRight(),
-                            rightOpDesc, rightOp.getRight());
-                } else {
-                    jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
                 }
+                jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(), 
rightOpDesc, rightOp.getRight());
             }
 
             // prepare for setting partition constraints
@@ -295,16 +303,10 @@
                 switch (lexpr.getTag()) {
                     case PARTITION_COUNT:
                         opId = ((PartitionCountExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         operatorCounts.put(operatorIdMapping.get(opId), (int) 
((ConstantExpression) cexpr).getValue());
                         break;
                     case PARTITION_LOCATION:
                         opId = ((PartitionLocationExpression) 
lexpr).getOperatorDescriptorId();
-                        if (opId.getId() == 0) {
-                            continue;
-                        }
                         IOperatorDescriptor opDesc = 
jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
                         List<LocationConstraint> locations = 
operatorLocations.get(opDesc.getOperatorId());
                         if (locations == null) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@
     private final FeedRuntimeType subscriptionLocation;
 
     public FeedCollectOperatorDescriptor(JobSpecification spec, 
FeedConnectionId feedConnectionId, ARecordType atype,
-            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
-            FeedRuntimeType subscriptionLocation) {
+            RecordDescriptor rDesc, Map<String, String> feedPolicyProperties, 
FeedRuntimeType subscriptionLocation) {
         super(spec, 1, 1);
         this.recordDescriptors[0] = rDesc;
         this.outputType = atype;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..cffd303 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -76,7 +76,7 @@
 
     public FeedMetaOperatorDescriptor(final JobSpecification spec, final 
FeedConnectionId feedConnectionId,
             final IOperatorDescriptor coreOperatorDescriptor, final 
Map<String, String> feedPolicyProperties,
-            final FeedRuntimeType runtimeType, final boolean 
enableSubscriptionMode, final String operandId) {
+            final FeedRuntimeType runtimeType, final String operandId) {
         super(spec, coreOperatorDescriptor.getInputArity(), 
coreOperatorDescriptor.getOutputArity());
         this.feedConnectionId = feedConnectionId;
         this.feedPolicyProperties = feedPolicyProperties;
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index 2975e63..c31da8b 100644
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++ 
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-788
\ No newline at end of file
+804
\ No newline at end of file

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1652
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to