James Fang has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3338 )


Change subject: [NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor
......................................................................

[NO ISSUE] Create an abstraction for the ForwardOperatorDescriptor

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
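Currently the forward operator descriptor creates and wires its activities
itself. This change moves the activity-graph wiring into a new base class,
AbstractForwardOperatorDescriptor, and renames the existing descriptor to
SortForwardOperatorDescriptor. By adding this layer of abstraction, we can
use different activities for the same underlying operator.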

Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
---
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
A 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
R 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
3 files changed, 79 insertions(+), 32 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/38/3338/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
index 11c584e..1b7675c 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ForwardPOperator.java
@@ -39,11 +39,11 @@
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.dataflow.std.misc.ForwardOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.SortForwardOperatorDescriptor;

 /**
  * <pre>
- * {@see {@link ForwardOperator} and {@link ForwardOperatorDescriptor}}
+ * {@see {@link ForwardOperator} and {@link SortForwardOperatorDescriptor}}
  * idx0: Input data source --
  *                           |-- forward op.
  * idx1: RangeMap generator--
@@ -108,8 +108,8 @@
         ForwardOperator forwardOp = (ForwardOperator) op;
         RecordDescriptor dataInputDescriptor = JobGenHelper.mkRecordDescriptor(
                 
context.getTypeEnvironment(forwardOp.getInputs().get(0).getValue()), 
inputSchemas[0], context);
-        ForwardOperatorDescriptor forwardDescriptor =
-                new ForwardOperatorDescriptor(builder.getJobSpec(), 
forwardOp.getRangeMapKey(), dataInputDescriptor);
+        SortForwardOperatorDescriptor forwardDescriptor = new 
SortForwardOperatorDescriptor(builder.getJobSpec(),
+                forwardOp.getRangeMapKey(), dataInputDescriptor);
         builder.contributeHyracksOperator(forwardOp, forwardDescriptor);
         ILogicalOperator dataSource = forwardOp.getInputs().get(0).getValue();
         builder.contributeGraphEdge(dataSource, 0, forwardOp, 0);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
new file mode 100644
index 0000000..2d56f4c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractForwardOperatorDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+// TODO(ali): forward operator should probably be moved to asterix layer
+public class AbstractForwardOperatorDescriptor extends 
AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    protected static final int FORWARD_DATA_ACTIVITY_ID = 0;
+    protected static final int SIDE_DATA_ACTIVITY_ID = 1;
+    private AbstractActivityNode forwardDataActivity;
+    private AbstractActivityNode sideDataActivity;
+
+    /**
+     * @param spec used to create the operator id.
+     * @param outputRecordDescriptor the output schema of this operator.
+     */
+    public AbstractForwardOperatorDescriptor(IOperatorDescriptorRegistry spec,
+            RecordDescriptor outputRecordDescriptor) {
+        super(spec, 2, 1);
+        outRecDescs[0] = outputRecordDescriptor;
+    }
+
+    /**
+     * @param forwardActivityNode activity node for the Forward Activity
+     * @param sideActivityNode activity node for the Side Activity
+     */
+    public void setActivities(AbstractActivityNode forwardActivityNode, 
AbstractActivityNode sideActivityNode) {
+        forwardDataActivity = forwardActivityNode;
+        sideDataActivity = sideActivityNode;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        // side data activity, its input is coming through the operator's 
in-port = 1 & activity's in-port = 0
+        builder.addActivity(this, sideDataActivity);
+        builder.addSourceEdge(1, sideDataActivity, 0);
+
+        // forward data activity, its input is coming through the operator's 
in-port = 0 & activity's in-port = 0
+        builder.addActivity(this, forwardDataActivity);
+        builder.addSourceEdge(0, forwardDataActivity, 0);
+
+        // forward data activity will wait for the side data activity
+        builder.addBlockingEdge(sideDataActivity, forwardDataActivity);
+
+        // data leaves from the operator's out-port = 0 & forward data 
activity's out-port = 0
+        builder.addTargetEdge(0, forwardDataActivity, 0);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
similarity index 87%
rename from 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
rename to 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 49eea0a..d970404 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ForwardOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -24,7 +24,6 @@

 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.dataflow.TaskId;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -42,16 +41,14 @@
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractForwardOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;

 // TODO(ali): forward operator should probably be moved to asterix layer
-public class ForwardOperatorDescriptor extends AbstractOperatorDescriptor {
+public class SortForwardOperatorDescriptor extends 
AbstractForwardOperatorDescriptor {
     private static final long serialVersionUID = 1L;
-    private static final int FORWARD_DATA_ACTIVITY_ID = 0;
-    private static final int RANGEMAP_READER_ACTIVITY_ID = 1;
     private final String rangeMapKeyInContext;

     /**
@@ -59,33 +56,15 @@
      * @param rangeMapKeyInContext the unique key to store the range map in 
the shared map & transfer it to partitioner.
      * @param outputRecordDescriptor the output schema of this operator.
      */
-    public ForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, String 
rangeMapKeyInContext,
+    public SortForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, 
String rangeMapKeyInContext,
             RecordDescriptor outputRecordDescriptor) {
-        super(spec, 2, 1);
+        super(spec, outputRecordDescriptor);
         this.rangeMapKeyInContext = rangeMapKeyInContext;
-        outRecDescs[0] = outputRecordDescriptor;
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
         ForwardDataActivity forwardDataActivity =
                 new ForwardDataActivity(new ActivityId(odId, 
FORWARD_DATA_ACTIVITY_ID));
         RangeMapReaderActivity rangeMapReaderActivity =
-                new RangeMapReaderActivity(new ActivityId(odId, 
RANGEMAP_READER_ACTIVITY_ID));
-
-        // range map reader activity, its input is coming through the 
operator's in-port = 1 & activity's in-port = 0
-        builder.addActivity(this, rangeMapReaderActivity);
-        builder.addSourceEdge(1, rangeMapReaderActivity, 0);
-
-        // forward data activity, its input is coming through the operator's 
in-port = 0 & activity's in-port = 0
-        builder.addActivity(this, forwardDataActivity);
-        builder.addSourceEdge(0, forwardDataActivity, 0);
-
-        // forward data activity will wait for the range map reader activity
-        builder.addBlockingEdge(rangeMapReaderActivity, forwardDataActivity);
-
-        // data leaves from the operator's out-port = 0 & forward data 
activity's out-port = 0
-        builder.addTargetEdge(0, forwardDataActivity, 0);
+                new RangeMapReaderActivity(new ActivityId(odId, 
SIDE_DATA_ACTIVITY_ID));
+        setActivities(forwardDataActivity, rangeMapReaderActivity);
     }

     /**
@@ -221,7 +200,7 @@
         public void open() throws HyracksDataException {
             // retrieve the range map from the state object (previous activity 
should have already stored it)
             // then deposit it into the ctx so that MToN-partition can pick it 
up
-            Object stateObjKey = new TaskId(new ActivityId(odId, 
RANGEMAP_READER_ACTIVITY_ID), partition);
+            Object stateObjKey = new TaskId(new ActivityId(odId, 
SIDE_DATA_ACTIVITY_ID), partition);
             RangeMapState rangeMapState = (RangeMapState) 
ctx.getStateObject(stateObjKey);
             TaskUtil.put(rangeMapKeyInContext, rangeMapState.rangeMap, ctx);
             writer.open();

--
To view, visit https://asterix-gerrit.ics.uci.edu/3338
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Icc3db4b386e69a98c2a1c40dadc96eb3e1a5d4fa
Gerrit-Change-Number: 3338
Gerrit-PatchSet: 1
Gerrit-Owner: James Fang <[email protected]>

Reply via email to