Repository: flink Updated Branches: refs/heads/master f429b4cde -> 92026ff39
[FLINK-8475][config][docs] Integrate Algorithm options This closes #5460. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/023ab749 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/023ab749 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/023ab749 Branch: refs/heads/master Commit: 023ab749cc2d82fcf729d3e3f8052c60280f0af3 Parents: f429b4c Author: zentol <ches...@apache.org> Authored: Tue Jan 30 14:06:30 2018 +0100 Committer: zentol <ches...@apache.org> Committed: Mon Feb 12 17:03:02 2018 +0100 ---------------------------------------------------------------------- .../generated/algorithm_configuration.html | 26 +++++++++++ docs/ops/config.md | 6 +-- .../flink/configuration/AlgorithmOptions.java | 47 ++++++++++++++++++++ .../plantranslate/JobGraphGenerator.java | 11 +++-- .../AbstractCachedBuildSideJoinDriver.java | 7 ++- .../flink/runtime/operators/JoinDriver.java | 7 ++- 6 files changed, 85 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/docs/_includes/generated/algorithm_configuration.html ---------------------------------------------------------------------- diff --git a/docs/_includes/generated/algorithm_configuration.html b/docs/_includes/generated/algorithm_configuration.html new file mode 100644 index 0000000..c1406ee --- /dev/null +++ b/docs/_includes/generated/algorithm_configuration.html @@ -0,0 +1,26 @@ +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 65%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>taskmanager.runtime.hashjoin-bloom-filters</h5></td> + <td>false</td> + <td>Flag to activate/deactivate bloom filters in the hybrid hash join implementation. 
In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost some CPU cycles.</td> + </tr> + <tr> + <td><h5>taskmanager.runtime.max-fan</h5></td> + <td>128</td> + <td>The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.</td> + </tr> + <tr> + <td><h5>taskmanager.runtime.sort-spilling-threshold</h5></td> + <td>0.8</td> + <td>A sort operation starts spilling when this fraction of its memory budget is full.</td> + </tr> + </tbody> +</table> http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/docs/ops/config.md ---------------------------------------------------------------------- diff --git a/docs/ops/config.md b/docs/ops/config.md index 41f0349..3236199 100644 --- a/docs/ops/config.md +++ b/docs/ops/config.md @@ -393,11 +393,7 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic ### Runtime Algorithms -- `taskmanager.runtime.hashjoin-bloom-filters`: Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost some CPU cycles. (DEFAULT: false) - -- `taskmanager.runtime.max-fan`: The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small (DEFAULT: 128). - -- `taskmanager.runtime.sort-spilling-threshold`: A sort operation starts spilling when this fraction of its memory budget is full (DEFAULT: 0.8). 
+{% include generated/algorithm_configuration.html %} ### Resource Manager http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java new file mode 100644 index 0000000..f44432c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/configuration/AlgorithmOptions.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration parameters for join/sort algorithms. + */ +public class AlgorithmOptions { + /* NOTE(review): the description of this option reads "at the cost some CPU cycles" (missing "of"); the generated algorithm_configuration.html repeats the same text, so fix the literal and regenerate that file together. */ /** Whether bloom filters are activated in the hybrid hash join implementation; defaults to {@code false}. Per the option description, they can reduce the number of spilled records at the cost of some CPU cycles. */ + public static final ConfigOption<Boolean> HASH_JOIN_BLOOM_FILTERS = + key("taskmanager.runtime.hashjoin-bloom-filters") + .defaultValue(false) + .withDescription("Flag to activate/deactivate bloom filters in the hybrid hash join implementation."
+ " In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of" + " memory), these bloom filters can greatly reduce the number of spilled records, at the cost some" + " CPU cycles."); + /** Maximal fan-in for external merge joins and fan-out for spilling hash tables; defaults to {@code 128}. Bounds the number of file handles per operator. */ + public static final ConfigOption<Integer> SPILLING_MAX_FAN = + key("taskmanager.runtime.max-fan") + .defaultValue(128) + .withDescription("The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits" + " the number of file handles per operator, but may cause intermediate merging/partitioning, if set too" + " small."); + /** Fraction of its memory budget at which a sort operation starts spilling; defaults to {@code 0.8f}. */ + public static final ConfigOption<Float> SORT_SPILLING_THRESHOLD = + key("taskmanager.runtime.sort-spilling-threshold") + .defaultValue(0.8f) + .withDescription("A sort operation starts spilling when this fraction of its memory budget is full."); +} http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 248049e..ae20567 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.distributions.DataDistribution; import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.configuration.AlgorithmOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.optimizer.CompilerException; import
org.apache.flink.optimizer.dag.TempMode; @@ -145,16 +146,14 @@ public class JobGraphGenerator implements Visitor<PlanNode> { * Creates a new job graph generator that uses the default values for its resource configuration. */ public JobGraphGenerator() { - this.defaultMaxFan = ConfigConstants.DEFAULT_SPILLING_MAX_FAN; - this.defaultSortSpillingThreshold = ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD; + this.defaultMaxFan = AlgorithmOptions.SPILLING_MAX_FAN.defaultValue(); + this.defaultSortSpillingThreshold = AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue(); this.useLargeRecordHandler = ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER; } public JobGraphGenerator(Configuration config) { - this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, - ConfigConstants.DEFAULT_SPILLING_MAX_FAN); - this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY, - ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD); + this.defaultMaxFan = config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN); + this.defaultSortSpillingThreshold = config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD); this.useLargeRecordHandler = config.getBoolean( ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY, ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER); http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index 712018b..ff8351c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AlgorithmOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstReOpenableHashJoinIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondReOpenableHashJoinIterator; @@ -80,9 +80,8 @@ public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends Jo this.taskContext.getTaskConfig().getPairComparatorFactory(this.taskContext.getUserCodeClassLoader()); double availableMemory = config.getRelativeMemoryDriver(); - boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean( - ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, - ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS); + boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration() + .getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS); ExecutionConfig executionConfig = taskContext.getExecutionConfig(); objectReuseEnabled = executionConfig.isObjectReuseEnabled(); http://git-wip-us.apache.org/repos/asf/flink/blob/023ab749/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index b8cb545..585dc0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparatorFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.AlgorithmOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; @@ -123,9 +123,8 @@ public class JoinDriver<IT1, IT2, OT> implements Driver<FlatJoinFunction<IT1, IT LOG.debug("Join Driver object reuse: " + (objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); } - boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration().getBoolean( - ConfigConstants.RUNTIME_HASH_JOIN_BLOOM_FILTERS_KEY, - ConfigConstants.DEFAULT_RUNTIME_HASH_JOIN_BLOOM_FILTERS); + boolean hashJoinUseBitMaps = taskContext.getTaskManagerInfo().getConfiguration() + .getBoolean(AlgorithmOptions.HASH_JOIN_BLOOM_FILTERS); // create and return joining iterator according to provided local strategy. if (objectReuseEnabled) {