James Fang has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/3338 )
Change subject: [NO ISSUE] Create an abstraction for the
ForwardOperatorDescriptor
......................................................................
[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
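Currently, the forward operator descriptor creates and wires its own activities
internally. This change adds an abstract base descriptor that wires whatever
forward-data and side-data activities a subclass supplies, so different
activities can be used with the same underlying operator.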
Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
---
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
A
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
R
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
3 files changed, 79 insertions(+), 32 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/38/3338/1
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
index 11c584e..1b7675c 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
@@ -39,11 +39,11 @@
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor;
/**
* <pre>
- * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
+ * {@see {@link ForwardOperator} and {@link SortForwardOperatorDescriptor}}
* idx0: Input data source --
* |-- forward op.
* idx1: RangeMap generator--
@@ -108,8 +108,8 @@
ForwardOperator forwardOp = (ForwardOperator) op;
RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()),
inputSchemas[0], context);
- ForwardOperatorDescriptor forwardDescriptor =
- new ForwardOperatorDescriptor(builder.getJobSpec(),
forwardOp.getRangeMapKey(), dataInputDescriptor);
+ SortForwardOperatorDescriptor forwardDescriptor = new
SortForwardOperatorDescriptor(builder.getJobSpec(),
+ forwardOp.getRangeMapKey(), dataInputDescriptor);
builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
new file mode 100644
index 0000000..2d56f4c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+// TODO(ali): forward operator should probably be moved to asterix layer
+public class AbstractForwardOperatorDescriptor extends
AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+ protected static final int FORWARD_DATA_ACTIVITY_ID = 0;
+ protected static final int SIDE_DATA_ACTIVITY_ID = 1;
+ private AbstractActivityNode forwardDataActivity;
+ private AbstractActivityNode sideDataActivity;
+
+ /**
+ * @param spec used to create the operator id.
+ * @param outputRecordDescriptor the output schema of this operator.
+ */
+ public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ RecordDescriptor outputRecordDescriptor) {
+ super(spec, 2, 1);
+ outRecDescs[0] = outputRecordDescriptor;
+ }
+
+ /**
+ * @param forwardActivityNode activity node for the Forward Activity
+ * @param sideActivityNode activity node for the Side Activity
+ */
+ public void setActivities(AbstractActivityNode forwardActivityNode,
AbstractActivityNode sideActivityNode) {
+ forwardDataActivity = forwardActivityNode;
+ sideDataActivity = sideActivityNode;
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ // side data activity, its input is coming through the operator's
in-port = 1 & activity's in-port = 0
+ builder.addActivity(this, sideDataActivity);
+ builder.addSourceEdge(1, sideDataActivity, 0);
+
+ // forward data activity, its input is coming through the operator's
in-port = 0 & activity's in-port = 0
+ builder.addActivity(this, forwardDataActivity);
+ builder.addSourceEdge(0, forwardDataActivity, 0);
+
+ // forward data activity will wait for the side data activity
+ builder.addBlockingEdge(sideDataActivity, forwardDataActivity);
+
+ // data leaves from the operator's out-port = 0 & forward data
activity's out-port = 0
+ builder.addTargetEdge(0, forwardDataActivity, 0);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
similarity index 87%
rename from
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
rename to
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 49eea0a..d970404 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -24,7 +24,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -42,16 +41,14 @@
import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
// TODO(ali): forward operator should probably be moved to asterix layer
-public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+public class SortForwardOperatorDescriptor extends
AbstractForwardOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private static final int FORWARD_DATA_ACTIVITY_ID = 0;
- private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
private final String rangeMapKeyInContext;
/**
@@ -59,33 +56,15 @@
* @param rangeMapKeyInContext the unique key to store the range map in
the shared map & transfer it to partitioner.
* @param outputRecordDescriptor the output schema of this operator.
*/
- public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String
rangeMapKeyInContext,
+ public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec,
String rangeMapKeyInContext,
RecordDescriptor outputRecordDescriptor) {
- super(spec, 2, 1);
+ super(spec, outputRecordDescriptor);
this.rangeMapKeyInContext = rangeMapKeyInContext;
- outRecDescs[0] = outputRecordDescriptor;
- }
-
- @Override
- public void contributeActivities(IActivityGraphBuilder builder) {
ForwardDataActivity forwardDataActivity =
new ForwardDataActivity(new ActivityId(odId,
FORWARD_DATA_ACTIVITY_ID));
RangeMapReaderActivity rangeMapReaderActivity =
- new RangeMapReaderActivity(new ActivityId(odId,
RANGEMAP_READER_ACTIVITY_ID));
-
- // range map reader activity, its input is coming through the
operator's in-port = 1 & activity's in-port = 0
- builder.addActivity(this, rangeMapReaderActivity);
- builder.addSourceEdge(1, rangeMapReaderActivity, 0);
-
- // forward data activity, its input is coming through the operator's
in-port = 0 & activity's in-port = 0
- builder.addActivity(this, forwardDataActivity);
- builder.addSourceEdge(0, forwardDataActivity, 0);
-
- // forward data activity will wait for the range map reader activity
- builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);
-
- // data leaves from the operator's out-port = 0 & forward data
activity's out-port = 0
- builder.addTargetEdge(0, forwardDataActivity, 0);
+ new RangeMapReaderActivity(new ActivityId(odId,
SIDE_DATA_ACTIVITY_ID));
+ setActivities(forwardDataActivity, rangeMapReaderActivity);
}
/**
@@ -221,7 +200,7 @@
public void open() throws HyracksDataException {
// retrieve the range map from the state object (previous activity
should have already stored it)
// then deposit it into the ctx so that MToN-partition can pick it
up
- Object stateObjKey = new TaskId(new ActivityId(odId,
RANGEMAP_READER_ACTIVITY_ID), partition);
+ Object stateObjKey = new TaskId(new ActivityId(odId,
SIDE_DATA_ACTIVITY_ID), partition);
RangeMapState rangeMapState = (RangeMapState)
ctx.getStateObject(stateObjKey);
TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
writer.open();
--
To view, visit https://asterix-gerrit.ics.uci.edu/3338
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Gerrit-Change-Number: 3338
Gerrit-PatchSet: 1
Gerrit-Owner: James Fang <[email protected]>