Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1652
Change subject: WIP - Fix feed connection
......................................................................
WIP - Fix feed connection
Resubmitting this patch due to an unknown compilation error
Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
M
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
M
asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
7 files changed, 33 insertions(+), 27 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/52/1652/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
index 1a0ecd9..dfb73ee 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceRandomPartitioningFeedComputationRule.java
@@ -61,7 +61,7 @@
final FeedDataSource feedDataSource = (FeedDataSource) dataSource;
FeedConnection feedConnection = feedDataSource.getFeedConnection();
- if (feedConnection.getAppliedFunctions() == null) {
+ if (feedConnection.getAppliedFunctions() == null ||
feedConnection.getAppliedFunctions().size() == 0) {
return false;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 4ea524a..7e54395 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -80,6 +80,7 @@
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.Constraint;
@@ -207,12 +208,15 @@
Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations
= new HashMap<>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
List<JobId> jobIds = new ArrayList<>();
+ FeedMetaOperatorDescriptor metaOp;
for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
FeedConnection curFeedConnection = feedConnections.get(iter1);
JobSpecification subJob = jobsList.get(iter1);
operatorIdMapping.clear();
Map<OperatorDescriptorId, IOperatorDescriptor> operatorsMap =
subJob.getOperatorMap();
+ FeedConnectionId feedConnectionId = new
FeedConnectionId(ingestionOp.getEntityId(),
+ feedConnections.get(iter1).getDatasetName());
FeedPolicyEntity feedPolicyEntity =
FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
@@ -221,26 +225,41 @@
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry :
operatorsMap.entrySet()) {
IOperatorDescriptor opDesc = entry.getValue();
OperatorDescriptorId oldId = opDesc.getOperatorId();
- OperatorDescriptorId opId;
+ OperatorDescriptorId opId = null;
if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
&& ((LSMTreeInsertDeleteOperatorDescriptor)
opDesc).isPrimary()) {
String operandId =
((LSMTreeInsertDeleteOperatorDescriptor) opDesc).getIndexName();
- FeedMetaOperatorDescriptor metaOp = new
FeedMetaOperatorDescriptor(jobSpec,
- new FeedConnectionId(ingestionOp.getEntityId(),
-
feedConnections.get(iter1).getDatasetName()),
- opDesc, feedPolicyEntity.getProperties(),
FeedRuntimeType.STORE, false, operandId);
+ metaOp = new FeedMetaOperatorDescriptor(jobSpec,
+ feedConnectionId, opDesc,
feedPolicyEntity.getProperties(), FeedRuntimeType.STORE,
+ operandId);
opId = metaOp.getOperatorId();
opDesc.setOperatorId(opId);
} else {
if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
AlgebricksMetaOperatorDescriptor algOp =
(AlgebricksMetaOperatorDescriptor) opDesc;
- for (IPushRuntimeFactory runtimeFactory :
algOp.getPipeline().getRuntimeFactories()) {
+ IPushRuntimeFactory[] runtimeFactories =
algOp.getPipeline().getRuntimeFactories();
+ for (IPushRuntimeFactory runtimeFactory :
runtimeFactories) {
if (runtimeFactory instanceof
StreamSelectRuntimeFactory) {
((StreamSelectRuntimeFactory)
runtimeFactory).retainMissing(true, 0);
}
}
+ // Tweak AssignOp to work with messages
+ if (runtimeFactories[0] instanceof
AssignRuntimeFactory && runtimeFactories.length > 1) {
+ IConnectorDescriptor connectorDesc =
subJob.getOperatorInputMap()
+ .get(opDesc.getOperatorId()).get(0);
+ // anything on the network interface needs to be
message compatible
+ if (connectorDesc instanceof
MToNPartitioningConnectorDescriptor) {
+ metaOp = new
FeedMetaOperatorDescriptor(jobSpec,
+ feedConnectionId, opDesc,
feedPolicyEntity.getProperties(),
+ FeedRuntimeType.COMPUTE, null);
+ opId = metaOp.getOperatorId();
+ opDesc.setOperatorId(opId);
+ }
+ }
}
- opId = jobSpec.createOperatorDescriptorId(opDesc);
+ if (opId == null) {
+ opId = jobSpec.createOperatorDescriptorId(opDesc);
+ }
}
operatorIdMapping.put(oldId, opId);
}
@@ -250,9 +269,6 @@
for (Entry<ConnectorDescriptorId, IConnectorDescriptor> entry :
subJob.getConnectorMap().entrySet()) {
IConnectorDescriptor connDesc = entry.getValue();
ConnectorDescriptorId newConnId;
- if (entry.getKey().getId() == 0) {
- continue;
- }
if (connDesc instanceof MToNPartitioningConnectorDescriptor) {
MToNPartitioningConnectorDescriptor m2nConn =
(MToNPartitioningConnectorDescriptor) connDesc;
connDesc = new
MToNPartitioningWithMessageConnectorDescriptor(jobSpec,
@@ -277,11 +293,8 @@
if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor)
{
jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
replicateOp, iter1, leftOpDesc,
leftOp.getRight());
- jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
leftOpDesc, leftOp.getRight(),
- rightOpDesc, rightOp.getRight());
- } else {
- jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(),
rightOpDesc, rightOp.getRight());
}
+ jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(),
rightOpDesc, rightOp.getRight());
}
// prepare for setting partition constraints
@@ -295,16 +308,10 @@
switch (lexpr.getTag()) {
case PARTITION_COUNT:
opId = ((PartitionCountExpression)
lexpr).getOperatorDescriptorId();
- if (opId.getId() == 0) {
- continue;
- }
operatorCounts.put(operatorIdMapping.get(opId), (int)
((ConstantExpression) cexpr).getValue());
break;
case PARTITION_LOCATION:
opId = ((PartitionLocationExpression)
lexpr).getOperatorDescriptorId();
- if (opId.getId() == 0) {
- continue;
- }
IOperatorDescriptor opDesc =
jobSpec.getOperatorMap().get(operatorIdMapping.get(opId));
List<LocationConstraint> locations =
operatorLocations.get(opDesc.getOperatorId());
if (locations == null) {
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
index cbdc907..c31da8b 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.adm
@@ -1 +1 @@
-788
+804
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
index cbdc907..14b76cb 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.adm
@@ -1 +1 @@
-788
+804
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
index c4cb650..97e5511 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java
@@ -54,8 +54,7 @@
private final FeedRuntimeType subscriptionLocation;
public FeedCollectOperatorDescriptor(JobSpecification spec,
FeedConnectionId feedConnectionId, ARecordType atype,
- RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
- FeedRuntimeType subscriptionLocation) {
+ RecordDescriptor rDesc, Map<String, String> feedPolicyProperties,
FeedRuntimeType subscriptionLocation) {
super(spec, 1, 1);
this.recordDescriptors[0] = rDesc;
this.outputType = atype;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
index d0d9f7b..cffd303 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaOperatorDescriptor.java
@@ -76,7 +76,7 @@
public FeedMetaOperatorDescriptor(final JobSpecification spec, final
FeedConnectionId feedConnectionId,
final IOperatorDescriptor coreOperatorDescriptor, final
Map<String, String> feedPolicyProperties,
- final FeedRuntimeType runtimeType, final boolean
enableSubscriptionMode, final String operandId) {
+ final FeedRuntimeType runtimeType, final String operandId) {
super(spec, coreOperatorDescriptor.getInputArity(),
coreOperatorDescriptor.getOutputArity());
this.feedConnectionId = feedConnectionId;
this.feedPolicyProperties = feedPolicyProperties;
diff --git
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
index 2975e63..c31da8b 100644
---
a/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
+++
b/asterixdb/asterix-installer/src/test/resources/transactionts/results/query_after_restart/dataset-with-meta-record/dataset-with-meta-record.5.adm
@@ -1 +1 @@
-788
\ No newline at end of file
+804
\ No newline at end of file
--
To view, visit https://asterix-gerrit.ics.uci.edu/1652
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4e1f7dd3621482a11feb675a93b826ae2cb965a7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>