Xikui Wang has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1755
Change subject: ASTERIXDB-1908 Allow applying join function to feed
......................................................................
ASTERIXDB-1908 Allow applying join function to feed
1. Add FeedMessagingOperator for processing feed messages on the network
boundaries.
2. Unwrap operators in the feed workflow from FeedMetaOperator, and
insert FeedMessagingOperator to handle messages.
3. LSMTreeInsertDeleteOperator is still kept in FeedMetaOperator for the
fail-safe insertion workflow.
4. Add test case for applying join function to a feed.
Change-Id: I1c79a106537dbeeedd7a1b12cc2f1310307f3a00
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.1.ddl.aql
A
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.2.update.aql
A
asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.3.query.aql
A
asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-join-function/feed-with-join-function.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorDescriptor.java
A
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorPushable.java
8 files changed, 322 insertions(+), 51 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/55/1755/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 6155450..ca427be 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -49,6 +49,7 @@
import org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
+import org.apache.asterix.external.operators.FeedMessagingOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
@@ -69,6 +70,7 @@
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.translator.CompiledStatements;
import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.SessionConfig;
import org.apache.asterix.translator.SessionOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -77,9 +79,6 @@
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
-import
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
-import
org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.constraints.Constraint;
import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
@@ -207,7 +206,6 @@
Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations
= new HashMap<>();
Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
List<JobId> jobIds = new ArrayList<>();
- FeedMetaOperatorDescriptor metaOp;
for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
FeedConnection curFeedConnection = feedConnections.get(iter1);
@@ -221,36 +219,22 @@
FeedMetadataUtil.validateIfPolicyExists(curFeedConnection.getDataverseName(),
curFeedConnection.getPolicyName(),
metadataProvider.getMetadataTxnContext());
- for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry :
operatorsMap.entrySet()) {
+ // copy operators
+ OperatorDescriptorId oldId;
+ OperatorDescriptorId opId;
+ FeedMetaOperatorDescriptor metaOpDesc;
+ for (Entry<OperatorDescriptorId, IOperatorDescriptor> entry :
operatorsMap.entrySet()) {
+ oldId = entry.getKey();
IOperatorDescriptor opDesc = entry.getValue();
- OperatorDescriptorId oldId = opDesc.getOperatorId();
- OperatorDescriptorId opId = null;
+                    // wrap LSMTreeInsertDeleteOperatorDescriptor to ensure
fail-safe insertion
if (opDesc instanceof LSMTreeInsertDeleteOperatorDescriptor
&& ((LSMTreeInsertDeleteOperatorDescriptor)
opDesc).isPrimary()) {
- metaOp = new FeedMetaOperatorDescriptor(jobSpec,
feedConnectionId, opDesc,
+ metaOpDesc = new FeedMetaOperatorDescriptor(jobSpec,
feedConnectionId, opDesc,
feedPolicyEntity.getProperties(),
FeedRuntimeType.STORE);
- opId = metaOp.getOperatorId();
+ opId = metaOpDesc.getOperatorId();
opDesc.setOperatorId(opId);
} else {
- if (opDesc instanceof AlgebricksMetaOperatorDescriptor) {
- AlgebricksMetaOperatorDescriptor algOp =
(AlgebricksMetaOperatorDescriptor) opDesc;
- IPushRuntimeFactory[] runtimeFactories =
algOp.getPipeline().getRuntimeFactories();
- // Tweak AssignOp to work with messages
- if (runtimeFactories[0] instanceof
AssignRuntimeFactory && runtimeFactories.length > 1) {
- IConnectorDescriptor connectorDesc =
-
subJob.getOperatorInputMap().get(opDesc.getOperatorId()).get(0);
- // anything on the network interface needs to be
message compatible
- if (connectorDesc instanceof
MToNPartitioningConnectorDescriptor) {
- metaOp = new
FeedMetaOperatorDescriptor(jobSpec, feedConnectionId, opDesc,
- feedPolicyEntity.getProperties(),
FeedRuntimeType.COMPUTE);
- opId = metaOp.getOperatorId();
- opDesc.setOperatorId(opId);
- }
- }
- }
- if (opId == null) {
- opId = jobSpec.createOperatorDescriptorId(opDesc);
- }
+ opId =
jobSpec.createOperatorDescriptorId(entry.getValue());
}
operatorIdMapping.put(oldId, opId);
}
@@ -271,22 +255,6 @@
connectorIdMapping.put(entry.getKey(), newConnId);
}
- // make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>,
- Pair<IOperatorDescriptor, Integer>>> entry :
subJob.getConnectorOperatorMap().entrySet()) {
- ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
- IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
- Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
- Pair<IOperatorDescriptor, Integer> rightOp =
entry.getValue().getRight();
- IOperatorDescriptor leftOpDesc =
jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId());
- IOperatorDescriptor rightOpDesc =
jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId());
- if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor)
{
- jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
replicateOp, iter1, leftOpDesc,
- leftOp.getRight());
- }
- jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(),
rightOpDesc, rightOp.getRight());
- }
-
// prepare for setting partition constraints
operatorLocations.clear();
operatorCounts.clear();
@@ -294,7 +262,6 @@
for (Constraint constraint : subJob.getUserConstraints()) {
LValueConstraintExpression lexpr = constraint.getLValue();
ConstraintExpression cexpr = constraint.getRValue();
- OperatorDescriptorId opId;
switch (lexpr.getTag()) {
case PARTITION_COUNT:
opId = ((PartitionCountExpression)
lexpr).getOperatorDescriptorId();
@@ -309,8 +276,8 @@
operatorLocations.put(opDesc.getOperatorId(),
locations);
}
String location = (String) ((ConstantExpression)
cexpr).getValue();
- LocationConstraint lc =
- new LocationConstraint(location,
((PartitionLocationExpression) lexpr).getPartition());
+ LocationConstraint lc = new
LocationConstraint(location,
+ ((PartitionLocationExpression)
lexpr).getPartition());
locations.add(lc);
break;
default:
@@ -318,13 +285,52 @@
}
}
+            // sort the location constraints to make sure all Ops are consistent
+ for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry :
operatorLocations.entrySet()) {
+ Collections.sort(entry.getValue(),
+ (LocationConstraint o1, LocationConstraint o2) ->
o1.partition - o2.partition);
+ }
+
+ // make connections between operators
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>,
+ Pair<IOperatorDescriptor, Integer>>> entry :
subJob.getConnectorOperatorMap().entrySet()) {
+ ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
+ IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
+ Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
+ Pair<IOperatorDescriptor, Integer> rightOp =
entry.getValue().getRight();
+ IOperatorDescriptor leftOpDesc =
jobSpec.getOperatorMap().get(leftOp.getLeft().getOperatorId());
+ IOperatorDescriptor rightOpDesc =
jobSpec.getOperatorMap().get(rightOp.getLeft().getOperatorId());
+ if (leftOp.getLeft() instanceof FeedCollectOperatorDescriptor)
{
+ jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
replicateOp, iter1, leftOpDesc,
+ leftOp.getRight());
+ }
+ if (connDesc instanceof
MToNPartitioningWithMessageConnectorDescriptor
+ && !(rightOpDesc instanceof
FeedMetaOperatorDescriptor)) {
+ FeedMessagingOperatorDescriptor feedMessagingOpDesc = new
FeedMessagingOperatorDescriptor(jobSpec,
+ feedPolicyEntity.getProperties(), leftOpDesc);
+ if
(operatorLocations.containsKey(rightOpDesc.getOperatorId())) {
+ List<LocationConstraint> rightLocations =
operatorLocations.get(rightOpDesc.getOperatorId());
+ String[] locations = new String[rightLocations.size()];
+ for (int iter2 = 0; iter2 <
operatorLocations.get(rightOpDesc.getOperatorId())
+ .size(); iter2++) {
+ locations[iter2] =
rightLocations.get(iter2).location;
+ }
+
PartitionConstraintHelper.addAbsoluteLocationConstraint(jobSpec,
feedMessagingOpDesc, locations);
+ } else {
+
PartitionConstraintHelper.addPartitionCountConstraint(jobSpec,
feedMessagingOpDesc,
+
operatorCounts.get(rightOpDesc.getOperatorId()));
+ }
+ jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(),
feedMessagingOpDesc, 0);
+ jobSpec.connect(new OneToOneConnectorDescriptor(jobSpec),
feedMessagingOpDesc, 0, rightOpDesc,
+ rightOp.getRight());
+ } else {
+ jobSpec.connect(connDesc, leftOpDesc, leftOp.getRight(),
rightOpDesc, rightOp.getRight());
+ }
+ }
+
// set absolute location constraints
for (Entry<OperatorDescriptorId, List<LocationConstraint>> entry :
operatorLocations.entrySet()) {
IOperatorDescriptor opDesc =
jobSpec.getOperatorMap().get(entry.getKey());
- // why do we need to sort?
- Collections.sort(entry.getValue(), (LocationConstraint o1,
LocationConstraint o2) -> {
- return o1.partition - o2.partition;
- });
String[] locations = new String[entry.getValue().size()];
for (int j = 0; j < locations.length; ++j) {
locations[j] = entry.getValue().get(j).location;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.1.ddl.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.1.ddl.aql
new file mode 100644
index 0000000..2462576
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.1.ddl.aql
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+drop dataverse test if exists;
+
+create dataverse test;
+
+use dataverse test;
+
+create type AddressType as open {
+ number: int64,
+ street: string,
+ city: string
+}
+
+create type CustomerType as closed {
+ cid: int64,
+ name: string,
+ cashBack: int64,
+ age: int64?,
+ address: AddressType?,
+ lastorder: {
+ oid: int64,
+ total: float
+ }
+}
+
+create type OrderType as open {
+ oid: int64,
+ cid: int64,
+ orderstatus: string,
+ orderpriority: string,
+ clerk: string,
+ total: float,
+ items: [int64]
+}
+
+create type JoinResultType as open {
+ cid: int64
+}
+
+create dataset JoinResult(JoinResultType) primary key cid;
+
+create external dataset Orders(OrderType)
+using localfs
+(("path"="asterix_nc1://data/nontagged/orderData.json"),("format"="adm"));
+
+create function AnnotateOrderId($c) {
+object_merge($c, {"order_ids" : for $o in dataset Orders return $o.oid})
+}
+
+create feed CustomerFeed using localfs
+(("type-name"="CustomerType"),
+("path"="asterix_nc1://data/nontagged/customerData.json"),
+("format"="adm"));
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.2.update.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.2.update.aql
new file mode 100644
index 0000000..e145ce7
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.2.update.aql
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse test;
+
+connect feed CustomerFeed to dataset JoinResult apply function AnnotateOrderId;
+
+set wait-for-completion-feed "true";
+start feed CustomerFeed;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.3.query.aql
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.3.query.aql
new file mode 100644
index 0000000..143f131
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-join-function/feed-with-join-function.3.query.aql
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use dataverse test;
+for $i in dataset JoinResult order by $i.cid return $i;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-join-function/feed-with-join-function.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-join-function/feed-with-join-function.1.adm
new file mode 100644
index 0000000..f05d324
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/feeds/feed-with-join-function/feed-with-join-function.1.adm
@@ -0,0 +1,5 @@
+{ "cid": 0, "address": null, "cashBack": 600, "lastorder": { "oid": 258,
"total": 368.61862 }, "name": "Mike ley", "order_ids": [ 1000, 1, 100, 10 ] }
+{ "cid": 1, "address": { "number": 389, "street": "Hill St.", "city":
"Mountain View" }, "cashBack": 650, "lastorder": { "oid": 18, "total":
338.61862 }, "name": "Mike Carey", "order_ids": [ 1000, 1, 100, 10 ] }
+{ "cid": 4, "address": { "number": 8, "street": "Hill St.", "city": "Mountain
View" }, "age": 12, "cashBack": 450, "lastorder": { "oid": 4545, "total":
87.61863 }, "name": "Mary Carey", "order_ids": [ 1000, 1, 100, 10 ] }
+{ "cid": 5, "address": null, "age": 19, "cashBack": 350, "lastorder": { "oid":
48, "total": 318.61862 }, "name": "Jodi Alex", "order_ids": [ 1000, 1, 100, 10
] }
+{ "cid": 775, "address": { "number": 8389, "street": "Hill St.", "city":
"Mountain View" }, "age": null, "cashBack": 100, "lastorder": { "oid": 66,
"total": 38.618626 }, "name": "Jodi Rotruck", "order_ids": [ 1000, 1, 100, 10 ]
}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
index a69d313..3039579 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite.xml
@@ -273,6 +273,11 @@
<output-dir
compare="Text">record-reader-with-malformed-input-stream</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="feeds">
+ <compilation-unit name="feed-with-join-function">
+ <output-dir compare="Text">feed-with-join-function</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="upsert">
<test-case FilePath="upsert">
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorDescriptor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorDescriptor.java
new file mode 100644
index 0000000..ac0cd7e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.operators;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+
+import java.util.Map;
+
+public class FeedMessagingOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
+
+ private final Map<String, String> feedPolicyProperties;
+
+ public FeedMessagingOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ final Map<String, String> feedPolicyProperties,
IOperatorDescriptor sourceOpDesc) {
+ super(spec, 1, 1);
+ this.feedPolicyProperties = feedPolicyProperties;
+ this.outRecDescs[0] = sourceOpDesc.getOutputRecordDescriptors()[0];
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
+ return new FeedMessagingOperatorPushable(ctx, recordDescProvider,
feedPolicyProperties, activityNodeId);
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorPushable.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorPushable.java
new file mode 100644
index 0000000..94e0731
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMessagingOperatorPushable.java
@@ -0,0 +1,94 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.asterix.external.operators;
+
+import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
/**
 * Push runtime for {@link FeedMessagingOperatorDescriptor}. Forwards data
 * frames unchanged while extracting any feed message carried with a frame
 * (via {@code FeedUtils.processFeedMessage}) into a frame that is shared
 * with co-located tasks through the task shared map under
 * {@code HyracksConstants.KEY_MESSAGE}.
 */
public class FeedMessagingOperatorPushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {

    // Task context; kept for the lifetime of the runtime.
    private final IHyracksTaskContext ctx;
    // Used in open() to build the tuple accessor for the input schema.
    private final IRecordDescriptorProvider recordDescriptorProvider;
    // Receives the extracted feed message; registered in the shared map at
    // construction time so downstream runtimes can find it before open().
    private final VSizeFrame message;
    // Lazily created in open(); accesses tuples of incoming frames.
    private FrameTupleAccessor fta;
    // Identifies this activity when resolving the input record descriptor.
    private final ActivityId activityId;
    // Feed policy accessor; currently unused at runtime (see TODO below).
    private final FeedPolicyAccessor fpa;
    // Tracks whether writer.open() succeeded, so fail()/close() do not touch
    // a writer that was never opened.
    private boolean opened;

    /**
     * @param ctx                      task context used for frame allocation and the shared map
     * @param recordDescriptorProvider provider for this activity's input record descriptor
     * @param feedPolicyProperties     feed connection policy parameters
     * @param activityId               id of the owning activity
     * @throws HyracksDataException if the message frame cannot be allocated
     */
    public FeedMessagingOperatorPushable(IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescriptorProvider, Map<String, String> feedPolicyProperties,
            ActivityId activityId)
            throws HyracksDataException {
        this.ctx = ctx;
        this.recordDescriptorProvider = recordDescriptorProvider;
        this.message = new VSizeFrame(ctx);
        this.activityId = activityId;
        // TODO: Enable policy control for each message handler by configuration
        this.fpa = new FeedPolicyAccessor(feedPolicyProperties);
        this.opened = false;
        // publish the message frame before any frame flows through this runtime
        TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
    }

    @Override
    public void open() throws HyracksDataException {
        // build the accessor before opening the writer so nextFrame() can
        // never observe a null accessor after a successful open
        fta = new FrameTupleAccessor(recordDescriptorProvider.getInputRecordDescriptor(activityId, 0));
        writer.open();
        opened = true;
    }

    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        // extract the message portion of the frame into the shared message
        // frame, then forward the (data) frame downstream
        FeedUtils.processFeedMessage(buffer, message, fta);
        writer.nextFrame(buffer);
    }

    @Override
    public void fail() throws HyracksDataException {
        // only propagate failure if the downstream writer was actually opened
        if (opened) {
            writer.fail();
        }
    }

    @Override
    public void close() throws HyracksDataException {
        // guard against close-without-open (e.g. when open() threw early)
        if (opened) {
            writer.close();
        }
    }

    @Override
    public void flush() throws HyracksDataException {
        // NOTE(review): flush is not guarded by 'opened' — presumably the
        // framework only calls flush between open() and close(); confirm.
        writer.flush();
    }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1755
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1c79a106537dbeeedd7a1b12cc2f1310307f3a00
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <[email protected]>