Dmitry Lychagin has submitted this change and it was merged.

Change subject: [NO ISSUE][RT] Window operator runtime optimization
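Reader's note on the memory budget introduced by the change below: the commit message and the OperatorResourcesComputer hunk describe a per-operator budget that applies only to window operators that materialize partitions, while streaming ones need two frames. A minimal sketch of that arithmetic follows, assuming 32KB frames (inferred from the test comment that 160KB is 5 frames). The class and method names are hypothetical and are not part of the patch, and the bytes-to-frames conversion is an assumption about how the budget is turned into a frame count.

```java
public class WindowMemoryBudgetSketch {
    // Assumption: 32KB frames, inferred from "Must be at least 160KB (5 frames)".
    static final long FRAME_SIZE = 32 * 1024;
    // Minimum budget in frames, per the commit message (min is 160KB).
    static final int MIN_FRAMES = 5;

    /** Converts a byte budget into whole frames and rejects budgets below the minimum. */
    static int framesForBudget(long budgetBytes) {
        long frames = budgetBytes / FRAME_SIZE;
        if (frames < MIN_FRAMES) {
            throw new IllegalArgumentException(
                    "window memory budget must be at least " + (MIN_FRAMES * FRAME_SIZE) + " bytes");
        }
        return (int) frames;
    }

    /**
     * Per-operator choice shown in the getWindowRequiredMemory hunk:
     * the configured budget if partitions are materialized, otherwise 2 frames (output + copy).
     */
    static long requiredMemory(boolean partitionMaterialization, int windowFrameLimit) {
        long windowMemorySize = windowFrameLimit * FRAME_SIZE;
        return partitionMaterialization ? windowMemorySize : 2 * FRAME_SIZE;
    }

    public static void main(String[] args) {
        int frames = framesForBudget(192 * 1024);           // the value used in the test cc.conf files
        System.out.println(frames);                          // 6
        System.out.println(requiredMemory(true, frames));    // 196608
        System.out.println(requiredMemory(false, frames));   // 65536
    }
}
```

The 196608 figure matches the "compiler\.windowmemory" value expected in the cluster_state regexadm results. The sketch leaves out any scaling that getOperatorRequiredMemory applies on top of the per-operator size.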
[NO ISSUE][RT] Window operator runtime optimization

- user model changes: yes
- storage format changes: no
- interface changes: no

Details:
- Improve memory management for window operators
- Add "compiler.windowmemory" property that specifies
  memory budget for each window operator
  (default is 4MB, min is 160KB)
- Consolidated negative window operator testcases
  into a single one

Change-Id: I6756e92046883f79db339ef490cca8bc8b7b1fb8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3227
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java
M asterixdb/asterix-app/src/main/resources/cc.conf
M asterixdb/asterix-app/src/main/resources/cc2.conf
M asterixdb/asterix-app/src/main/resources/cc3.conf
M asterixdb/asterix-app/src/main/resources/cc4.conf
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java
M asterixdb/asterix-app/src/test/resources/cc-compression.conf
M asterixdb/asterix-app/src/test/resources/cc-multipart.conf
M asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf
M asterixdb/asterix-app/src/test/resources/cc-ssl.conf
M asterixdb/asterix-app/src/test/resources/cc-storage.conf
M asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf
M asterixdb/asterix-app/src/test/resources/cc.conf
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp
R asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
M asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
M hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
A hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
M hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
57 files changed, 742 insertions(+), 308 deletions(-)

Approvals: Anon. E. 
Moose #1000171: Ali Alsuliman: Looks good to me, approved Jenkins: Verified; ; Verified Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java index a1f819d..efffda2 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Set; import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator; import org.apache.asterix.algebra.operators.physical.InvertedIndexPOperator; @@ -68,7 +67,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.WindowPOperator; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn; -import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.rewriter.util.JoinUtils; @@ -285,7 +283,7 @@ } case WINDOW: { WindowOperator winOp = (WindowOperator) op; - WindowPOperator physOp = createWindowPOperator(winOp); + WindowPOperator physOp = createWindowPOperator(winOp, context); op.setPhysicalOperator(physOp); break; } @@ -344,7 +342,8 @@ aggOp.setMergeExpressions(mergeExpressionRefs); } - private static WindowPOperator createWindowPOperator(WindowOperator winOp) throws CompilationException { + private static WindowPOperator createWindowPOperator(WindowOperator winOp, IOptimizationContext context) + throws CompilationException { 
List<Mutable<ILogicalExpression>> partitionExprs = winOp.getPartitionExpressions(); List<LogicalVariable> partitionColumns = new ArrayList<>(partitionExprs.size()); for (Mutable<ILogicalExpression> pe : partitionExprs) { @@ -377,7 +376,9 @@ boolean nestedTrivialAggregates = winOp.hasNestedPlans() && winOp.getNestedPlans().stream().allMatch(AnalysisUtil::isTrivialAggregateSubplan); + int memSizeInFrames = context.getPhysicalOptimizationConfig().getMaxFramesForWindow(); + return new WindowPOperator(partitionColumns, partitionMaterialization, orderColumns, frameStartIsMonotonic, - frameEndIsMonotonic, nestedTrivialAggregates); + frameEndIsMonotonic, nestedTrivialAggregates, memSizeInFrames); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 0bdc987..d155756 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -130,14 +130,14 @@ public static final String PREFIX_INTERNAL_PARAMETERS = "_internal"; private static final Set<String> CONFIGURABLE_PARAMETER_NAMES = ImmutableSet.of(CompilerProperties.COMPILER_JOINMEMORY_KEY, CompilerProperties.COMPILER_GROUPMEMORY_KEY, - CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, - CompilerProperties.COMPILER_PARALLELISM_KEY, CompilerProperties.COMPILER_SORT_PARALLEL_KEY, - CompilerProperties.COMPILER_SORT_SAMPLES_KEY, FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, - FuzzyUtils.SIM_FUNCTION_PROP_NAME, FuzzyUtils.SIM_THRESHOLD_PROP_NAME, - StartFeedStatement.WAIT_FOR_COMPLETION, FeedActivityDetails.FEED_POLICY_NAME, - FeedActivityDetails.COLLECT_LOCATIONS, SqlppQueryRewriter.INLINE_WITH_OPTION, - SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, "hash_merge", "output-record-type", - 
AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION); + CompilerProperties.COMPILER_SORTMEMORY_KEY, CompilerProperties.COMPILER_WINDOWMEMORY_KEY, + CompilerProperties.COMPILER_TEXTSEARCHMEMORY_KEY, CompilerProperties.COMPILER_PARALLELISM_KEY, + CompilerProperties.COMPILER_SORT_PARALLEL_KEY, CompilerProperties.COMPILER_SORT_SAMPLES_KEY, + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS, FuzzyUtils.SIM_FUNCTION_PROP_NAME, + FuzzyUtils.SIM_THRESHOLD_PROP_NAME, StartFeedStatement.WAIT_FOR_COMPLETION, + FeedActivityDetails.FEED_POLICY_NAME, FeedActivityDetails.COLLECT_LOCATIONS, + SqlppQueryRewriter.INLINE_WITH_OPTION, SqlppExpressionToPlanTranslator.REWRITE_IN_AS_OR_OPTION, + "hash_merge", "output-record-type", AbstractIntroduceAccessMethodRule.NO_INDEX_ONLY_PLAN_OPTION); private final IRewriterFactory rewriterFactory; private final IAstPrintVisitorFactory astPrintVisitorFactory; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java index 2942a95..3fffdac 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java @@ -36,15 +36,17 @@ private final long groupByMemorySize; private final long joinMemorySize; private final long sortMemorySize; + private final long windowMemorySize; private final long textSearchMemorySize; private final long frameSize; public OperatorResourcesComputer(int numComputationPartitions, int sortFrameLimit, int groupFrameLimit, - int joinFrameLimit, int textSearchFrameLimit, long frameSize) { + int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, long frameSize) { this.numComputationPartitions = numComputationPartitions; this.groupByMemorySize = groupFrameLimit * frameSize; this.joinMemorySize = joinFrameLimit * frameSize; 
this.sortMemorySize = sortFrameLimit * frameSize; + this.windowMemorySize = windowFrameLimit * frameSize; this.textSearchMemorySize = textSearchFrameLimit * frameSize; this.frameSize = frameSize; } @@ -145,13 +147,9 @@ private long getWindowRequiredMemory(WindowOperator op) { WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator(); - int frameCount = 2; - if (physOp.isPartitionMaterialization()) { - frameCount++; - } - if (op.hasNestedPlans()) { - frameCount += 2; - } - return getOperatorRequiredMemory(op, frameSize * frameCount); + // memory budget configuration only applies to window operators that materialize partitions (non-streaming) + // streaming window operators only need 2 frames: output + copy + long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize; + return getOperatorRequiredMemory(op, memorySize); } } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java index d9ead33..0f4c4c0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/ResourceUtils.java @@ -58,10 +58,11 @@ final int sortFrameLimit = physicalOptimizationConfig.getMaxFramesExternalSort(); final int groupFrameLimit = physicalOptimizationConfig.getMaxFramesForGroupBy(); final int joinFrameLimit = physicalOptimizationConfig.getMaxFramesForJoin(); + final int windowFrameLimit = physicalOptimizationConfig.getMaxFramesForWindow(); final int textSearchFrameLimit = physicalOptimizationConfig.getMaxFramesForTextSearch(); final List<PlanStage> planStages = getStages(plan); return getStageBasedRequiredCapacity(planStages, computationLocations.getLocations().length, sortFrameLimit, - groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize); + groupFrameLimit, joinFrameLimit, windowFrameLimit, 
textSearchFrameLimit, frameSize); } public static List<PlanStage> getStages(ILogicalPlan plan) throws AlgebricksException { @@ -73,9 +74,10 @@ } public static IClusterCapacity getStageBasedRequiredCapacity(List<PlanStage> stages, int computationLocations, - int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int textSearchFrameLimit, int frameSize) { + int sortFrameLimit, int groupFrameLimit, int joinFrameLimit, int windowFrameLimit, int textSearchFrameLimit, + int frameSize) { final OperatorResourcesComputer computer = new OperatorResourcesComputer(computationLocations, sortFrameLimit, - groupFrameLimit, joinFrameLimit, textSearchFrameLimit, frameSize); + groupFrameLimit, joinFrameLimit, windowFrameLimit, textSearchFrameLimit, frameSize); final IClusterCapacity clusterCapacity = new ClusterCapacity(); final Long maxRequiredMemory = stages.stream().mapToLong(stage -> stage.getRequiredMemory(computer)).max() .orElseThrow(IllegalStateException::new); diff --git a/asterixdb/asterix-app/src/main/resources/cc.conf b/asterixdb/asterix-app/src/main/resources/cc.conf index 8877be8..c9c7bb9 100644 --- a/asterixdb/asterix-app/src/main/resources/cc.conf +++ b/asterixdb/asterix-app/src/main/resources/cc.conf @@ -53,6 +53,7 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB compiler.sort.parallel=false messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/main/resources/cc2.conf b/asterixdb/asterix-app/src/main/resources/cc2.conf index 65dbafc..46f7168 100644 --- a/asterixdb/asterix-app/src/main/resources/cc2.conf +++ b/asterixdb/asterix-app/src/main/resources/cc2.conf @@ -53,6 +53,7 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB compiler.parallelism=-1 messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/main/resources/cc3.conf 
b/asterixdb/asterix-app/src/main/resources/cc3.conf index 20aa70d..0b26ef3 100644 --- a/asterixdb/asterix-app/src/main/resources/cc3.conf +++ b/asterixdb/asterix-app/src/main/resources/cc3.conf @@ -53,6 +53,7 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB compiler.parallelism=3 messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/main/resources/cc4.conf b/asterixdb/asterix-app/src/main/resources/cc4.conf index 5bdf8ea..6a66d25 100644 --- a/asterixdb/asterix-app/src/main/resources/cc4.conf +++ b/asterixdb/asterix-app/src/main/resources/cc4.conf @@ -50,6 +50,7 @@ compiler.sortmemory=320KB compiler.groupmemory=160KB compiler.joinmemory=256KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 compiler.parallelism=-1 diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java index d3113ca..094009e 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/resource/PlanStagesGeneratorTest.java @@ -301,7 +301,7 @@ private void assertRequiredMemory(List<PlanStage> stages, long expectedMemory) { final IClusterCapacity clusterCapacity = ResourceUtils.getStageBasedRequiredCapacity(stages, PARALLELISM, - FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE); + FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_LIMIT, FRAME_SIZE); Assert.assertEquals(clusterCapacity.getAggregatedMemoryByteSize(), expectedMemory); } } diff --git a/asterixdb/asterix-app/src/test/resources/cc-compression.conf b/asterixdb/asterix-app/src/test/resources/cc-compression.conf index 904707a..e58d691 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-compression.conf +++ 
b/asterixdb/asterix-app/src/test/resources/cc-compression.conf @@ -53,5 +53,6 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf index 9c64ab4..4d2087b 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-multipart.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-multipart.conf @@ -51,5 +51,6 @@ compiler.sortmemory=320KB compiler.groupmemory=160KB compiler.joinmemory=256KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf index 811a40d..8995e19 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-small-txn-log-partition.conf @@ -49,6 +49,7 @@ compiler.sortmemory=320KB compiler.groupmemory=160KB compiler.joinmemory=256KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 txn.log.partitionsize=2MB diff --git a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf index ea00513..db04d2b 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-ssl.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-ssl.conf @@ -64,6 +64,7 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 ssl.enabled=true \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/cc-storage.conf b/asterixdb/asterix-app/src/test/resources/cc-storage.conf index b6bed24..cf748c6 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-storage.conf +++ 
b/asterixdb/asterix-app/src/test/resources/cc-storage.conf @@ -51,6 +51,7 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 txn.log.checkpoint.pollfrequency=10 diff --git a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf index ef38fc2..1d05c28 100644 --- a/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf +++ b/asterixdb/asterix-app/src/test/resources/cc-stringoffset.conf @@ -53,6 +53,7 @@ compiler.sortmemory=320KB compiler.groupmemory=160KB compiler.joinmemory=256KB +compiler.windowmemory=192KB compiler.stringoffset=1 messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/test/resources/cc.conf b/asterixdb/asterix-app/src/test/resources/cc.conf index 2694408..adb71a5 100644 --- a/asterixdb/asterix-app/src/test/resources/cc.conf +++ b/asterixdb/asterix-app/src/test/resources/cc.conf @@ -53,5 +53,6 @@ compiler.groupmemory=160KB compiler.joinmemory=256KB compiler.textsearchmemory=160KB +compiler.windowmemory=192KB messaging.frame.size=4096 messaging.frame.count=512 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp index 5655f18..e58ba11 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_01/ntile_01.5.query.sqlpp @@ -21,6 +21,8 @@ * Expected Res : SUCCESS */ +set `compiler.windowmemory` "256KB"; + use test; q1_ntile(10, 1000, 4) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp index 3120eb3..d477713 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.5.query.sqlpp @@ -21,6 +21,8 @@ * Expected Res : SUCCESS */ +set `compiler.windowmemory` "192KB"; + use test; q1_percent_rank(10, 1000, 3) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp index 8f38718..8b92104 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/percent_rank_01/percent_rank_01.6.query.sqlpp @@ -21,6 +21,8 @@ * Expected Res : SUCCESS */ +set `compiler.windowmemory` "512KB"; + use test; from q0_rnd() rnd diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp index d73a7f4..36f40c6 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.12.query.sqlpp @@ -21,6 +21,8 @@ * Expected Res : SUCCESS */ +set `compiler.windowmemory` "224KB"; + use test; FROM tenk1 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp similarity index 100% rename from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ntile_02/ntile_02.1.query.sqlpp rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.1.query.sqlpp diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp similarity index 100% rename from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/ratio_to_report_02_negative/ratio_to_report_02_negative.1.query.sqlpp rename to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.2.query.sqlpp diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp new file mode 100644 index 0000000..23be774 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_negative/win_negative.3.query.sqlpp @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : Test illegal value for 'compiler.windowmemory' + * : Must be at least 160KB (5 frames) + * Expected Res : FAILURE + */ + +set `compiler.windowmemory` "100KB"; + +from range(1, 10) t +select t, first_value(t) over(order by t) \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp index 2d561b9..1e4d076 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.2.query.sqlpp @@ -22,6 +22,9 @@ * Expected Res : SUCCESS */ +/* 1 frame for partition writer */ +set `compiler.windowmemory` "160KB"; + use test; q1_sum_1_preceding_1_following(10); diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp index fec6158..4546867 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.3.query.sqlpp @@ -22,6 +22,9 @@ * Expected Res : SUCCESS */ +/* 1 frame for partition writer */ +set `compiler.windowmemory` "160KB"; + use test; q1_sum_1_preceding_1_following(10000); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp index 84b5234..d772965 100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.4.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
index 8a4374f..8c0c6b9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.5.query.sqlpp
@@ -23,6 +23,9 @@
  * Expected Res : SUCCESS
  */
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
index 91c0a31..5f5c508 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.6.query.sqlpp
@@ -23,6 +23,9 @@
  * Expected Res : SUCCESS
  */
+/* 2 frames for partition writer */
+set `compiler.windowmemory` "192KB";
+
 use test;
 with N as 10000, W as 5000
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
index ad6913c..a2d3a53 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/win_opt_02/win_opt_02.9.query.sqlpp
@@ -22,6 +22,9 @@
  * Expected Res : SUCCESS
  */
+/* 1 frame for partition writer */
+set `compiler.windowmemory` "160KB";
+
 use test;
 q2_max_unbounded_preceding_n_following(5000);
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
index da464c7..2bdb6c1 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "INFO",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
index fa8f48e..599d6b4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
index 801900c..56d9dd9 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm
@@ -16,6 +16,7 @@
     "compiler\.sort\.samples" : 100,
     "compiler\.sortmemory" : 327680,
     "compiler\.textsearchmemory" : 163840,
+    "compiler\.windowmemory" : 196608,
     "default\.dir" : "target/io/dir/asterixdb",
     "log\.dir" : "logs/",
     "log\.level" : "WARN",
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d368b45..c905e0c 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -9278,13 +9278,6 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="ntile_02">
-        <output-dir compare="Text">ntile_01</output-dir>
-        <expected-error>ASX0002: Type mismatch</expected-error>
-        <source-location>false</source-location>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="window">
       <compilation-unit name="percent_rank_01">
         <output-dir compare="Text">percent_rank_01</output-dir>
       </compilation-unit>
@@ -9305,14 +9298,17 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="ratio_to_report_02_negative">
-        <output-dir compare="Text">ratio_to_report_01</output-dir>
-        <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+      <compilation-unit name="row_number_01">
+        <output-dir compare="Text">row_number_01</output-dir>
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
-      <compilation-unit name="row_number_01">
-        <output-dir compare="Text">row_number_01</output-dir>
+      <compilation-unit name="win_negative">
+        <output-dir compare="Text">misc_01</output-dir>
+        <expected-error>ASX0002: Type mismatch</expected-error>
+        <expected-error>ASX1101: Unexpected ORDER BY clause in window expression</expected-error>
+        <expected-error>ASX1037: Invalid query parameter compiler.windowmemory</expected-error>
+        <source-location>false</source-location>
       </compilation-unit>
     </test-case>
     <test-case FilePath="window">
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
index 4bfbf11..7c67b96 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CompilerProperties.java
@@ -43,6 +43,10 @@
                 LONG_BYTE_UNIT,
                 StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
                 "The memory budget (in bytes) for a group by operator instance in a partition"),
+        COMPILER_WINDOWMEMORY(
+                LONG_BYTE_UNIT,
+                StorageUtil.getLongSizeInBytes(4L, MEGABYTE),
+                "The memory budget (in bytes) for a window operator instance in a partition"),
         COMPILER_TEXTSEARCHMEMORY(
                 LONG_BYTE_UNIT,
                 StorageUtil.getLongSizeInBytes(32L, MEGABYTE),
@@ -108,6 +112,8 @@
     public static final String COMPILER_JOINMEMORY_KEY = Option.COMPILER_JOINMEMORY.ini();
+    public static final String COMPILER_WINDOWMEMORY_KEY = Option.COMPILER_WINDOWMEMORY.ini();
+
     public static final String COMPILER_TEXTSEARCHMEMORY_KEY = Option.COMPILER_TEXTSEARCHMEMORY.ini();
     public static final String COMPILER_PARALLELISM_KEY = Option.COMPILER_PARALLELISM.ini();
@@ -134,6 +140,10 @@
         return accessor.getLong(Option.COMPILER_GROUPMEMORY);
     }
+    public long getWindowMemorySize() {
+        return accessor.getLong(Option.COMPILER_WINDOWMEMORY);
+    }
+
     public long getTextSearchMemorySize() {
         return accessor.getLong(Option.COMPILER_TEXTSEARCHMEMORY);
     }
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
index 9269b5e..e8643cd 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/OptimizationConfUtil.java
@@ -33,6 +33,8 @@
     private static final int MIN_FRAME_LIMIT_FOR_SORT = 3;
     private static final int MIN_FRAME_LIMIT_FOR_GROUP_BY = 4;
     private static final int MIN_FRAME_LIMIT_FOR_JOIN = 5;
+    // 1 (output) + 1 (input copy) + 1 (partition writer) + 2 (seekable partition reader)
+    private static final int MIN_FRAME_LIMIT_FOR_WINDOW = 5;
     // one for query, two for intermediate results, one for final result, and one for reading an inverted list
     private static final int MIN_FRAME_LIMIT_FOR_TEXT_SEARCH = 5;
@@ -49,6 +51,9 @@
         int joinFrameLimit = getFrameLimit(CompilerProperties.COMPILER_JOINMEMORY_KEY,
                 (String) querySpecificConfig.get(CompilerProperties.COMPILER_JOINMEMORY_KEY),
                 compilerProperties.getJoinMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_JOIN, sourceLoc);
+        int windowFrameLimit = getFrameLimit(CompilerProperties.COMPILER_WINDOWMEMORY_KEY,
+                (String) querySpecificConfig.get(CompilerProperties.COMPILER_WINDOWMEMORY_KEY),
+                compilerProperties.getWindowMemorySize(), frameSize, MIN_FRAME_LIMIT_FOR_WINDOW, sourceLoc);
         int textSearchFrameLimit = getTextSearchNumFrames(compilerProperties, querySpecificConfig, sourceLoc);
         int sortNumSamples = getSortSamples(compilerProperties, querySpecificConfig, sourceLoc);
         boolean fullParallelSort = getSortParallel(compilerProperties, querySpecificConfig);
@@ -58,6 +63,7 @@
         physOptConf.setMaxFramesExternalSort(sortFrameLimit);
         physOptConf.setMaxFramesExternalGroupBy(groupFrameLimit);
         physOptConf.setMaxFramesForJoin(joinFrameLimit);
+        physOptConf.setMaxFramesForWindow(windowFrameLimit);
         physOptConf.setMaxFramesForTextSearch(textSearchFrameLimit);
         physOptConf.setSortParallel(fullParallelSort);
         physOptConf.setSortSamples(sortNumSamples);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
index 1d8c47c..9a5f6d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/WindowPOperator.java
@@ -82,15 +82,19 @@
     private final boolean nestedTrivialAggregates;
+    // The maximum number of in-memory frames that this operator can use.
+    private final int memSizeInFrames;
+
     public WindowPOperator(List<LogicalVariable> partitionColumns, boolean partitionMaterialization,
             List<OrderColumn> orderColumns, boolean frameStartIsMonotonic, boolean frameEndIsMonotonic,
-            boolean nestedTrivialAggregates) {
+            boolean nestedTrivialAggregates, int memSizeInFrames) {
         this.partitionColumns = partitionColumns;
         this.partitionMaterialization = partitionMaterialization;
         this.orderColumns = orderColumns;
         this.frameStartIsMonotonic = frameStartIsMonotonic;
         this.frameEndIsMonotonic = frameEndIsMonotonic;
         this.nestedTrivialAggregates = nestedTrivialAggregates;
+        this.memSizeInFrames = memSizeInFrames;
     }
     @Override
@@ -227,7 +231,7 @@
                     runtime = new WindowNestedPlansUnboundedRuntimeFactory(partitionColumnsList,
                             partitionComparatorFactories, orderComparatorFactories, frameMaxObjects,
                             projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                            aggregatorOutputSchemaSize, nestedAggFactory);
+                            aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
                 } else if (frameEndIsMonotonic && nestedTrivialAggregates) {
                     // special case #2: accumulating frame from beginning of the partition, no exclusions, no offset,
                     // trivial aggregate subplan ( aggregate + nts )
@@ -236,7 +240,8 @@
                             partitionComparatorFactories, orderComparatorFactories,
                             frameValueExprEvalsAndComparators.first, frameValueExprEvalsAndComparators.second,
                             frameEndExprEvals, frameMaxObjects, projectionColumnsExcludingSubplans,
-                            runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory);
+                            runningAggOutColumns, runningAggFactories, aggregatorOutputSchemaSize, nestedAggFactory,
+                            memSizeInFrames);
                 }
             }
             // default case
@@ -248,12 +253,12 @@
                         winOp.getFrameExcludeNegationStartIdx(), frameExcludeExprEvalsAndComparators.second,
                         frameOffsetExprEval, context.getBinaryIntegerInspectorFactory(), frameMaxObjects,
                         projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories,
-                        aggregatorOutputSchemaSize, nestedAggFactory);
+                        aggregatorOutputSchemaSize, nestedAggFactory, memSizeInFrames);
             }
         } else if (partitionMaterialization) {
             runtime = new WindowMaterializingRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                     orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
-                    runningAggFactories);
+                    runningAggFactories, memSizeInFrames);
         } else {
             runtime = new WindowSimpleRuntimeFactory(partitionColumnsList, partitionComparatorFactories,
                     orderComparatorFactories, projectionColumnsExcludingSubplans, runningAggOutColumns,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
index d879d36..f9ea0c4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -30,6 +30,7 @@
     private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
     private static final String MAX_FRAMES_FOR_JOIN_LEFT_INPUT = "MAX_FRAMES_FOR_JOIN_LEFT_INPUT";
     private static final String MAX_FRAMES_FOR_JOIN = "MAX_FRAMES_FOR_JOIN";
+    private static final String MAX_FRAMES_FOR_WINDOW = "MAX_FRAMES_FOR_WINDOW";
     private static final String MAX_FRAMES_FOR_TEXTSEARCH = "MAX_FRAMES_FOR_TEXTSEARCH";
     private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
     private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
@@ -113,6 +114,15 @@
         setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
     }
+    public int getMaxFramesForWindow() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_FOR_WINDOW, (int) (((long) 4 * MB) / frameSize));
+    }
+
+    public void setMaxFramesForWindow(int frameLimit) {
+        setInt(MAX_FRAMES_FOR_WINDOW, frameLimit);
+    }
+
     public int getMaxFramesForTextSearch() {
         int frameSize = getFrameSize();
         return getInt(MAX_FRAMES_FOR_TEXTSEARCH, (int) (((long) 32 * MB) / frameSize));
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
index 0449cf5..b6cd0eb 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/AbstractRunningAggregatePushRuntime.java
@@ -86,10 +86,11 @@
         return new ArrayTupleBuilder(projectionList.length);
     }
-    protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx) throws HyracksDataException {
+    protected void produceTuples(IFrameTupleAccessor accessor, int beginIdx, int endIdx, FrameTupleReference tupleRef)
+            throws HyracksDataException {
         for (int t = beginIdx; t <= endIdx; t++) {
-            tRef.reset(accessor, t);
-            produceTuple(tupleBuilder, accessor, t, tRef);
+            tupleRef.reset(accessor, t);
+            produceTuple(tupleBuilder, accessor, t, tupleRef);
             appendToFrameFromTupleBuilder(tupleBuilder);
         }
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
index 4ca166f..fd3e97d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggrun/RunningAggregatePushRuntime.java
@@ -36,6 +36,6 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         tAccess.reset(buffer);
-        produceTuples(tAccess, 0, tAccess.getTupleCount() - 1);
+        produceTuples(tAccess, 0, tAccess.getTupleCount() - 1, tRef);
     }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 9adeb4d..807f6bf 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -49,9 +50,10 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
+                runningAggOutColumns, runningAggFactories, ctx, memSizeInFrames, sourceLoc);
         this.nestedAggFactory = nestedAggFactory;
         this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
     }
@@ -75,23 +77,23 @@
     /**
      * Aggregator created by
-     * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor, RecordDescriptor, int[], int[], long)
-     * WindowAggregatorDescriptorFactory.createAggregator(...)}
+     * {@link WindowAggregatorDescriptorFactory#createAggregator(IHyracksTaskContext, RecordDescriptor,
+     * RecordDescriptor, int[], int[], long) WindowAggregatorDescriptorFactory.createAggregator(...)}
      * does not process argument tuple in init()
      */
-    void nestedAggInit() throws HyracksDataException {
+    final void nestedAggInit() throws HyracksDataException {
         nestedAgg.init(null, null, -1, null);
     }
-    void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
+    final void nestedAggAggregate(FrameTupleAccessor tAccess, int tIndex) throws HyracksDataException {
         nestedAgg.aggregate(tAccess, tIndex, null, -1, null);
     }
-    void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+    final void nestedAggOutputFinalResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
         nestedAgg.outputFinalResult(outTupleBuilder, null, -1, null);
     }
-    void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
+    final void nestedAggOutputPartialResult(ArrayTupleBuilder outTupleBuilder) throws HyracksDataException {
         nestedAgg.outputPartialResult(outTupleBuilder, null, -1, null);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
index 53857ac..0b8cc06 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansRuntimeFactory.java
@@ -25,7 +25,7 @@
 /**
  * Base class for window runtime factories that compute nested aggregates
  */
-abstract class AbstractWindowNestedPlansRuntimeFactory extends AbstractWindowRuntimeFactory {
+abstract class AbstractWindowNestedPlansRuntimeFactory extends WindowMaterializingRuntimeFactory {
     private static final long serialVersionUID = 1L;
@@ -37,9 +37,9 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
-                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+                projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, memSizeInFrames);
         this.nestedAggFactory = nestedAggFactory;
         this.nestedAggOutSchemaSize = nestedAggOutSchemaSize;
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
index a63aaf8..9cc25d0 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -30,12 +30,14 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
 public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
+    protected final SourceLocation sourceLoc;
     private final int[] partitionColumns;
     private final IBinaryComparatorFactory[] partitionComparatorFactories;
     private IBinaryComparator[] partitionComparators;
@@ -48,11 +50,20 @@
     AbstractWindowPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+            SourceLocation sourceLoc) {
         super(projectionColumns, runningAggOutColumns, runningAggFactories, IWindowAggregateEvaluator.class, ctx);
         this.partitionColumns = partitionColumns;
         this.partitionComparatorFactories = partitionComparatorFactories;
         this.orderComparatorFactories = orderComparatorFactories;
+        this.sourceLoc = sourceLoc;
+    }
+
+    /**
+     * Number of frames reserved by this operator: {@link #frame}, {@link #copyFrame}
+     */
+    int getReservedFrameCount() {
+        return 2;
     }
     @Override
@@ -78,7 +89,7 @@
     @Override
     public void close() throws HyracksDataException {
-        if (inPartition) {
+        if (inPartition && !failed) {
             endPartition();
         }
         super.close();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
index 4e97d6c..889803a 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingPushRuntime.java
@@ -22,16 +22,11 @@
 import java.nio.ByteBuffer;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
-import org.apache.hyracks.dataflow.common.io.RunFileWriter;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 /**
@@ -40,46 +35,42 @@
  */
 class WindowMaterializingPushRuntime extends AbstractWindowPushRuntime {
+    private final int memSizeInFrames;
+
     private long partitionLength;
-    IFrame curFrame;
+    private WindowPartitionWriter partitionWriter;
-    private long curFrameId;
+    WindowPartitionReader partitionReader;
     private int chunkBeginIdx;
     private IntArrayList chunkEndIdx;
-    private RunFileWriter run;
-
-    private long runLastFrameId;
-
     WindowMaterializingPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx, int memSizeInFrames,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        super.open();
-        run = null;
-        curFrameId = -1;
+                runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
+        this.memSizeInFrames = memSizeInFrames;
     }
     @Override
     protected void init() throws HyracksDataException {
         super.init();
-        curFrame = new VSizeFrame(ctx);
+        String runFilePrefix = getClass().getName();
+        partitionWriter = new WindowPartitionWriter(ctx, memSizeInFrames - getReservedFrameCount(), runFilePrefix,
+                getPartitionReaderSlotCount(), sourceLoc);
+        partitionReader = partitionWriter.getReader();
         chunkEndIdx = new IntArrayList(128, 128);
     }
     @Override
     public void close() throws HyracksDataException {
         super.close();
-        if (run != null) {
-            run.erase();
+        if (partitionWriter != null) {
+            partitionWriter.close();
         }
     }
@@ -87,41 +78,17 @@
     protected void beginPartitionImpl() throws HyracksDataException {
         chunkEndIdx.clear();
         partitionLength = 0;
-        if (run != null) {
-            run.rewind();
-        }
+        partitionWriter.reset();
     }
     @Override
     protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
             throws HyracksDataException {
-        // save the frame. first one to memory, remaining ones to the run file
         boolean isFirstChunk = chunkEndIdx.isEmpty();
+        partitionWriter.nextFrame(frameId, frameBuffer);
         if (isFirstChunk) {
-            if (frameId != curFrameId) {
-                int pos = frameBuffer.position();
-                curFrame.ensureFrameSize(frameBuffer.capacity());
-                FrameUtils.copyAndFlip(frameBuffer, curFrame.getBuffer());
-                frameBuffer.position(pos);
-                curFrameId = frameId;
-            }
             chunkBeginIdx = tBeginIdx;
-        } else {
-            if (tBeginIdx != 0) {
-                throw new IllegalStateException(String.valueOf(tBeginIdx));
-            }
-            if (run == null) {
-                FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(getClass().getSimpleName());
-                run = new RunFileWriter(file, ctx.getIoManager());
-                run.open();
-            }
-            int pos = frameBuffer.position();
-            frameBuffer.position(0);
-            run.nextFrame(frameBuffer);
-            frameBuffer.position(pos);
-            runLastFrameId = frameId;
         }
-
         chunkEndIdx.add(tEndIdx);
         partitionLength += tEndIdx - tBeginIdx + 1;
     }
@@ -130,40 +97,32 @@
     protected void endPartitionImpl() throws HyracksDataException {
         runningAggInitPartition(partitionLength);
-        int nChunks = getPartitionChunkCount();
-        if (nChunks == 1) {
-            producePartitionTuples(0, null);
-        } else {
-            GeneratedRunFileReader reader = run.createReader();
-            reader.open();
-            try {
-                for (int chunkIdx = 0; chunkIdx < nChunks; chunkIdx++) {
-                    if (chunkIdx > 0) {
-                        reader.nextFrame(curFrame);
-                    }
-                    producePartitionTuples(chunkIdx, reader);
-                }
-                curFrameId = runLastFrameId;
-            } finally {
-                reader.close();
-            }
+        partitionReader.open();
+        for (int chunkIdx = 0, nChunks = getPartitionChunkCount(); chunkIdx < nChunks; chunkIdx++) {
+            IFrame chunkFrame = partitionReader.nextFrame(true);
+            producePartitionTuples(chunkIdx, chunkFrame);
         }
+        partitionReader.close();
     }
-    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
-        tAccess.reset(curFrame.getBuffer());
-        produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx));
+    void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+        tAccess.reset(chunkFrame.getBuffer());
+        produceTuples(tAccess, getTupleBeginIdx(chunkIdx), getTupleEndIdx(chunkIdx), tRef);
     }
-    int getPartitionChunkCount() {
+    final int getPartitionChunkCount() {
         return chunkEndIdx.size();
     }
-    int getTupleBeginIdx(int chunkIdx) {
+    final int getTupleBeginIdx(int chunkIdx) {
         return chunkIdx == 0 ? chunkBeginIdx : 0;
     }
-    int getTupleEndIdx(int chunkIdx) {
+    final int getTupleEndIdx(int chunkIdx) {
         return chunkEndIdx.get(chunkIdx);
     }
+
+    int getPartitionReaderSlotCount() {
+        return -1; // forward only reader by default
+    }
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
index 1b02fb1..8bb3147 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowMaterializingRuntimeFactory.java
@@ -34,18 +34,22 @@
     private static final long serialVersionUID = 1L;
+    final int memSizeInFrames;
+
     public WindowMaterializingRuntimeFactory(int[] partitionColumns,
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumnsExcludingSubplans,
-            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories) {
+            int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories);
+        this.memSizeInFrames = memSizeInFrames;
     }
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowMaterializingPushRuntime(partitionColumns, partitionComparatorFactories,
-                orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx);
+                orderComparatorFactories, projectionList, runningAggOutColumns, runningAggFactories, ctx,
+                memSizeInFrames, sourceLoc);
     }
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index cb4f534..aa9d402 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -19,27 +19,23 @@
 package org.apache.hyracks.algebricks.runtime.operators.win;
-import java.nio.ByteBuffer;
-
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
 import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.DataUtils;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.storage.common.MultiComparator;
 /**
@@ -47,6 +43,14 @@
  * as well as regular aggregates (in nested plans) over window frames.
  */
 class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime {
+
+    private static final int PARTITION_POSITION_SLOT = 0;
+
+    private static final int FRAME_POSITION_SLOT = 1;
+
+    private static final int TMP_POSITION_SLOT = 2;
+
+    private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1;
     private final boolean frameValueExists;
@@ -106,14 +110,6 @@
     private final int frameMaxObjects;
-    private IFrame copyFrame2;
-
-    private IFrame runFrame;
-
-    private int runFrameChunkId;
-
-    private long runFrameSize;
-
     private FrameTupleAccessor tAccess2;
     private FrameTupleReference tRef2;
@@ -124,8 +120,6 @@
     private int tBeginIdxFrameStartGlobal;
-    private long readerPosFrameStartGlobal;
-
     WindowNestedPlansPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, IScalarEvaluatorFactory[] frameValueEvalFactories,
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameStartEvalFactories,
@@ -134,9 +128,11 @@
             IBinaryComparatorFactory[] frameExcludeComparatorFactories, IScalarEvaluatorFactory frameOffsetEvalFactory,
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects, int[] projectionColumns,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+            int memSizeInFrames, SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+                memSizeInFrames, sourceLoc);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueExists = frameValueEvalFactories != null && frameValueEvalFactories.length > 0;
         this.frameStartEvalFactories = frameStartEvalFactories;
@@ -158,7 +154,6 @@
     @Override
     protected void init() throws HyracksDataException {
         super.init();
-
         if (frameValueExists) {
             frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
             frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
@@ -183,9 +178,6 @@
             frameOffsetPointable = VoidPointable.FACTORY.createPointable();
             bii = binaryIntegerInspectorFactory.createBinaryIntegerInspector(ctx);
         }
-
-        runFrame = new VSizeFrame(ctx);
-        copyFrame2 = new VSizeFrame(ctx);
         tAccess2 = new FrameTupleAccessor(inputRecordDesc);
         tRef2 = new FrameTupleReference();
     }
@@ -195,31 +187,22 @@
         super.beginPartitionImpl();
         chunkIdxFrameStartGlobal = -1;
         tBeginIdxFrameStartGlobal = -1;
-        readerPosFrameStartGlobal = -1;
-        runFrameChunkId = -1;
     }
     @Override
-    protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException {
-        boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
+    protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException {
+        partitionReader.savePosition(PARTITION_POSITION_SLOT);
-        long readerPos = -1;
         int nChunks = getPartitionChunkCount();
-        if (nChunks > 1) {
-            readerPos = reader.position();
-            if (chunkIdx == 0) {
-                ByteBuffer curFrameBuffer = curFrame.getBuffer();
-                int pos = curFrameBuffer.position();
-                copyFrame2.ensureFrameSize(curFrameBuffer.capacity());
-                FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer());
-                curFrameBuffer.position(pos);
-            }
-        }
+        boolean isFirstChunkInPartition = chunkIdx == 0;
-        tAccess.reset(curFrame.getBuffer());
+        tAccess.reset(chunkFrame.getBuffer());
         int tBeginIdx = getTupleBeginIdx(chunkIdx);
         int tEndIdx = getTupleEndIdx(chunkIdx);
+
         for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) {
+            boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx;
+
             tRef.reset(tAccess, tIdx);
             // running aggregates
@@ -245,34 +228,23 @@
             nestedAggInit();
+            boolean frameStartForward = frameStartIsMonotonic && chunkIdxFrameStartGlobal >= 0;
             int chunkIdxInnerStart = frameStartForward ? chunkIdxFrameStartGlobal : 0;
             int tBeginIdxInnerStart = frameStartForward ? tBeginIdxFrameStartGlobal : -1;
-            if (nChunks > 1) {
-                reader.seek(frameStartForward ? readerPosFrameStartGlobal : 0);
+
+            if (chunkIdxInnerStart < nChunks) {
+                if (frameStartForward && !isFirstTupleInPartition) {
+                    partitionReader.restorePosition(FRAME_POSITION_SLOT);
+                } else {
+                    partitionReader.rewind();
+                }
             }
             int chunkIdxFrameStartLocal = -1, tBeginIdxFrameStartLocal = -1;
-            long readerPosFrameStartLocal = -1;
             frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) {
-                long readerPosFrameInner;
-                IFrame frameInner;
-                if (chunkIdxInner == 0) {
-                    // first chunk's frame is always in memory
-                    frameInner = chunkIdx == 0 ?
curFrame : copyFrame2; - readerPosFrameInner = 0; - } else { - readerPosFrameInner = reader.position(); - if (runFrameChunkId == chunkIdxInner) { - // runFrame has this chunk, so just advance the reader - reader.seek(readerPosFrameInner + runFrameSize); - } else { - reader.nextFrame(runFrame); - runFrameSize = reader.position() - readerPosFrameInner; - runFrameChunkId = chunkIdxInner; - } - frameInner = runFrame; - } + partitionReader.savePosition(TMP_POSITION_SLOT); + IFrame frameInner = partitionReader.nextFrame(false); tAccess2.reset(frameInner.getBuffer()); int tBeginIdxInner; @@ -294,17 +266,19 @@ // skip if value < start continue; } + // inside the frame if (chunkIdxFrameStartLocal < 0) { - // save position of the first tuple that matches the frame start. - // we'll continue from it in the next frame iteration + // save position of the first tuple in this frame + // will continue from it in the next frame iteration chunkIdxFrameStartLocal = chunkIdxInner; tBeginIdxFrameStartLocal = tIdxInner; - readerPosFrameStartLocal = readerPosFrameInner; + partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT); } } if (frameEndExists && frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) { - // skip and exit if value > end + // value > end => beyond the frame end + // exit the frame loop break frame_loop; } } @@ -331,27 +305,22 @@ } } - nestedAggOutputFinalResult(tupleBuilder); - appendToFrameFromTupleBuilder(tupleBuilder); - if (frameStartIsMonotonic) { - frameStartForward = true; if (chunkIdxFrameStartLocal >= 0) { chunkIdxFrameStartGlobal = chunkIdxFrameStartLocal; tBeginIdxFrameStartGlobal = tBeginIdxFrameStartLocal; - readerPosFrameStartGlobal = readerPosFrameStartLocal; } else { - // frame start not found, set start beyond the last chunk + // frame start not found, set it beyond the last chunk chunkIdxFrameStartGlobal = nChunks; tBeginIdxFrameStartGlobal = 0; - readerPosFrameStartGlobal = 0; } } + + 
nestedAggOutputFinalResult(tupleBuilder); + appendToFrameFromTupleBuilder(tupleBuilder); } - if (nChunks > 1) { - reader.seek(readerPos); - } + partitionReader.restorePosition(PARTITION_POSITION_SLOT); } private boolean isExcluded() throws HyracksDataException { @@ -368,4 +337,9 @@ } return true; } + + @Override + protected int getPartitionReaderSlotCount() { + return PARTITION_READER_SLOT_COUNT; + } } \ No newline at end of file diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java index e550d65..fe7e93f 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java @@ -19,21 +19,17 @@ package org.apache.hyracks.algebricks.runtime.operators.win; -import java.nio.ByteBuffer; - import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.comm.IFrame; -import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import 
org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference; -import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; import org.apache.hyracks.storage.common.MultiComparator; /** @@ -41,7 +37,15 @@ * as well as regular aggregates (in nested plans) over accumulating window frames * (unbounded preceding to current row or N following). */ -class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime { +final class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlansPushRuntime { + + private static final int PARTITION_POSITION_SLOT = 0; + + private static final int FRAME_POSITION_SLOT = 1; + + private static final int TMP_POSITION_SLOT = 2; + + private static final int PARTITION_READER_SLOT_COUNT = TMP_POSITION_SLOT + 1; private final IScalarEvaluatorFactory[] frameValueEvalFactories; @@ -61,14 +65,6 @@ private final int frameMaxObjects; - private IFrame copyFrame2; - - private IFrame runFrame; - - private int runFrameChunkId; - - private long runFrameSize; - private FrameTupleAccessor tAccess2; private FrameTupleReference tRef2; @@ -77,8 +73,6 @@ private int tBeginIdxFrameEndGlobal; - private long readerPosFrameEndGlobal; - private int toWrite; WindowNestedPlansRunningPushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories, @@ -86,9 +80,11 @@ IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories, int frameMaxObjects, int[] projectionColumns, int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize, - WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) { + WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx, int memSizeInFrames, + SourceLocation sourceLoc) { super(partitionColumns, partitionComparatorFactories, 
orderComparatorFactories, projectionColumns, - runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx); + runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx, + memSizeInFrames, sourceLoc); this.frameValueEvalFactories = frameValueEvalFactories; this.frameEndEvalFactories = frameEndEvalFactories; this.frameValueComparatorFactories = frameValueComparatorFactories; @@ -98,15 +94,11 @@ @Override protected void init() throws HyracksDataException { super.init(); - frameValueEvals = createEvaluators(frameValueEvalFactories, ctx); frameValueComparators = MultiComparator.create(frameValueComparatorFactories); frameValuePointables = createPointables(frameValueEvalFactories.length); frameEndEvals = createEvaluators(frameEndEvalFactories, ctx); frameEndPointables = createPointables(frameEndEvalFactories.length); - - runFrame = new VSizeFrame(ctx); - copyFrame2 = new VSizeFrame(ctx); tAccess2 = new FrameTupleAccessor(inputRecordDesc); tRef2 = new FrameTupleReference(); } @@ -117,32 +109,25 @@ nestedAggInit(); chunkIdxFrameEndGlobal = 0; tBeginIdxFrameEndGlobal = -1; - readerPosFrameEndGlobal = 0; - runFrameChunkId = -1; toWrite = frameMaxObjects; } @Override - protected void producePartitionTuples(int chunkIdx, GeneratedRunFileReader reader) throws HyracksDataException { - long readerPos = -1; + protected void producePartitionTuples(int chunkIdx, IFrame chunkFrame) throws HyracksDataException { + partitionReader.savePosition(PARTITION_POSITION_SLOT); + int nChunks = getPartitionChunkCount(); - if (nChunks > 1) { - readerPos = reader.position(); - if (chunkIdx == 0) { - ByteBuffer curFrameBuffer = curFrame.getBuffer(); - int pos = curFrameBuffer.position(); - copyFrame2.ensureFrameSize(curFrameBuffer.capacity()); - FrameUtils.copyAndFlip(curFrameBuffer, copyFrame2.getBuffer()); - curFrameBuffer.position(pos); - } - } + boolean isFirstChunkInPartition = chunkIdx == 0; + boolean isLastChunkInPartition = 
chunkIdx == nChunks - 1; - boolean isLastChunk = chunkIdx == nChunks - 1; - - tAccess.reset(curFrame.getBuffer()); + tAccess.reset(chunkFrame.getBuffer()); int tBeginIdx = getTupleBeginIdx(chunkIdx); int tEndIdx = getTupleEndIdx(chunkIdx); + for (int tIdx = tBeginIdx; tIdx <= tEndIdx; tIdx++) { + boolean isFirstTupleInPartition = isFirstChunkInPartition && tIdx == tBeginIdx; + boolean isLastTupleInPartition = isLastChunkInPartition && tIdx == tEndIdx; + tRef.reset(tAccess, tIdx); // running aggregates @@ -153,40 +138,28 @@ int chunkIdxInnerStart = chunkIdxFrameEndGlobal; int tBeginIdxInnerStart = tBeginIdxFrameEndGlobal; - if (nChunks > 1) { - reader.seek(readerPosFrameEndGlobal); + + if (chunkIdxInnerStart < nChunks) { + if (!isFirstTupleInPartition) { + partitionReader.restorePosition(FRAME_POSITION_SLOT); + } else { + partitionReader.rewind(); + } } int chunkIdxFrameEndLocal = -1, tBeginIdxFrameEndLocal = -1; - long readerPosFrameEndLocal = -1; frame_loop: for (int chunkIdxInner = chunkIdxInnerStart; chunkIdxInner < nChunks; chunkIdxInner++) { - long readerPosFrameInner; - IFrame frameInner; - if (chunkIdxInner == 0) { - // first chunk's frame is always in memory - frameInner = chunkIdx == 0 ? 
curFrame : copyFrame2; - readerPosFrameInner = 0; - } else { - readerPosFrameInner = reader.position(); - if (runFrameChunkId == chunkIdxInner) { - // runFrame has this chunk, so just advance the reader - reader.seek(readerPosFrameInner + runFrameSize); - } else { - reader.nextFrame(runFrame); - runFrameSize = reader.position() - readerPosFrameInner; - runFrameChunkId = chunkIdxInner; - } - frameInner = runFrame; - } + partitionReader.savePosition(TMP_POSITION_SLOT); + IFrame frameInner = partitionReader.nextFrame(false); tAccess2.reset(frameInner.getBuffer()); int tBeginIdxInner; - if (tBeginIdxInnerStart < 0) { - tBeginIdxInner = getTupleBeginIdx(chunkIdxInner); - } else { + if (tBeginIdxInnerStart >= 0) { tBeginIdxInner = tBeginIdxInnerStart; tBeginIdxInnerStart = -1; + } else { + tBeginIdxInner = getTupleBeginIdx(chunkIdxInner); } int tEndIdxInner = getTupleEndIdx(chunkIdxInner); @@ -194,14 +167,14 @@ tRef2.reset(tAccess2, tIdxInner); evaluate(frameValueEvals, tRef2, frameValuePointables); + if (frameValueComparators.compare(frameValuePointables, frameEndPointables) > 0) { - // save position of the tuple that matches the frame end. 
- // we'll continue from it in the next outer iteration + // value > end => beyond the frame end + // save position of the current tuple, will continue from it in the next outer iteration chunkIdxFrameEndLocal = chunkIdxInner; tBeginIdxFrameEndLocal = tIdxInner; - readerPosFrameEndLocal = readerPosFrameInner; - - // skip and exit if value > end + partitionReader.copyPosition(TMP_POSITION_SLOT, FRAME_POSITION_SLOT); + // exit the frame loop break frame_loop; } @@ -213,28 +186,28 @@ } } - boolean isLastTuple = isLastChunk && tIdx == tEndIdx; - if (isLastTuple) { + if (chunkIdxFrameEndLocal >= 0) { + chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal; + tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal; + } else { + // frame end not found, set it beyond the last chunk + chunkIdxFrameEndGlobal = nChunks; + tBeginIdxFrameEndGlobal = 0; + } + + if (isLastTupleInPartition) { nestedAggOutputFinalResult(tupleBuilder); } else { nestedAggOutputPartialResult(tupleBuilder); } appendToFrameFromTupleBuilder(tupleBuilder); - - if (chunkIdxFrameEndLocal >= 0) { - chunkIdxFrameEndGlobal = chunkIdxFrameEndLocal; - tBeginIdxFrameEndGlobal = tBeginIdxFrameEndLocal; - readerPosFrameEndGlobal = readerPosFrameEndLocal; - } else { - // could not find the end, set beyond the last chunk - chunkIdxFrameEndGlobal = nChunks; - tBeginIdxFrameEndGlobal = 0; - readerPosFrameEndGlobal = 0; - } } - if (nChunks > 1) { - reader.seek(readerPos); - } + partitionReader.restorePosition(PARTITION_POSITION_SLOT); + } + + @Override + protected int getPartitionReaderSlotCount() { + return PARTITION_READER_SLOT_COUNT; } } diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java index ddeaf2b..53692d1 100644 --- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningRuntimeFactory.java
@@ -50,10 +50,10 @@
             IBinaryComparatorFactory[] frameValueComparatorFactories, IScalarEvaluatorFactory[] frameEndEvalFactories,
             int frameMaxObjects, int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
         this.frameEndEvalFactories = frameEndEvalFactories;
@@ -65,7 +65,7 @@
         return new WindowNestedPlansRunningPushRuntime(partitionColumns, partitionComparatorFactories,
                 orderComparatorFactories, frameValueEvalFactories, frameValueComparatorFactories, frameEndEvalFactories,
                 frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory, ctx);
+                nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
index f754b91..4a7c837 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRuntimeFactory.java
@@ -68,10 +68,10 @@
             IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory, int frameMaxObjects,
             int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameValueEvalFactories = frameValueEvalFactories;
         this.frameValueComparatorFactories = frameValueComparatorFactories;
         this.frameStartEvalFactories = frameStartEvalFactories;
@@ -92,7 +92,7 @@
                 frameStartEvalFactories, frameStartIsMonotonic, frameEndEvalFactories, frameExcludeEvalFactories,
                 frameExcludeNegationStartIdx, frameExcludeComparatorFactories, frameOffsetEvalFactory,
                 binaryIntegerInspectorFactory, frameMaxObjects, projectionList, runningAggOutColumns,
-                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
index b25a36c..d97c855 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedPushRuntime.java
@@ -26,6 +26,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.utils.TupleUtils;
@@ -55,9 +56,11 @@
             IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects, int[] projectionColumns,
             int[] runningAggOutColumns, IRunningAggregateEvaluatorFactory[] runningAggFactories,
-            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx) {
+            int nestedAggOutSchemaSize, WindowAggregatorDescriptorFactory nestedAggFactory, IHyracksTaskContext ctx,
+            int memSizeInFrames, SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize, nestedAggFactory, ctx,
+                memSizeInFrames, sourceLoc);
         this.frameMaxObjects = frameMaxObjects;
     }
 
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
index 0f7d9cf..b2dfbca 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansUnboundedRuntimeFactory.java
@@ -41,10 +41,10 @@
             IBinaryComparatorFactory[] orderComparatorFactories, int frameMaxObjects,
             int[] projectionColumnsExcludingSubplans, int[] runningAggOutColumns,
             IRunningAggregateEvaluatorFactory[] runningAggFactories, int nestedAggOutSchemaSize,
-            WindowAggregatorDescriptorFactory nestedAggFactory) {
+            WindowAggregatorDescriptorFactory nestedAggFactory, int memSizeInFrames) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
                 projectionColumnsExcludingSubplans, runningAggOutColumns, runningAggFactories, nestedAggOutSchemaSize,
-                nestedAggFactory);
+                nestedAggFactory, memSizeInFrames);
         this.frameMaxObjects = frameMaxObjects;
     }
 
@@ -52,7 +52,7 @@
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowNestedPlansUnboundedPushRuntime(partitionColumns, partitionComparatorFactories,
                 orderComparatorFactories, frameMaxObjects, projectionList, runningAggOutColumns, runningAggFactories,
-                nestedAggOutSchemaSize, nestedAggFactory, ctx);
+                nestedAggOutSchemaSize, nestedAggFactory, ctx, memSizeInFrames, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java
new file mode 100644
index 0000000..208effc
---
/dev/null +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionReader.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +interface WindowPartitionReader { + + void open() throws HyracksDataException; + + IFrame nextFrame(boolean primaryScan) throws HyracksDataException; + + void close() throws HyracksDataException; + + // position manipulation + + void rewind(); + + void savePosition(int slotNo); + + void restorePosition(int slotNo); + + void copyPosition(int slotFrom, int slotTo); +} diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java new file mode 100644 index 0000000..b3eb317 --- /dev/null +++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowPartitionWriter.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hyracks.algebricks.runtime.operators.win; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; +import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader; +import org.apache.hyracks.dataflow.common.io.RunFileWriter; + +final class WindowPartitionWriter { + + private final IHyracksTaskContext ctx; + + private final String fileNamePrefix; + + private final SourceLocation sourceLoc; + + private final IFrame[] writerFrames; + + private int writerFrameCount; + + private long writerFirstFrameId; + + private long writerLastFrameId; + + 
private RunFileWriter fileWriter; + + private final AbstractWindowPartitionReader partitionReader; + + WindowPartitionWriter(IHyracksTaskContext ctx, int memSizeInFrames, String fileNamePrefix, + int readerPositionStoreSize, SourceLocation sourceLoc) throws HyracksDataException { + this.ctx = ctx; + this.fileNamePrefix = fileNamePrefix; + this.sourceLoc = sourceLoc; + partitionReader = readerPositionStoreSize < 1 ? new WindowPartitionForwardReader() + : new WindowPartitionSeekableReader(readerPositionStoreSize); + int writerFrameBudget = memSizeInFrames - partitionReader.getReservedFrameCount(); + if (writerFrameBudget < 1) { + throw new IllegalArgumentException(String.valueOf(memSizeInFrames)); + } + writerFrames = new IFrame[writerFrameBudget]; + // Allocate one writer frame here. Remaining frames will be allocated lazily while writing + allocateFrames(writerFrames, 1); + writerFirstFrameId = writerLastFrameId = -1; + } + + void close() throws HyracksDataException { + try { + partitionReader.closeFileReader(); + } finally { + if (fileWriter != null) { + fileWriter.close(); + } + } + } + + void reset() { + writerFrameCount = 0; + if (fileWriter != null) { + fileWriter.rewind(); + } + } + + void nextFrame(long frameId, ByteBuffer frameBuffer) throws HyracksDataException { + if (frameId < 0) { + throw new IllegalArgumentException(String.valueOf(frameId)); + } + if (writerFrameCount == 0) { + if (writerFirstFrameId != frameId) { + copyToFrame(frameBuffer, writerFrames[0]); + writerFirstFrameId = frameId; + } + } else if (writerFrameCount < writerFrames.length) { + IFrame writerFrame = writerFrames[writerFrameCount]; + if (writerFrame == null) { + writerFrames[writerFrameCount] = writerFrame = new VSizeFrame(ctx); + } + copyToFrame(frameBuffer, writerFrame); + } else { + if (fileWriter == null) { + FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(fileNamePrefix); + fileWriter = new RunFileWriter(file, ctx.getIoManager()); + fileWriter.open(); + } 
+ int pos = frameBuffer.position(); + frameBuffer.position(0); + fileWriter.nextFrame(frameBuffer); + frameBuffer.position(pos); + } + writerLastFrameId = frameId; + writerFrameCount++; + } + + WindowPartitionReader getReader() { + return partitionReader; + } + + private void allocateFrames(IFrame[] outFrames, int count) throws HyracksDataException { + for (int i = 0; i < count; i++) { + outFrames[i] = new VSizeFrame(ctx); + } + } + + private static void copyToFrame(ByteBuffer fromBuffer, IFrame toFrame) throws HyracksDataException { + toFrame.ensureFrameSize(fromBuffer.capacity()); + int fromPosition = fromBuffer.position(); + FrameUtils.copyAndFlip(fromBuffer, toFrame.getBuffer()); + fromBuffer.position(fromPosition); + } + + private static <T> void swap(T[] array1, int index1, T[] array2, int index2) { + T item1 = array1[index1]; + array1[index1] = array2[index2]; + array2[index2] = item1; + } + + private abstract class AbstractWindowPartitionReader implements WindowPartitionReader { + + int readerFrameIdx = -1; + + GeneratedRunFileReader fileReader; + + @Override + public void open() throws HyracksDataException { + if (readerFrameIdx >= 0) { + throw new IllegalStateException(String.valueOf(readerFrameIdx)); + } + readerFrameIdx = 0; + + if (writerFrameCount > writerFrames.length) { + openFileReader(); + } + } + + @Override + public final void close() throws HyracksDataException { + if (readerFrameIdx != writerFrameCount) { + throw new IllegalStateException(); + } + + // closeImpl() must guarantee that first writer frame will contain content of the last partition frame + closeImpl(); + writerFirstFrameId = writerLastFrameId; + + readerFrameIdx = -1; + + if (writerFrameCount > writerFrames.length) { + closeFileReader(); + } + } + + void openFileReader() throws HyracksDataException { + if (fileReader != null) { + throw new IllegalStateException(); + } + fileReader = fileWriter.createReader(); + fileReader.open(); + } + + void closeFileReader() throws 
HyracksDataException { + GeneratedRunFileReader r = fileReader; + if (r != null) { + fileReader = null; + r.close(); + } + } + + void readFromFileReader(IFrame outFrame) throws HyracksDataException { + if (!fileReader.nextFrame(outFrame)) { + throw HyracksDataException.create(ErrorCode.EOF, sourceLoc); + } + } + + @Override + public final IFrame nextFrame(boolean primaryScan) throws HyracksDataException { + if (readerFrameIdx < 0) { + throw new IllegalStateException(); + } + if (readerFrameIdx >= writerFrameCount) { + throw HyracksDataException.create(ErrorCode.EOF, sourceLoc); + } + IFrame frame = nextFrameImpl(primaryScan); + readerFrameIdx++; + return frame; + } + + abstract void closeImpl() throws HyracksDataException; + + abstract IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException; + + abstract int getReservedFrameCount(); + } + + private final class WindowPartitionForwardReader extends AbstractWindowPartitionReader { + + @Override + IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException { + if (!primaryScan) { + throw new IllegalArgumentException(); + } + if (readerFrameIdx < writerFrames.length) { + return writerFrames[readerFrameIdx]; + } else { + IFrame writerFrame0 = writerFrames[0]; + readFromFileReader(writerFrame0); + return writerFrame0; + } + } + + @Override + void closeImpl() { + int endFrameIdx = readerFrameIdx - 1; + if (endFrameIdx > 0 && endFrameIdx < writerFrames.length) { + // last partition frame is in writerFrames -> make it the first one + swap(writerFrames, 0, writerFrames, endFrameIdx); + } + } + + @Override + public void savePosition(int slotNo) { + throw new UnsupportedOperationException(); + } + + @Override + public void copyPosition(int slotFrom, int slotTo) { + throw new UnsupportedOperationException(); + } + + @Override + public void restorePosition(int slotNo) { + throw new UnsupportedOperationException(); + } + + @Override + public void rewind() { + throw new UnsupportedOperationException(); + } + 
+        @Override
+        int getReservedFrameCount() {
+            return 0;
+        }
+    }
+
+    private final class WindowPartitionSeekableReader extends AbstractWindowPartitionReader {
+
+        private final IFrame[] fileFrames;
+
+        private final long[] fileFrameIdxs;
+
+        private final long[] fileFrameSizes;
+
+        private final long[] filePositionStore;
+
+        private final int[] readerFrameIdxStore;
+
+        private WindowPartitionSeekableReader(int positionStoreSize) throws HyracksDataException {
+            fileFrames = new IFrame[2]; // run file frames: one for primary scan, another for non-primary
+            allocateFrames(fileFrames, fileFrames.length);
+            fileFrameIdxs = new long[fileFrames.length];
+            fileFrameSizes = new long[fileFrames.length];
+            filePositionStore = new long[positionStoreSize];
+            readerFrameIdxStore = new int[positionStoreSize];
+        }
+
+        @Override
+        public void open() throws HyracksDataException {
+            super.open();
+            Arrays.fill(fileFrameIdxs, -1);
+            Arrays.fill(filePositionStore, -1);
+            Arrays.fill(readerFrameIdxStore, -1);
+        }
+
+        @Override
+        IFrame nextFrameImpl(boolean primaryScan) throws HyracksDataException {
+            if (readerFrameIdx < writerFrames.length) {
+                return writerFrames[readerFrameIdx];
+            } else {
+                int fileFrameSlot = primaryScan ? 0 : 1;
+                IFrame fileFrameRef = fileFrames[fileFrameSlot];
+                long filePosition = fileReader.position();
+                if (readerFrameIdx == fileFrameIdxs[fileFrameSlot]) {
+                    fileReader.seek(filePosition + fileFrameSizes[fileFrameSlot]);
+                } else {
+                    readFromFileReader(fileFrameRef);
+                    fileFrameSizes[fileFrameSlot] = fileReader.position() - filePosition;
+                    fileFrameIdxs[fileFrameSlot] = readerFrameIdx;
+                }
+                return fileFrameRef;
+            }
+        }
+
+        @Override
+        public void closeImpl() {
+            int endFrameIdx = readerFrameIdx - 1;
+            if (endFrameIdx >= writerFrames.length) {
+                // last partition frame was in the run file -> get contents from the file frame
+                swap(writerFrames, 0, fileFrames, 0);
+            } else if (endFrameIdx > 0) {
+                // last partition frame is in writerFrames -> make it the first one
+                swap(writerFrames, 0, writerFrames, endFrameIdx);
+            }
+        }
+
+        @Override
+        public void savePosition(int slotNo) {
+            readerFrameIdxStore[slotNo] = readerFrameIdx;
+            filePositionStore[slotNo] = fileReader != null ? fileReader.position() : 0;
+        }
+
+        @Override
+        public void copyPosition(int slotFrom, int slotTo) {
+            readerFrameIdxStore[slotTo] = readerFrameIdxStore[slotFrom];
+            filePositionStore[slotTo] = filePositionStore[slotFrom];
+        }
+
+        @Override
+        public void restorePosition(int slotNo) {
+            seek(readerFrameIdxStore[slotNo], filePositionStore[slotNo]);
+        }
+
+        @Override
+        public void rewind() {
+            seek(0, 0);
+        }
+
+        private void seek(int readerFrameIdx, long filePosition) {
+            this.readerFrameIdx = readerFrameIdx;
+            if (fileReader != null) {
+                fileReader.seek(filePosition);
+            }
+        }
+
+        @Override
+        int getReservedFrameCount() {
+            return fileFrames.length;
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
index bf71ed9..f7f1a25 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimplePushRuntime.java
@@ -25,6 +25,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
 
 /**
  * Runtime for window operators that evaluates running aggregates without partition materialization.
@@ -33,9 +34,10 @@
 
     WindowSimplePushRuntime(int[] partitionColumns, IBinaryComparatorFactory[] partitionComparatorFactories,
             IBinaryComparatorFactory[] orderComparatorFactories, int[] projectionColumns, int[] runningAggOutColumns,
-            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx) {
+            IRunningAggregateEvaluatorFactory[] runningAggFactories, IHyracksTaskContext ctx,
+            SourceLocation sourceLoc) {
         super(partitionColumns, partitionComparatorFactories, orderComparatorFactories, projectionColumns,
-                runningAggOutColumns, runningAggFactories, ctx);
+                runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
     }
 
     @Override
@@ -47,7 +49,7 @@
     protected void partitionChunkImpl(long frameId, ByteBuffer frameBuffer, int tBeginIdx, int tEndIdx)
             throws HyracksDataException {
         tAccess.reset(frameBuffer);
-        produceTuples(tAccess, tBeginIdx, tEndIdx);
+        produceTuples(tAccess, tBeginIdx, tEndIdx, tRef);
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
index ded399f..2d1cdde 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowSimpleRuntimeFactory.java
@@ -43,7 +43,7 @@
     @Override
     public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(IHyracksTaskContext ctx) {
         return new WindowSimplePushRuntime(partitionColumns, partitionComparatorFactories, orderComparatorFactories,
-                projectionList, runningAggOutColumns, runningAggFactories, ctx);
+                projectionList, runningAggOutColumns, runningAggFactories, ctx, sourceLoc);
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index d6860f7..4ff0df3 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -153,6 +153,7 @@
     public static final int NO_RANGEMAP_PRODUCED = 117;
     public static final int RANGEMAP_NOT_FOUND = 118;
     public static final int UNSUPPORTED_WINDOW_SPEC = 119;
+    public static final int EOF = 120;
 
     // Compilation error codes.
     public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index e95bf76..5c9863c 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -136,6 +136,7 @@
 117 = No range map produced for parallel sort
 118 = Range map was not found for parallel sort
 119 = Unsupported window specification: PARTITION BY %1$s, ORDER BY %2$s
+120 = End of file
 
 10000 = The given rule collection %1$s is not an instance of the List class.
 10001 = Cannot compose partition constraint %1$s with %2$s
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
index 03fdb49..50cacb2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileReader.java
@@ -73,7 +73,7 @@
 
         int readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
         if (readLength <= 0) {
-            throw new HyracksDataException("Premature end of file");
+            throw HyracksDataException.create(ErrorCode.EOF);
         }
         readPtr += readLength;
         frame.ensureFrameSize(frame.getMinSize() * FrameHelper.deserializeNumOfMinFrame(frame.getBuffer()));
@@ -81,7 +81,7 @@
             if (readPtr < size) {
                 readLength = ioManager.syncRead(handle, readPtr, frame.getBuffer());
                 if (readLength < 0) {
-                    throw new HyracksDataException("Premature end of file");
+                    throw HyracksDataException.create(ErrorCode.EOF);
                 }
                 readPtr += readLength;
             }

--
To view, visit https://asterix-gerrit.ics.uci.edu/3227
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I6756e92046883f79db339ef490cca8bc8b7b1fb8
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Dmitry Lychagin <[email protected]>
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Dmitry Lychagin <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
