>From Ian Maxon <ima...@apache.org>: Ian Maxon has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069 )
Change subject: [ASTERIXDB-3628][RT] Correct subplan profiles ...................................................................... [ASTERIXDB-3628][RT] Correct subplan profiles - user model changes: no - storage format changes: no - interface changes: no Details: Currently subplan timing information is calculated during close(). This can lead to inaccuracies in the timing, especially when the subplan is near the end of a task pipeline, because other operators' times are calculated after close, during deinitialize(). Correct this by adding the capability in the MetaOperator to invoke a method on each ProfiledPushRuntime and SubplanPushRuntime to calculate the timings, so it can be invoked at the proper time to retrieve the correct times from its downstream operators. Ext-ref: MB-67499 Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec --- A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java 6 files changed, 101 insertions(+), 13 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/69/20069/1 diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java new file mode 100644 index 0000000..9875a2c --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IProfiledPushRuntime.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hyracks.algebricks.runtime.base; + +public interface IProfiledPushRuntime extends IPushRuntime { + + public void computeTimings(); + +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java index bf4533f..ff9a58b 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/ProfiledPushRuntime.java @@ -28,7 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.profiling.IOperatorStats; -public class ProfiledPushRuntime extends ProfiledFrameWriter implements IPushRuntime { +public class ProfiledPushRuntime extends ProfiledFrameWriter implements IProfiledPushRuntime { private final IPushRuntime wrapped; private final IOperatorStats stats; @@ -45,9 +45,16 @@ this.last = last; } + public IOperatorStats getStats() { + return stats; + } + @Override - public void close() throws HyracksDataException { - super.close(); + public void computeTimings() { + //mainly to push through to subplans + if (wrapped instanceof IProfiledPushRuntime) { + ((IProfiledPushRuntime) wrapped).computeTimings(); + } long ownTime = getTotalTime(); //for micro union all. accumulate the time of each input into the counter. //then, on input 0, subtract the output from the accumulated time. 
@@ -62,10 +69,6 @@ stats.getTimeCounter().set(ownTime); } - public IOperatorStats getStats() { - return stats; - } - @Override public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { if (writer instanceof ITimedWriter) { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java index 560f817..149aa24 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java @@ -26,6 +26,7 @@ import java.util.Map; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -35,6 +36,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; +import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.NoOpOperatorStats; import org.apache.hyracks.api.job.profiling.OperatorStats; @@ -204,6 +206,7 @@ private boolean opened = false; private IOperatorStats parentStats = NoOpOperatorStats.INSTANCE; private Map<IPushRuntimeFactory, IOperatorStats> microOpStats = new HashMap<>(); + private PipelineAssembler assembler; public void open() throws 
HyracksDataException { if (startOfPipeline == null) { @@ -211,9 +214,9 @@ outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null; RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider .getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0); - PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity, - pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor); - startOfPipeline = pa.assemblePipeline(writer, ctx, microOpStats); + assembler = new PipelineAssembler(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, + pipelineOutputRecordDescriptor); + startOfPipeline = assembler.assemblePipeline(writer, ctx, microOpStats); } opened = true; startOfPipeline.open(); @@ -245,7 +248,9 @@ @Override public void deinitialize() throws HyracksDataException { - super.deinitialize(); + if (ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME)) { + assembler.getProfiledPushRuntimes().forEach(IProfiledPushRuntime::computeTimings); + } } @Override diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java index c2e8473..0f9355b 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java @@ -18,11 +18,15 @@ */ package org.apache.hyracks.algebricks.runtime.operators.meta; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import 
org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime; @@ -47,6 +51,7 @@ private final int outputArity; private final AlgebricksPipeline pipeline; private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap; + private final List<IProfiledPushRuntime> profiledRuntimes; private final boolean ignoreFailures; public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity, @@ -63,6 +68,7 @@ this.inputArity = inputArity; this.outputArity = outputArity; this.runtimeMap = new HashMap<>(); + this.profiledRuntimes = new ArrayList<>(); this.ignoreFailures = ignoreFailures; } @@ -97,6 +103,7 @@ profiled = (ProfiledPushRuntime) ProfiledPushRuntime.time(newRuntimes[j], microOpStats.get(runtimeFactory), false); } + profiledRuntimes.add(profiled); newRuntimes[j] = profiled; } else if (enforce && !profile) { newRuntimes[j] = EnforcePushRuntime.enforce(newRuntimes[j]); @@ -129,6 +136,10 @@ return runtimeMap.get(runtimeFactory); } + public List<IProfiledPushRuntime> getProfiledPushRuntimes() { + return Collections.unmodifiableList(profiledRuntimes); + } + //TODO: refactoring is needed public static IFrameWriter assemblePipeline(AlgebricksPipeline subplan, IFrameWriter writer, IHyracksTaskContext ctx, Map<IPushRuntimeFactory, IPushRuntime> outRuntimeMap) throws HyracksDataException { diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java index 242c603..766bfac 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java @@ -20,12 +20,14 @@ import java.io.DataOutput; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; +import org.apache.hyracks.algebricks.runtime.base.IProfiledPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; @@ -116,7 +118,8 @@ return new SubplanPushRuntime(ctx, false); } - public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime + implements IProfiledPushRuntime { protected final IHyracksTaskContext ctx; @@ -126,9 +129,16 @@ boolean profile; + List<IProfiledPushRuntime> timedMicroOps; + + PipelineAssembler[] pipelineAssemblers; + protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException { this.ctx = ctx; this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME); + if (profile) { + timedMicroOps = new ArrayList<>(); + } this.first = true; IMissingWriter[] missingWriters = new IMissingWriter[missingWriterFactories.length]; @@ -138,7 +148,7 @@ int pipelineCount = pipelines.size(); startOfPipelines = new NestedTupleSourceRuntime[pipelineCount]; - PipelineAssembler[] pipelineAssemblers = new PipelineAssembler[pipelineCount]; + pipelineAssemblers = new PipelineAssembler[pipelineCount]; for 
(int i = 0; i < pipelineCount; i++) { AlgebricksPipeline pipeline = pipelines.get(i); RecordDescriptor pipelineLastRecordDescriptor = @@ -174,6 +184,12 @@ } } + public void computeTimings() { + for (PipelineAssembler assembler : pipelineAssemblers) { + assembler.getProfiledPushRuntimes().forEach(IProfiledPushRuntime::computeTimings); + } + } + @Override public void open() throws HyracksDataException { // writer opened many times? diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java index 53de340..881fdba 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/ProfiledFrameWriter.java @@ -123,6 +123,7 @@ @Override public void close() throws HyracksDataException { timeMethod(writer::close, totalTime); + } private int getTupleStartOffset(int tupleIndex, int tupleCountOffset, ByteBuffer buffer) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20069 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: ionic Gerrit-Change-Id: I4d38710626778ef52abb1cd4fe1e0b26e0d74bec Gerrit-Change-Number: 20069 Gerrit-PatchSet: 1 Gerrit-Owner: Ian Maxon <ima...@apache.org> Gerrit-MessageType: newchange