>From Ian Maxon <ima...@apache.org>:

Ian Maxon has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069 )


Change subject: [ASTERIXDB-3628][RT] Correct subplan profiles
......................................................................

[ASTERIXDB-3628][RT] Correct subplan profiles

- user model changes: no
- storage format changes: no
- interface changes: no

Details:

Currently, subplan timing information is calculated during close().
This can make the timings inaccurate, especially when the subplan is
near the end of a task pipeline, because the times of the other
operators are only calculated after close(), during deinitialize().

Correct this by adding the capability in the MetaOperator to invoke a
method (IProfiledPushRuntime.computeTimings()) on each ProfiledPushRuntime
and SubplanPushRuntime to calculate the timings, so that it can be invoked
at the proper time to retrieve the correct times from the downstream
operators.

Ext-ref: MB-67499

Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec
---
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
6 files changed, 101 insertions(+), 13 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/69/20069/1

diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
new file mode 100644
index 0000000..9875a2c
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.base;
+
+/**
+ * A push runtime that takes part in runtime profiling and computes its timing statistics on demand.
+ * <p>
+ * Timings are computed by an explicit call rather than in {@code close()}: the times of other operators
+ * in the task are only calculated after {@code close()}, during {@code deinitialize()}.
+ */
+public interface IProfiledPushRuntime extends IPushRuntime {
+
+    /**
+     * Computes and records the timing statistics of this runtime. Implementations that contain nested runtimes
+     * (e.g. subplans) may propagate the call to them.
+     */
+    void computeTimings();
+
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
index bf4533f..ff9a58b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;

-public class ProfiledPushRuntime extends ProfiledFrameWriter implements 
IPushRuntime {
+public class ProfiledPushRuntime extends ProfiledFrameWriter implements 
IProfiledPushRuntime {

     private final IPushRuntime wrapped;
     private final IOperatorStats stats;
@@ -45,9 +45,16 @@
         this.last = last;
     }

+    public IOperatorStats getStats() {
+        return stats;
+    }
+
     @Override
-    public void close() throws HyracksDataException {
-        super.close();
+    public void computeTimings() {
+        //mainly to push through to subplans
+        if (wrapped instanceof IProfiledPushRuntime) {
+            ((IProfiledPushRuntime) wrapped).computeTimings();
+        }
         long ownTime = getTotalTime();
         //for micro union all. accumulate the time of each input into the 
counter.
         //then, on input 0, subtract the output from the accumulated time.
@@ -62,10 +69,6 @@
         stats.getTimeCounter().set(ownTime);
     }
 
-    public IOperatorStats getStats() {
-        return stats;
-    }
-
     @Override
     public void setOutputFrameWriter(int index, IFrameWriter writer, 
RecordDescriptor recordDesc) {
         if (writer instanceof ITimedWriter) {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 560f817..149aa24 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -26,6 +26,7 @@
 import java.util.Map;

 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -35,6 +36,7 @@
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
 import org.apache.hyracks.api.job.profiling.NoOpOperatorStats;
 import org.apache.hyracks.api.job.profiling.OperatorStats;
@@ -204,6 +206,7 @@
             private boolean opened = false;
             private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE;
             private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = 
new HashMap<>();
+            private PipelineAssembler assembler;

             public void open() throws HyracksDataException {
                 if (startOfPipeline == null) {
@@ -211,9 +214,9 @@
                             outputArity > 0 ? 
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
                     RecordDescriptor pipelineInputRecordDescriptor = 
recordDescProvider
                             
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(),
 0);
-                    PipelineAssembler pa = new PipelineAssembler(pipeline, 
inputArity, outputArity,
-                            pipelineInputRecordDescriptor, 
pipelineOutputRecordDescriptor);
-                    startOfPipeline = pa.assemblePipeline(writer, ctx, 
microOpStats);
+                    assembler = new PipelineAssembler(pipeline, inputArity, 
outputArity, pipelineInputRecordDescriptor,
+                            pipelineOutputRecordDescriptor);
+                    startOfPipeline = assembler.assemblePipeline(writer, ctx, 
microOpStats);
                 }
                 opened = true;
                 startOfPipeline.open();
@@ -245,7 +248,12 @@

             @Override
             public void deinitialize() throws HyracksDataException {
+                // Compute micro-operator timings here rather than in close(): other operators' times are only
+                // calculated after close(), during deinitialize(). assembler is only assigned in open(), so guard it.
+                if (assembler != null && ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) {
+                    assembler.getProfiledPushRuntimes().forEach(IProfiledPushRuntime::computeTimings);
+                }
                 super.deinitialize();
             }

             @Override
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index c2e8473..0f9355b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,11 +18,15 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.meta;

+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
@@ -47,6 +51,7 @@
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
     private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+    private final List<IProfiledPushRuntime> profiledRuntimes;
     private final boolean ignoreFailures;

     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int 
outputArity,
@@ -63,6 +68,7 @@
         this.inputArity = inputArity;
         this.outputArity = outputArity;
         this.runtimeMap = new HashMap<>();
+        this.profiledRuntimes = new ArrayList<>();
         this.ignoreFailures = ignoreFailures;
     }

@@ -97,6 +103,7 @@
                         profiled = (ProfiledPushRuntime) 
ProfiledPushRuntime.time(newRuntimes[j],
                                 microOpStats.get(runtimeFactory), false);
                     }
+                    profiledRuntimes.add(profiled);
                     newRuntimes[j] = profiled;
                 } else if (enforce && !profile) {
                     newRuntimes[j] = 
EnforcePushRuntime.enforce(newRuntimes[j]);
@@ -129,6 +136,10 @@
         return runtimeMap.get(runtimeFactory);
     }

+    public List<IProfiledPushRuntime> getProfiledPushRuntimes() {
+        return Collections.unmodifiableList(profiledRuntimes);
+    }
+
     //TODO: refactoring is needed
     public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, 
IFrameWriter writer,
             IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> 
outRuntimeMap) throws HyracksDataException {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 242c603..766bfac 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,12 +20,13 @@

 import java.io.DataOutput;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
@@ -116,7 +117,8 @@
         return new SubplanPushRuntime(ctx, false);
     }

-    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+    public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
+            implements IProfiledPushRuntime {

         protected final IHyracksTaskContext ctx;

@@ -126,9 +128,11 @@

         boolean profile;

+        private final PipelineAssembler[] pipelineAssemblers;
+
         protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
             this.ctx = ctx;
             this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
             this.first = true;

             IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length];
@@ -138,7 +142,7 @@

             int pipelineCount = pipelines.size();
             startOfPipelines = new NestedTupleSourceRuntime[pipelineCount];
-            PipelineAssembler[] pipelineAssemblers = new PipelineAssembler[pipelineCount];
+            pipelineAssemblers = new PipelineAssembler[pipelineCount];
             for (int i = 0; i < pipelineCount; i++) {
                 AlgebricksPipeline pipeline = pipelines.get(i);
                 RecordDescriptor pipelineLastRecordDescriptor =
@@ -174,6 +178,16 @@
             }
         }

+        /**
+         * Computes the timings of the profiled runtimes of every nested pipeline of this subplan.
+         */
+        @Override
+        public void computeTimings() {
+            for (PipelineAssembler assembler : pipelineAssemblers) {
+                assembler.getProfiledPushRuntimes().forEach(IProfiledPushRuntime::computeTimings);
+            }
+        }
+
         @Override
         public void open() throws HyracksDataException {
             // writer opened many times?
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
index 53de340..881fdba 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java
@@ -123,6 +123,7 @@
     @Override
     public void close() throws HyracksDataException {
         timeMethod(writer::close, totalTime);
+
     }

     private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, 
ByteBuffer buffer) {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec
Gerrit-Change-Number: 20069
Gerrit-PatchSet: 1
Gerrit-Owner: Ian Maxon <ima...@apache.org>
Gerrit-MessageType: newchange

Reply via email to