From Ian Maxon <ima...@apache.org>:

Ian Maxon has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069 )

Change subject: [ASTERIXDB-3628][RT] Correct subplan profiles
......................................................................

[ASTERIXDB-3628][RT] Correct subplan profiles

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

Currently, subplan timing information is calculated during close().
This can make the timings inaccurate, especially when the subplan is
near the end of a task pipeline, because other operators' times are
calculated after close(), during deinitialize().

Correct this by adding the capability in the MetaOperator to invoke a
method on each ProfiledPushRuntime and SubplanPushRuntime that
calculates the timings, so that it can be invoked at the proper time to
retrieve the correct time from its downstream operators.
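
The ordering problem described above can be illustrated with a toy model. This is only a sketch under stated assumptions, not the Hyracks implementation: the class DeferredTimingSketch, the TimedOp type, its publish() method and the numbers 100 and 30 are all hypothetical. It assumes that an operator's own time is its inclusive time minus the time reported by its downstream operator, and that the downstream figure only becomes available in a later finalization phase.

```java
// Toy model only: TimedOp, publish() and the numbers are hypothetical, not Hyracks APIs.
class DeferredTimingSketch {

    static class TimedOp {
        final TimedOp downstream; // null for the last operator in the pipeline
        long inclusiveNanos;      // measured while data flows; includes downstream work
        long publishedNanos;      // stays 0 until publish() runs in the finalization phase
        long ownNanos;            // exclusive time, set by computeTimings()

        TimedOp(TimedOp downstream) {
            this.downstream = downstream;
        }

        // Assumed to happen late, standing in for deinitialize().
        void publish() {
            publishedNanos = inclusiveNanos;
        }

        // Own time = inclusive time minus whatever the downstream operator has reported so far.
        void computeTimings() {
            long down = downstream == null ? 0 : downstream.publishedNanos;
            ownNanos = inclusiveNanos - down;
        }
    }

    // Returns the own time computed for the upstream ("subplan") operator.
    static long subplanOwnTime(boolean computeDuringClose) {
        TimedOp sink = new TimedOp(null);
        TimedOp subplan = new TimedOp(sink);
        sink.inclusiveNanos = 30;
        subplan.inclusiveNanos = 100; // includes the 30 spent in sink

        if (computeDuringClose) {
            subplan.computeTimings(); // too early: sink has not reported yet
        }
        sink.publish();               // finalization phase
        subplan.publish();
        if (!computeDuringClose) {
            subplan.computeTimings(); // deferred: sink's figure is now available
        }
        return subplan.ownNanos;
    }

    public static void main(String[] args) {
        System.out.println("computed during close: " + subplanOwnTime(true));  // prints 100
        System.out.println("computed when deferred: " + subplanOwnTime(false)); // prints 70
    }
}
```

In this model the early computation attributes the downstream operator's 30 units to the subplan, while the deferred computation subtracts them, which is the kind of discrepancy the change is meant to remove.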

Ext-ref: MB-67499

Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
A asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
15 files changed, 261 insertions(+), 13 deletions(-)

Approvals:
  Ali Alsuliman: Looks good to me, approved
  Jenkins: Verified; Verified
  Anon. E. Moose #1000171:




diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
index b3f222a..2efc64f 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/profiled.xml
@@ -58,5 +58,13 @@
         <output-dir compare="Text">non-unary-subplan</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="profile">
+      <compilation-unit name="subplan">
+        <parameter name="profile" value="timings" type="string"/>
+        <parameter name="optimized-logical-plan" value="true" type="string"/>
+        <parameter name="plan-format" value="json" type="string"/>
+        <output-dir compare="Text">subplan</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
new file mode 100644
index 0000000..c86a631
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.1.ddl.sqlpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create dataset test1 primary key(id: uuid) autogenerated;
+create dataset test2 primary key(id: uuid) autogenerated;
+create dataset test3 primary key(id: uuid) autogenerated;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
new file mode 100644
index 0000000..91fd485
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.2.update.sqlpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into test1 (
+{"x1": 88, "x2": 99, "states": [1,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 11, "m": 22}] },
+{"x1": 88, "x2": 99, "states": [5,2,3,4], "elements": [{"s": 1, "m": 2}, {"s": 11, "m": 22}] }
+);
+insert into test2 (
+{"elements": [{"a": 1, "states": [1,2]}, {"a": 2, "states": [4,5]}, {"a": 3, "states": [9,10]}] }
+);
+insert into test3 (
+{"elements": [{"elements1": [1,2,3]}, {"elements1": [1,2,3]}]},
+{"elements": [{"elements1": [0,2,5]}, {"elements1": [11,22,1]}]}
+);
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
new file mode 100644
index 0000000..ae2b41f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.3.plans.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT f1,
+COUNT(DISTINCT test11) AS f2,
+COUNT(DISTINCT test12) as f3,
+COUNT((SELECT m FROM el)) AS f4
+FROM test1
+LET f1 = (CASE WHEN (ANY a IN test1.states SATISFIES a = 1) THEN "1" WHEN (ANY a IN test1.states SATISFIES a = sleep(2,100)) THEN "2" ELSE "3" END),
+el = (SELECT s,m FROM test1.elements)
+GROUP BY f1;
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
new file mode 100644
index 0000000..86c2d3b
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.4.plans.sqlpp
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT states
+FROM test2
+LET states = (SELECT sleep(el.a,200) FROM test2.elements AS el WHERE ANY i IN el.states SATISFIES i > 9);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
new file mode 100644
index 0000000..85e4318
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/profile/subplan/subplan.5.plans.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- compareunorderedarray=true
+use test;
+
+SELECT CASE WHEN (ANY i IN elements3 SATISFIES i > sleep(1,100)) THEN "a" ELSE "" END AS f
+FROM test3
+LET
+elements0 = test3.elements,
+elements2 = (FROM elements0 AS o SELECT CASE WHEN (ANY i IN o.elements1 SATISFIES i > 2) THEN "b" ELSE "" END AS a),
+elements3 = (FROM elements2 SELECT a);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
new file mode 100644
index 0000000..96bffa9
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.3.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*8[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
new file mode 100644
index 0000000..0f6580ec
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.4.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"assign",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*2[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
new file mode 100644
index 0000000..3840b32
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/profile/subplan/subplan.5.regex
@@ -0,0 +1 @@
+/"operator"\s*:\s*"select",\s*(?:\"[^\"]+\"\s*:\s*.+?,\s*){3}"min-time"\s*:\s*4[0-9]{2}(?:\.\d+)?/m
\ No newline at end of file
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
new file mode 100644
index 0000000..9875a2c
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+public interface IProfiledPushRuntime extends IPushRuntime {
+
+    public void computeTimings();
+
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
index bf4533f..ff9a58b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;

-public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime {
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements IProfiledPushRuntime {

     private final IPushRuntime wrapped;
     private final IOperatorStats stats;
@@ -45,9 +45,16 @@
         this.last = last;
     }

+    public IOperatorStats getStats() {
+        return stats;
+    }
+
     @Override
-    public void close() throws HyracksDataException {
-        super.close();
+    public void computeTimings() {
+        //mainly to push through to subplans
+        if (wrapped instanceof IProfiledPushRuntime) {
+            ((IProfiledPushRuntime) wrapped).computeTimings();
+        }
         long ownTime = getTotalTime();
         //for micro union all. accumulate the time of each input into the counter.
         //then, on input 0, subtract the output from the accumulated time.
@@ -62,10 +69,6 @@
         stats.getTimeCounter().set(ownTime);
     }

-    public IOperatorStats getStats() {
-        return stats;
-    }
-
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
         if (writer instanceof ITimedWriter) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 560f817..232121b 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -21,11 +21,13 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -35,6 +37,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
@@ -204,6 +207,7 @@
             private boolean opened = false;
             private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
             private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>();
+            private List<IProfiledPushRuntime> profiledPushRuntimes = Collections.emptyList();

             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
@@ -211,9 +215,12 @@
                             outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
                     RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider
                             .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
-                    PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
+                    PipelineAssembler assembler = new PipelineAssembler(pipeline, inputArity, outputArity,
                             pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
-                    startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats);
+                    startOfPipeline = assembler.assemblePipeline(writer, ctx, microOpStats);
+                    if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+                        profiledPushRuntimes = assembler.getProfiledPushRuntimes();
+                    }
                 }
                 opened = true;
                 startOfPipeline.open();
@@ -246,6 +253,9 @@
             @Override
             public void deinitialize() throws HyracksDataException {
                 super.deinitialize();
+                if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+                    profiledPushRuntimes.forEach(IProfiledPushRuntime::computeTimings);
+                }
             }

             @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index c2e8473..85a4b51 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,11 +18,15 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.meta;

+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
@@ -129,6 +133,11 @@
         return runtimeMap.get(runtimeFactory);
     }

+    public List<IProfiledPushRuntime> getProfiledPushRuntimes() {
+        return runtimeMap.values().stream().flatMap(Arrays::stream).filter(f -> f instanceof IProfiledPushRuntime)
+                .map(f -> (IProfiledPushRuntime) f).collect(Collectors.toList());
+    }
+
     //TODO: refactoring is needed
     public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer,
             IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 242c603..7f9f3a9 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,12 +20,15 @@

 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -116,20 +119,28 @@
         return new SubplanPushRuntime(ctx, false);
     }

-    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
+            implements IProfiledPushRuntime {

         protected final IHyracksTaskContext ctx;

         protected final NestedTupleSourceRuntime[] startOfPipelines;

-        boolean first;
+        private final boolean profile;

-        boolean profile;
+        private final List<IProfiledPushRuntime> timedMicroOps;
+
+        boolean first;

         protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;
+            if (profile) {
+                timedMicroOps = new ArrayList<>();
+            } else {
+                timedMicroOps = Collections.emptyList();
+            }

             IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length];
             for (int i = 0; i < missingWriterFactories.length; i++) {
@@ -169,11 +180,18 @@
                 PipelineAssembler pa =
                         new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures);
                 IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
+                if (profile) {
+                    timedMicroOps.addAll(pa.getProfiledPushRuntimes());
+                }
                 startOfPipelines[i] = (NestedTupleSourceRuntime) head;
                 pipelineAssemblers[i] = pa;
             }
         }

+        public void computeTimings() {
+            timedMicroOps.forEach(IProfiledPushRuntime::computeTimings);
+        }
+
         @Override
         public void open() throws HyracksDataException {
             // writer opened many times?
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
index cb188c0..072a684 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledOperatorNodePushable.java
@@ -55,6 +55,7 @@

     @Override
     public void deinitialize() throws HyracksDataException {
+        ProfiledFrameWriter.timeMethod(op::deinitialize, totalTime);
         long ownTime = totalTime.get();
         for (ITimedWriter i : inputs.values()) {
             ownTime += i.getTotalTime();
@@ -62,7 +63,6 @@
         for (ITimedWriter w : outputs.values()) {
             ownTime -= w.getTotalTime();
         }
-        op.deinitialize();
         stats.getTimeCounter().set(ownTime);
     }


--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec
Gerrit-Change-Number: 20069
Gerrit-PatchSet: 11
Gerrit-Owner: Ian Maxon <ima...@apache.org>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Ian Maxon <ima...@apache.org>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-MessageType: merged
