Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Hi, Roman, Sorry, I missed one question just now. > if the two configuration options are still in use, why does the FLIP propose to deprecate them? These two configs are usually used to avoid the memory issue, but after introducing the improvement, I think it is generally no longer necessary to adjust these two configurations to avoid the issue. So I propose to deprecate them in the future, when the @Experimental annotation of the newly added config is removed. Best, Yuxin Roman Khachatryan 于2022年12月28日周三 20:10写道: > Thanks for your reply Yuxin, > > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > > configurations, which are not calculated. I have described them in the > FLIP > > motivation section. > > The motivation section says about floating buffers: > > FloatingBuffersPerGate is within the range of > [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + > DefaultFloatingBuffersPerGate] ... > So my question is what value exactly in this range will it have and how and > where will it be computed? > > As for the ExclusiveBuffersPerChannel, there was a proposal in the thread > to calculate it dynamically (by linear search > from taskmanager.network.memory.buffers-per-channel down to 0). > > Also, if the two configuration options are still in use, why does the FLIP > propose to deprecate them? > > Besides that, wouldn't it be more clear to separate motivation from the > proposed changes? > > Regards, > Roman > > > On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote: > > > Hi Yuxin > > > > > > Thanks for the proposal, big + 1 for this FLIP. > > > > > > > > It is difficult for users to calculate the size of network memory. If the > > setting is too small, the task cannot be started. If the setting is too > > large, there may be a waste of resources. As far as possible, Flink > > framework can automatically set a reasonable value, but I have a small > > problem. 
network memory is not only related to the parallelism of the > task, > > but also to the complexity of the task DAG. The more complex a DAG is, > > shuffle write and shuffle read require larger buffers. How can we > determine > > how many RS and IG a DAG has? > > > > > > > > Best > > JasonLee > > > > > > Replied Message > > | From | Yuxin Tan | > > | Date | 12/28/2022 18:29 | > > | To | | > > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory > configurations > > for TaskManager | > > Hi, Roman > > > > Thanks for the replay. > > > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > > configurations, which are not calculated. I have described them in the > FLIP > > motivation section. > > > > 3. Each gate requires at least one buffer... > > The timeout exception occurs when the ExclusiveBuffersPerChannel > > can not be requested from NetworkBufferPool, which is not caused by the > > change of this Flip. In addition, if we have set the > > ExclusiveBuffersPerChannel > > to 0 when using floating buffers, which can also decrease the probability > > of > > this exception. > > > > 4. It would be great to have experimental results for jobs with different > > exchange types. > > Thanks for the suggestion. I have a test about different exchange types, > > forward > > and rescale, and the results show no differences from the all-to-all > type, > > which > > is also understandable, because the network memory usage is calculated > > with numChannels, independent of the edge type. > > > > Best, > > Yuxin > > > > > > Roman Khachatryan 于2022年12月28日周三 05:27写道: > > > > Hi everyone, > > > > Thanks for the proposal and the discussion. > > > > I couldn't find much details on how exactly the values of > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated. 
> > I guess that > > - the threshold evaluation is done on JM > > - floating buffers calculation is done on TM based on the current memory > > available; so it is not taking into account any future tasks submitted > for > > that (or other) job > > Is that correct? > > > > If so, I see the following potential issues: > > > > 1. Each (sub)task might have different values because the actual > > available memory might be different. E.g. some tasks might use exclusive > > buffers and others only floating. That could lead to significant skew > > in processing speed, and in turn to issues with checkpoints and > watermarks. > > > > 2. Re-deployment of a task (e.g. on job failure) might lead to a > completely > > different memory configuration. That, coupled with different values per > > subtask and operator, makes the performance analysis more difficult. > > > > (Regardless of whether it's done on TM or JM): > > 3. Each gate requires at least one buffer [1]. So, in case when no memory > > is available, TM will throw an Allocation timeout exception instead of > > Insufficient buffers exception immediately. A delay here (allocation > > timeout) seems like a regression. > > Besides that,
[DISCUSS] Adding an option for planner to decide which join reorder rule to choose
Hi, devs, I'd like to start a discussion about adding an option called "table.optimizer.bushy-join-reorder-threshold" for the planner while we try to introduce a new bushy join reorder rule[1] into Flink. This join reorder rule is based on dynamic programming[2], which can store all possible intermediate results, and the cost model can then be used to select the optimal join reorder result. Compared with the existing Lopt join reorder rule, the new rule explores more candidate plans, so the chosen result can be more accurate. However, the search space of this rule grows very large as the number of tables increases. So we should introduce an option to limit the expansion of the search space: if the number of tables to be reordered is less than the threshold, the new bushy join reorder rule is used; otherwise, the Lopt rule is used. The default threshold is intended to be 12. One reason is that in the TPC-DS benchmark test, when the number of tables exceeds 12, the optimization time becomes very long. The other reason is that related engines, like Spark, also recommend a setting of 12.[3] Looking forward to your feedback. [1] https://issues.apache.org/jira/browse/FLINK-30376 [2] https://courses.cs.duke.edu/compsci516/cps216/spring03/papers/selinger-etal-1979.pdf [3] https://spark.apache.org/docs/3.3.1/configuration.html#runtime-sql-configuration Best regards, Yunhong Zheng
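For illustration, the rule-selection logic described above could be sketched as follows (the class, enum, and method names are hypothetical, not the actual Flink planner API; the real integration would live in the planner's optimizer program):

```java
// Hypothetical sketch of choosing between the DP-based bushy join reorder
// rule and the existing Lopt (left-deep) rule. Names are illustrative only.
class JoinReorderRuleSelector {

    // Proposed default for table.optimizer.bushy-join-reorder-threshold.
    static final int DEFAULT_BUSHY_JOIN_REORDER_THRESHOLD = 12;

    enum Rule { BUSHY_DP, LOPT }

    // The DP rule's search space grows roughly exponentially with the number
    // of reorderable tables, so fall back to Lopt above the threshold.
    static Rule choose(int numReorderableTables, int threshold) {
        return numReorderableTables <= threshold ? Rule.BUSHY_DP : Rule.LOPT;
    }
}
```

Whether the boundary is inclusive is an implementation detail of the actual rule; the point is only that small joins get the exhaustive DP search while large ones keep the cheaper left-deep enumeration.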
Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Thanks for your question, Roman. > wouldn't it be more clear to separate motivation from the proposed changes? All the contents in the motivation part describe the current implementation of shuffle reading. The FLIP has described where ExclusiveBuffersPerChannel and FloatingBuffersPerGate come from, as follows. "ExclusiveBuffersPerChannel is determined by taskmanager.network.memory.buffers-per-channel, which is 2 by default. FloatingBuffersPerGate ranges from 1 to DefaultFloatingBuffersPerGate. DefaultFloatingBuffersPerGate is determined by taskmanager.network.memory.floating-buffers-per-gate, which is 8 by default." This part is the current implementation of network memory, not a change introduced by the FLIP. ExclusiveBuffersPerChannel is used in SingleInputGateFactory#createInputChannel, so it is the number of exclusive buffers in one channel. In one gate, the floating buffers (FloatingBuffersPerGate) are used in SingleInputGateFactory#create to create the LocalBufferPool of the gate. The buffer number of the LocalBufferPool ranges from 1 to DefaultFloatingBuffersPerGate, which can be seen in SingleInputGateFactory#createBufferPoolFactory, where NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate controls that range. One gate may contain multiple channels; we call the number of channels numChannels. So the total buffers are ExclusiveBuffersPerChannel * numChannels + FloatingBuffersPerGate, where FloatingBuffersPerGate in the LocalBufferPool ranges from 1 to DefaultFloatingBuffersPerGate. Hence "the range of TotalBuffersPerGate for each InputGate is [ExclusiveBuffersPerChannel * numChannels + 1, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate]", as described in the FLIP. All the above descriptions are the current implementation, not changes from the FLIP. 
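The per-gate accounting described above can be condensed into a small sketch (a simplification of what SingleInputGateFactory/NettyShuffleUtils compute; the constants are the documented defaults, and the class itself is illustrative, not Flink code):

```java
// Simplified sketch of the current per-gate buffer accounting. Defaults:
// taskmanager.network.memory.buffers-per-channel = 2,
// taskmanager.network.memory.floating-buffers-per-gate = 8.
class GateBufferAccounting {
    static final int EXCLUSIVE_BUFFERS_PER_CHANNEL = 2;
    static final int DEFAULT_FLOATING_BUFFERS_PER_GATE = 8;

    // Lower bound: all exclusive buffers plus at least one floating buffer.
    static int minTotalBuffersPerGate(int numChannels) {
        return EXCLUSIVE_BUFFERS_PER_CHANNEL * numChannels + 1;
    }

    // Upper bound: all exclusive buffers plus the full floating pool.
    static int maxTotalBuffersPerGate(int numChannels) {
        return EXCLUSIVE_BUFFERS_PER_CHANNEL * numChannels
                + DEFAULT_FLOATING_BUFFERS_PER_GATE;
    }
}
```

For an all-to-all edge with 1000 input channels, each gate already requires between 2001 and 2008 buffers under the defaults, which is the scale of memory the FLIP tries to cap.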
Therefore, the motivation section describes the current implementation, while the proposed change section describes the changes the FLIP introduces, which separates motivation from the proposed change. > So my question is what value exactly in this range will it have and how and where will it be computed? Based on the current implementation, a threshold is introduced, taskmanager.memory.network.read-required-buffer.max. As described above, when the number of exclusive buffers in a gate (ExclusiveBuffersPerChannel * numChannels) is greater than the threshold, all read buffers will use floating buffers, and in order to keep the number of buffers consistent with that before the change, we will modify the creation process of the LocalBufferPool. This is an implementation detail, and the change is as follows. The floating buffers in the LocalBufferPool will be in the range of [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate], which means that when calling SingleInputGateFactory#createBufferPoolFactory, the min buffers of the LocalBufferPool are numFloatingBufferThreashold and the max buffers are ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate. The exact buffer count depends on whether the buffers in the NetworkBufferPool are sufficient when using the LocalBufferPool, which can be seen in the current implementation of LocalBufferPool; the exact number of floating buffers in the LocalBufferPool is within the above range. Best, Yuxin Roman Khachatryan 于2022年12月28日周三 20:10写道: > Thanks for your reply Yuxin, > > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > > configurations, which are not calculated. I have described them in the > FLIP > > motivation section. > > The motivation section says about floating buffers: > > FloatingBuffersPerGate is within the range of > [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + > DefaultFloatingBuffersPerGate] ... 
> So my question is what value exactly in this range will it have and how and > where will it be computed? > > As for the ExclusiveBuffersPerChannel, there was a proposal in the thread > to calculate it dynamically (by linear search > from taskmanager.network.memory.buffers-per-channel down to 0). > > Also, if the two configuration options are still in use, why does the FLIP > propose to deprecate them? > > Besides that, wouldn't it be more clear to separate motivation from the > proposed changes? > > Regards, > Roman > > > On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote: > > > Hi Yuxin > > > > > > Thanks for the proposal, big + 1 for this FLIP. > > > > > > > > It is difficult for users to calculate the size of network memory. If the > > setting is too small, the task cannot be started. If the setting is too > > large, there may be a waste of resources. As far as possible, Flink > > framework can automatically set a reasonable value, but I have a small > > problem. network memory is not only related to the parallelism of the > task,
[jira] [Created] (FLINK-30532) Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm
Yunfeng Zhou created FLINK-30532: Summary: Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm Key: FLINK-30532 URL: https://issues.apache.org/jira/browse/FLINK-30532 Project: Flink Issue Type: Improvement Components: Library / Machine Learning Reporter: Yunfeng Zhou Fix For: ml-2.2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30531) Reduce operator chain call stack depth
Dong Lin created FLINK-30531: Summary: Reduce operator chain call stack depth Key: FLINK-30531 URL: https://issues.apache.org/jira/browse/FLINK-30531 Project: Flink Issue Type: Improvement Components: Runtime / Task Reporter: Dong Lin Benchmark results show that Flink's time to execute simple programs is more than 3X slower than Spark's. For example, if we run the following program with object re-use enabled and with parallelism=1, it takes roughly 120 sec on a macbook, whereas it takes Spark less than 40 sec to run the same logic on the same machine. {code:java} DataStream stream = env.fromSequence(1, 10L) .map(x -> x) .map(x -> x) .map(x -> x) .map(x -> x) .map(x -> x).addSink(new DiscardingSink<>()); {code} It turns out that the operator chain overhead introduced by Flink is surprisingly high. For the above example program, the Flink runtime goes through a call stack of 24 functions to produce 1 element. And each extra map(...) operation introduces 4 extra functions in the call stack. Here are the 24 functions in the call stack: {code:bash} StreamTask#processInput StreamOneInputProcessor#processInput StreamTaskSourceInput#emitNext SourceOperator#emitNext IteratorSourceReaderBase#pollNext SourceOutputWithWatermarks#collect AsyncDataOutputToOutput#emitRecord ChainingOutput#collect StreamMap#processElement CountingOutput#collect ChainingOutput#collect StreamMap#processElement CountingOutput#collect ChainingOutput#collect StreamMap#processElement CountingOutput#collect ChainingOutput#collect StreamMap#processElement CountingOutput#collect ChainingOutput#collect StreamMap#processElement CountingOutput#collect ChainingOutput#collect StreamSink#processElement {code} Given the evidence described above, we find the following explanations for why Flink is slow for programs with low computation overhead: * Operator chaining currently uses a pull-based loop, which has worse branch prediction than a push-based loop. * Java's maximum inline level is less than 18 [2]. It is easy for the operator chain call stack to exceed this limit and prevent Java from inlining function calls, which further increases the function call overhead. * For function calls that are not inlined, a virtual table lookup is required since most functions are virtual functions. [1] [https://arxiv.org/pdf/1610.09166.pdf] [2] [https://bugs.openjdk.org/browse/JDK-8234863] -- This message was sent by Atlassian Jira (v8.20.10#820010)
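As a rough illustration of how chaining stacks frames (not Flink's actual classes; a minimal mock of the ChainingOutput pattern under stated assumptions):

```java
// Minimal mock of operator chaining: each chained map adds one virtual
// collect() call per record, mirroring the ChainingOutput#collect ->
// StreamMap#processElement frames in the stack above. Illustration only.
interface Output {
    void collect(long value);
}

class IdentityMapOperator implements Output {
    private final Output next;
    IdentityMapOperator(Output next) { this.next = next; }
    @Override
    public void collect(long value) {
        next.collect(value); // one extra (virtual) frame per chained operator
    }
}

class CountingSink implements Output {
    long count;
    @Override
    public void collect(long value) { count++; }
}

class ChainDepthDemo {
    static long run(int chainedMaps, long records) {
        CountingSink sink = new CountingSink();
        Output head = sink;
        for (int i = 0; i < chainedMaps; i++) {
            head = new IdentityMapOperator(head);
        }
        for (long v = 0; v < records; v++) {
            head.collect(v); // traverses chainedMaps + 1 collect() frames
        }
        return sink.count;
    }
}
```

With 5 chained maps, every record traverses 6 collect() frames before reaching the sink; in real Flink each hop costs about 4 frames, which is how the 24-frame stack above arises and why deep chains defeat JIT inlining.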
[jira] [Created] (FLINK-30530) Flink configuration from user-provided ConfigMap
Arseniy Tashoyan created FLINK-30530: Summary: Flink configuration from user-provided ConfigMap Key: FLINK-30530 URL: https://issues.apache.org/jira/browse/FLINK-30530 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: 1.15.2 Environment: Flink 1.15.2 Flink Kubernetes operator 1.2.0 Reporter: Arseniy Tashoyan Currently the Flink configuration can be specified in the YAML descriptor of FlinkDeployment via the _flinkConfiguration_ setting: {code:yaml} flinkConfiguration: taskmanager.numberOfTaskSlots: "2" ... {code} Same for the logging configuration: {code:yaml} logConfiguration: "log4j-console.properties": | rootLogger.level = DEBUG ...{code} This makes the YAML descriptor overloaded and huge. In addition, the Flink and logging configuration may differ between applications, while the Kubernetes settings may be the same for all applications. Therefore it makes sense to extract the Flink and logging configurations from the YAML descriptor. This can be done via a user-provided ConfigMap: {code:yaml} flinkConfigMap: basic-example-flink-config {code} In this example we have a Flink application _basic-example_. The _basic-example-flink-config_ ConfigMap contains all config files used by Flink: flink-conf.yaml, log4j.properties, and possibly other files. Therefore we can have different Flink settings for different applications and the same YAML descriptor for all of them (only the value of flinkConfigMap differs). -- This message was sent by Atlassian Jira (v8.20.10#820010)
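For illustration, the user-provided ConfigMap referenced by the proposed flinkConfigMap setting could be shaped like this (a sketch only: the key names mirror the config files mentioned in the issue, but the exact contract would be defined by the operator):

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: basic-example-flink-config
data:
  flink-conf.yaml: |
    taskmanager.numberOfTaskSlots: "2"
  log4j-console.properties: |
    rootLogger.level = DEBUG
```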
[jira] [Created] (FLINK-30529) Use simple cancel for savepoint upgrades on failing jobs
Gyula Fora created FLINK-30529: -- Summary: Use simple cancel for savepoint upgrades on failing jobs Key: FLINK-30529 URL: https://issues.apache.org/jira/browse/FLINK-30529 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Reporter: Gyula Fora In some cases where savepoint upgrades are required, we should consider simply cancelling the job and observing the last checkpoint/savepoint available. We should think through the caveats of this approach, but it could help upgrade some clusters that would otherwise be stuck. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30528) Job may be stuck in upgrade loop when last-state fallback is disabled and deployment is missing
Gyula Fora created FLINK-30528: -- Summary: Job may be stuck in upgrade loop when last-state fallback is disabled and deployment is missing Key: FLINK-30528 URL: https://issues.apache.org/jira/browse/FLINK-30528 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.0 Reporter: Gyula Fora When last-state upgrade fallback is disabled (or we are switching Flink versions) and the JM deployment is missing for some reason, we get stuck in the upgrade loop, as the spec change/upgrade logic always takes precedence over the JM deployment recovery logic. In such cases the JM deployment needs to be recovered first so the savepoint upgrade can be executed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30527) Last-state suspend followed by flinkVersion change may lead to state loss
Gyula Fora created FLINK-30527: -- Summary: Last-state suspend followed by flinkVersion change may lead to state loss Key: FLINK-30527 URL: https://issues.apache.org/jira/browse/FLINK-30527 Project: Flink Issue Type: Bug Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.3.0, kubernetes-operator-1.2.0 Reporter: Gyula Fora We do not check flink version changes on recovery, so if the user suspended the job using last-state mode (or that was the only mode available) and then subsequently the flinkVersion is changed to a non-HA compatible version, the job would be restored using last-state and state would be lost. In these cases we should set an error in the Flink resource instructing the user that changing version is not allowed at that point. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30526) Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated
Martijn Visser created FLINK-30526: -- Summary: Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated Key: FLINK-30526 URL: https://issues.apache.org/jira/browse/FLINK-30526 Project: Flink Issue Type: Bug Components: Connectors / Opensearch Reporter: Martijn Visser {quote} Hi everyone, I have a streaming application that has an Elasticsearch sink. I upgraded the Flink version from 1.11 to 1.16 and also moved from ES 7 to OpenSearch 2.0, and now I'm facing some deprecation issues; hope you can help me. In the previous version I created an ElasticsearchSink and added a failure handler, which protected the sink from failing on some exceptions. final ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> { if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) { indexer.add(action); } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) { log.warn("Got malformed document , action {}", action); // malformed document; simply drop elasticsearchSinkFunction without failing sink } else if (failure instanceof IOException && failure.getCause() instanceof NullPointerException && failure.getMessage().contains("Unable to parse response body")) { //issue with ES 7 and opensearch - that does not send type - while response is waiting for it //at org.elasticsearch.action.DocWriteResponse.(DocWriteResponse.java:127) -- this.type = Objects.requireNonNull(type); log.debug("known issue format the response for ES 7.5.1 and DB OS (opensearch) :{}", failure.getMessage()); } else { // for all other failures, log and don't fail the sink log.error("Got error while trying to perform ES action {}", action, failure); } }; final ElasticsearchSink.Builder builder = new ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction); In the new version the class ActionRequestFailureHandler is deprecated and after investigation I can't find any way to handle failures. 
For all failures the sink fails. Is there anything I didn't see? Thanks in advance! {quote} From the Apache Flink Slack channel: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1672122873318899 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-30525) Cannot open jobmanager configuration web page
Weihua Hu created FLINK-30525: - Summary: Cannot open jobmanager configuration web page Key: FLINK-30525 URL: https://issues.apache.org/jira/browse/FLINK-30525 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.17.0 Reporter: Weihua Hu Attachments: image-2022-12-28-20-37-00-825.png, image-2022-12-28-20-37-05-551.png We removed the environment information from the REST API in https://issues.apache.org/jira/browse/FLINK-30116. The jobmanager configuration web page now throws "TypeError: Cannot read properties of undefined (reading 'length')". The environment section of the jobmanager configuration web page should be deleted too. !image-2022-12-28-20-37-00-825.png! !image-2022-12-28-20-37-05-551.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Thanks for your reply Yuxin, > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > configurations, which are not calculated. I have described them in the FLIP > motivation section. The motivation section says about floating buffers: > FloatingBuffersPerGate is within the range of [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate] ... So my question is what value exactly in this range will it have and how and where will it be computed? As for the ExclusiveBuffersPerChannel, there was a proposal in the thread to calculate it dynamically (by linear search from taskmanager.network.memory.buffers-per-channel down to 0). Also, if the two configuration options are still in use, why does the FLIP propose to deprecate them? Besides that, wouldn't it be more clear to separate motivation from the proposed changes? Regards, Roman On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote: > Hi Yuxin > > > Thanks for the proposal, big + 1 for this FLIP. > > > > It is difficult for users to calculate the size of network memory. If the > setting is too small, the task cannot be started. If the setting is too > large, there may be a waste of resources. As far as possible, Flink > framework can automatically set a reasonable value, but I have a small > problem. network memory is not only related to the parallelism of the task, > but also to the complexity of the task DAG. The more complex a DAG is, > shuffle write and shuffle read require larger buffers. How can we determine > how many RS and IG a DAG has? > > > > Best > JasonLee > > > Replied Message > | From | Yuxin Tan | > | Date | 12/28/2022 18:29 | > | To | | > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations > for TaskManager | > Hi, Roman > > Thanks for the replay. > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > configurations, which are not calculated. 
I have described them in the FLIP > motivation section. > > 3. Each gate requires at least one buffer... > The timeout exception occurs when the ExclusiveBuffersPerChannel > can not be requested from NetworkBufferPool, which is not caused by the > change of this Flip. In addition, if we have set the > ExclusiveBuffersPerChannel > to 0 when using floating buffers, which can also decrease the probability > of > this exception. > > 4. It would be great to have experimental results for jobs with different > exchange types. > Thanks for the suggestion. I have a test about different exchange types, > forward > and rescale, and the results show no differences from the all-to-all type, > which > is also understandable, because the network memory usage is calculated > with numChannels, independent of the edge type. > > Best, > Yuxin > > > Roman Khachatryan 于2022年12月28日周三 05:27写道: > > Hi everyone, > > Thanks for the proposal and the discussion. > > I couldn't find much details on how exactly the values of > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated. > I guess that > - the threshold evaluation is done on JM > - floating buffers calculation is done on TM based on the current memory > available; so it is not taking into account any future tasks submitted for > that (or other) job > Is that correct? > > If so, I see the following potential issues: > > 1. Each (sub)task might have different values because the actual > available memory might be different. E.g. some tasks might use exclusive > buffers and others only floating. That could lead to significant skew > in processing speed, and in turn to issues with checkpoints and watermarks. > > 2. Re-deployment of a task (e.g. on job failure) might lead to a completely > different memory configuration. That, coupled with different values per > subtask and operator, makes the performance analysis more difficult. > > (Regardless of whether it's done on TM or JM): > 3. 
Each gate requires at least one buffer [1]. So, in case when no memory > is available, TM will throw an Allocation timeout exception instead of > Insufficient buffers exception immediately. A delay here (allocation > timeout) seems like a regression. > Besides that, the regression depends on how much memory is actually > available and how much it is contended, doesn't it? > Should there still be a lower threshold of available memory, below which > the job (task) isn't accepted? > 4. The same threshold for all types of shuffles will likely result in using > exclusive buffers > for point-wise connections and floating buffers for all-to-all ones. I'm > not sure if that's always optimal. It would be great to have experimental > results for jobs with different exchange types, WDYT? > > [1] > https://issues.apache.org/jira/browse/FLINK-24035 > > Regards, > Roman > > > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan wrote: > > Hi, Weihua > > Thanks for your suggestions. > > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the >
Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Hi, JasonLee Thanks for the feedback. > How can we determine how many RS and IG a DAG has? It is correct that network memory is related to both the parallelism and the complexity of the task DAG. However, this FLIP only addresses part of the overall issue, mainly focusing on the memory optimization of shuffle reading. We can only limit the total read buffers in one InputGate, but cannot determine the number of RS and IG. The good news is that this is an independent problem; maybe we could try to optimize and solve it later. Best, Yuxin JasonLee <17610775...@163.com> 于2022年12月28日周三 19:18写道: > Hi Yuxin > > > Thanks for the proposal, big + 1 for this FLIP. > > > > It is difficult for users to calculate the size of network memory. If the > setting is too small, the task cannot be started. If the setting is too > large, there may be a waste of resources. As far as possible, Flink > framework can automatically set a reasonable value, but I have a small > problem. network memory is not only related to the parallelism of the task, > but also to the complexity of the task DAG. The more complex a DAG is, > shuffle write and shuffle read require larger buffers. How can we determine > how many RS and IG a DAG has? > > > > Best > JasonLee > > > Replied Message > | From | Yuxin Tan | > | Date | 12/28/2022 18:29 | > | To | | > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations > for TaskManager | > Hi, Roman > > Thanks for the reply. > > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from > configurations, which are not calculated. I have described them in the FLIP > motivation section. > > 3. Each gate requires at least one buffer... > The timeout exception occurs when the ExclusiveBuffersPerChannel > cannot be requested from NetworkBufferPool, which is not caused by the > change of this Flip. 
In addition, if we have set the > ExclusiveBuffersPerChannel > to 0 when using floating buffers, which can also decrease the probability > of > this exception. > > 4. It would be great to have experimental results for jobs with different > exchange types. > Thanks for the suggestion. I have a test about different exchange types, > forward > and rescale, and the results show no differences from the all-to-all type, > which > is also understandable, because the network memory usage is calculated > with numChannels, independent of the edge type. > > Best, > Yuxin > > > Roman Khachatryan 于2022年12月28日周三 05:27写道: > > Hi everyone, > > Thanks for the proposal and the discussion. > > I couldn't find much details on how exactly the values of > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated. > I guess that > - the threshold evaluation is done on JM > - floating buffers calculation is done on TM based on the current memory > available; so it is not taking into account any future tasks submitted for > that (or other) job > Is that correct? > > If so, I see the following potential issues: > > 1. Each (sub)task might have different values because the actual > available memory might be different. E.g. some tasks might use exclusive > buffers and others only floating. That could lead to significant skew > in processing speed, and in turn to issues with checkpoints and watermarks. > > 2. Re-deployment of a task (e.g. on job failure) might lead to a completely > different memory configuration. That, coupled with different values per > subtask and operator, makes the performance analysis more difficult. > > (Regardless of whether it's done on TM or JM): > 3. Each gate requires at least one buffer [1]. So, in case when no memory > is available, TM will throw an Allocation timeout exception instead of > Insufficient buffers exception immediately. A delay here (allocation > timeout) seems like a regression. 
> Besides that, the regression depends on how much memory is actually > available and how much it is contended, doesn't it? > Should there still be a lower threshold of available memory, below which > the job (task) isn't accepted? > 4. The same threshold for all types of shuffles will likely result in using > exclusive buffers > for point-wise connections and floating buffers for all-to-all ones. I'm > not sure if that's always optimal. It would be great to have experimental > results for jobs with different exchange types, WDYT? > > [1] > https://issues.apache.org/jira/browse/FLINK-24035 > > Regards, > Roman > > > On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan wrote: > > Hi, Weihua > > Thanks for your suggestions. > > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the > total buffer is not enough? > > I think it's a good idea. Will try and check the results in PoC. Before > all > read buffers use floating buffers, I will try to use > (ExclusiveBuffersPerChannel - i) > buffers per channel first. For example, if the user has configured > ExclusiveBuffersPerChannel to 4, it will check whether all read buffers > are sufficient from
[jira] [Created] (FLINK-30524) Unify the style of the logs page
Yongming Zhang created FLINK-30524:
-----------------------------------

Summary: Unify the style of the logs page
Key: FLINK-30524
URL: https://issues.apache.org/jira/browse/FLINK-30524
Project: Flink
Issue Type: Improvement
Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Yongming Zhang
Fix For: 1.17.0

There are certain differences in the layout of the "Logs/Stdout" and "Log List" pages.

!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672225951771-e7d1c4b8-d02d-495b-a390-2fd0a47cc122.png|width=1160,id=ufb596fa9!

!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672227240081-94627c11-852c-40a2-b1de-853a3a61c050.png|width=1153,id=u22bf640f!

I want to unify the style of the logs pages so that users can view the log in full screen on the Logs/Stdout page.

!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672226099286-f7ea36e3-2c79-43dc-b2ac-ebb400482038.png|width=1146,id=u079e5309!

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Created] (FLINK-30523) Refine benchmark of vectorAssembler
weibo zhao created FLINK-30523:
-------------------------------

Summary: Refine benchmark of vectorAssembler
Key: FLINK-30523
URL: https://issues.apache.org/jira/browse/FLINK-30523
Project: Flink
Issue Type: Improvement
Components: Library / Machine Learning
Reporter: weibo zhao

Refine benchmark of vectorAssembler.
Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Hi Yuxin

Thanks for the proposal, big +1 for this FLIP.

It is difficult for users to calculate the size of network memory. If the setting is too small, the task cannot be started. If the setting is too large, resources may be wasted. The Flink framework should automatically set a reasonable value as far as possible, but I have a small question: network memory is related not only to the parallelism of the task, but also to the complexity of the task DAG. The more complex the DAG, the larger the buffers required for shuffle write and shuffle read. How can we determine how many RS and IG a DAG has?

Best
JasonLee

Replied Message
| From | Yuxin Tan |
| Date | 12/28/2022 18:29 |
| To | |
| Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager |

Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from configurations, which are not calculated. I have described them in the FLIP motivation section.

> 3. Each gate requires at least one buffer...

The timeout exception occurs when the ExclusiveBuffersPerChannel cannot be requested from the NetworkBufferPool, which is not caused by the changes of this FLIP. In addition, setting ExclusiveBuffersPerChannel to 0 when using floating buffers can also decrease the probability of this exception.

> 4. It would be great to have experimental results for jobs with different exchange types.

Thanks for the suggestion. I ran a test with different exchange types, forward and rescale, and the results show no differences from the all-to-all type, which is understandable because the network memory usage is calculated from numChannels, independent of the edge type.

Best,
Yuxin

Roman Khachatryan wrote on Wed, Dec 28, 2022 at 05:27:

Hi everyone,

Thanks for the proposal and the discussion.

I couldn't find many details on how exactly the values of ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
I guess that
- the threshold evaluation is done on JM
- floating buffers calculation is done on TM based on the current memory available; so it is not taking into account any future tasks submitted for that (or other) job
Is that correct?

If so, I see the following potential issues:

1. Each (sub)task might have different values because the actual available memory might be different. E.g. some tasks might use exclusive buffers and others only floating. That could lead to significant skew in processing speed, and in turn to issues with checkpoints and watermarks.

2. Re-deployment of a task (e.g. on job failure) might lead to a completely different memory configuration. That, coupled with different values per subtask and operator, makes the performance analysis more difficult.

(Regardless of whether it's done on TM or JM):

3. Each gate requires at least one buffer [1]. So, in case no memory is available, TM will throw an Allocation timeout exception instead of an Insufficient buffers exception immediately. A delay here (allocation timeout) seems like a regression.

4. The same threshold for all types of shuffles will likely result in using exclusive buffers for point-wise connections and floating buffers for all-to-all ones. I'm not sure if that's always optimal. It would be great to have experimental results for jobs with different exchange types, WDYT?

[1] https://issues.apache.org/jira/browse/FLINK-24035

Regards,
Roman

On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan wrote:

Hi, Weihua

Thanks for your suggestions.

> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the total buffer is not enough?

I think it's a good idea. Will try and check the results in a PoC.
Before all read buffers use floating buffers, I will try to use (ExclusiveBuffersPerChannel - i) buffers per channel first. For example, if the user has configured ExclusiveBuffersPerChannel to 4, it will check whether all read buffers are sufficient from 4 down to 1. Only when ExclusiveBuffersPerChannel of all channels is 1 and the read buffers are still insufficient will all read buffers use floating buffers. If the test results prove better, the FLIP will use this method.

> 2. Do we really need to change the default value of 'taskmanager.memory.network.max'?

Changing taskmanager.memory.network.max will indeed affect some users, but a user is only affected when all three of the following conditions are fulfilled:
1) The total Flink TM memory is larger than 10g (because the network memory ratio is 0.1).
2) taskmanager.memory.network.max was not explicitly configured.
3) Other memory, such as managed memory or heap memory, is insufficient.
I think the number of jobs fulfilling these conditions is small, because when a TM uses such a large amount of memory, the network memory requirement may also be large.
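The step-down search described above could be sketched roughly as follows. This is a hedged illustration only: `chooseExclusiveBuffersPerChannel` and its parameters are made-up names for this sketch, not Flink internals or API.

```java
// Hypothetical sketch of the proposed fallback: try the configured number of
// exclusive buffers per channel first, then step down to 1 before falling
// back to floating buffers only. All names are illustrative.
public class BufferFallbackSketch {

    /** Returns the exclusive buffers per channel to use, or 0 for floating-only. */
    static int chooseExclusiveBuffersPerChannel(
            int configuredPerChannel, int numChannels, int availableBuffers) {
        // Linear search from the configured value down to 1.
        for (int perChannel = configuredPerChannel; perChannel >= 1; perChannel--) {
            if ((long) perChannel * numChannels <= availableBuffers) {
                return perChannel;
            }
        }
        // Not enough memory even for 1 exclusive buffer per channel:
        // all read buffers fall back to floating buffers.
        return 0;
    }

    public static void main(String[] args) {
        // e.g. 4 configured, 100 channels, only 250 buffers available -> 2 per channel
        System.out.println(chooseExclusiveBuffersPerChannel(4, 100, 250)); // prints 2
    }
}
```

The sketch mirrors the proposal: floating-only mode is reached only after even one buffer per channel no longer fits.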
[jira] [Created] (FLINK-30522) `SHOW TBLPROPERTIES` can't read properties of table in Spark3
yuzelin created FLINK-30522:
----------------------------

Summary: `SHOW TBLPROPERTIES` can't read properties of table in Spark3
Key: FLINK-30522
URL: https://issues.apache.org/jira/browse/FLINK-30522
Project: Flink
Issue Type: Bug
Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin
Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager
Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from configurations, which are not calculated. I have described them in the FLIP motivation section.

> 3. Each gate requires at least one buffer...

The timeout exception occurs when the ExclusiveBuffersPerChannel cannot be requested from the NetworkBufferPool, which is not caused by the changes of this FLIP. In addition, setting ExclusiveBuffersPerChannel to 0 when using floating buffers can also decrease the probability of this exception.

> 4. It would be great to have experimental results for jobs with different exchange types.

Thanks for the suggestion. I ran a test with different exchange types, forward and rescale, and the results show no differences from the all-to-all type, which is understandable because the network memory usage is calculated from numChannels, independent of the edge type.

Best,
Yuxin

Roman Khachatryan wrote on Wed, Dec 28, 2022 at 05:27:

> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find many details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration.
> That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
>
> 3. Each gate requires at least one buffer [1]. So, in case no memory
> is available, TM will throw an Allocation timeout exception instead of
> an Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how contended it is, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
>
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers for point-wise connections and floating buffers for
> all-to-all ones. I'm not sure if that's always optimal. It would be great
> to have experimental results for jobs with different exchange types, WDYT?
>
> [1] https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan wrote:
>
> > Hi, Weihua
> >
> > Thanks for your suggestions.
> >
> > > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> > > total buffer is not enough?
> >
> > I think it's a good idea. Will try and check the results in a PoC. Before
> > all read buffers use floating buffers, I will try to use
> > (ExclusiveBuffersPerChannel - i) buffers per channel first. For example,
> > if the user has configured ExclusiveBuffersPerChannel to 4, it will check
> > whether all read buffers are sufficient from 4 down to 1. Only when
> > ExclusiveBuffersPerChannel of all channels is 1 and the read buffers are
> > still insufficient will all read buffers use floating buffers.
> > If the test results prove better, the FLIP will use this method.
> >
> > > 2. Do we really need to change the default value of
> > > 'taskmanager.memory.network.max'?
> >
> > Changing taskmanager.memory.network.max will indeed affect some
> > users, but a user is only affected when all three conditions are fulfilled:
> > 1) The total Flink TM memory is larger than 10g (because the network
> > memory ratio is 0.1).
> > 2) taskmanager.memory.network.max was not explicitly configured.
> > 3) Other memory, such as managed memory or heap memory, is insufficient.
> > I think the number of jobs fulfilling these conditions is small, because
> > when a TM uses such a large amount of memory, the network memory
> > requirement may also be large. And when encountering the issue, the
> > rollback method is very simple: configuring taskmanager.memory.network.max
> > as 1g or another value.
> > In addition, the reason for modifying the default value is to simplify the
> > network configurations in most scenarios. This change does affect a few
> > usage scenarios, but we should admit that setting the default to any value
> > may not meet the requirements of all scenarios.
> >
> > Best,
> > Yuxin
> >
> >
> > Weihua Hu wrote on Mon, Dec 26, 2022 at 20:35:
> >
> > > Hi Yuxin,
> > > Thanks for the proposal.
> > >
> > > "Insufficient
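The ratio-based sizing behind condition 1) can be sketched as follows. This is a simplified model for intuition, not the actual TaskExecutor memory calculation; the 0.1 fraction and the 1g cap are the values mentioned in this thread, and the method name is made up.

```java
// Simplified model: network memory = clamp(total * fraction, min, max).
// With fraction 0.1 and max 1 GiB, only TMs above 10 GiB total memory hit
// the cap, which is why the change only affects TMs larger than 10g.
public class NetworkMemorySketch {

    static long deriveNetworkBytes(
            long totalBytes, double fraction, long minBytes, long maxBytes) {
        long derived = (long) (totalBytes * fraction);
        return Math.max(minBytes, Math.min(maxBytes, derived));
    }

    public static void main(String[] args) {
        long gib = 1L << 30;
        // 16 GiB total * 0.1 = 1.6 GiB, clamped to the 1 GiB max -> capped.
        System.out.println(deriveNetworkBytes(16 * gib, 0.1, 64L << 20, gib) == gib);
    }
}
```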
[jira] [Created] (FLINK-30521) Improve `Altering Tables` of Doc
yuzelin created FLINK-30521:
----------------------------

Summary: Improve `Altering Tables` of Doc
Key: FLINK-30521
URL: https://issues.apache.org/jira/browse/FLINK-30521
Project: Flink
Issue Type: Improvement
Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin

Add more syntax description in the section `Altering Tables`.
Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
Thanks for all your feedback!

To @Yuxia,

> What is the sink expected to do to isolate data produced by speculative
> executions? IIUC, if the task fails over, it also generates a new attempt.
> Does it make a difference in isolating data produced?

Yes, there is something different from the task failover scenario. The attempt number is more necessary for speculative execution than for failover, because in the failover scenario only one subtask instance can be running at the same time.

Let's take FileSystemOutputFormat as an example. For the failover scenario, the temporary directory to store produced data can be something like "$root_dir/task-$taskNumber/". At the initialization phase, the subtask deletes and re-creates the temporary directory. However, in the speculative execution scenario this does not work, because there might be several subtasks running at the same time. These subtasks might delete, re-create and write the same temporary directory at the same time. The correct temporary directory should be like "$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to expose the attempt number to the Sink implementation to do the data isolation.

To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?

Actually, yes.

> Should we/users do it in the committer? If yes, how does the committer know
> which one is the right subtask attempt?

Yes, we/users should do it in the committer. In the current design, the Committer of Sink V2 should get the "which one is the right subtask attempt" information from the "committable data" produced by the SinkWriter. Let's take the FileSink as an example: the "committable data" sent to the Committer contains the full path of the files produced by the SinkWriter. Users could also pass the attempt number through the "committable data" from SinkWriter to Committer.
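The directory layout described above can be sketched as follows. The class and method names are illustrative only, not the actual FileSystemOutputFormat code:

```java
// Why the attempt number matters: with speculative execution, several
// attempts of the same subtask may run concurrently, so each attempt needs
// its own staging directory instead of a shared task-level one.
public class StagingPathSketch {

    // Failover-only world: one attempt at a time, a task-level dir is enough.
    static String taskStagingDir(String rootDir, int taskNumber) {
        return rootDir + "/task-" + taskNumber;
    }

    // Speculative execution: concurrent attempts must not share a directory.
    static String attemptStagingDir(String rootDir, int taskNumber, int attemptNumber) {
        return taskStagingDir(rootDir, taskNumber) + "/attempt-" + attemptNumber;
    }

    public static void main(String[] args) {
        // Two concurrent attempts of subtask 3 write to disjoint directories.
        System.out.println(attemptStagingDir("/tmp/out", 3, 0)); // /tmp/out/task-3/attempt-0
        System.out.println(attemptStagingDir("/tmp/out", 3, 1)); // /tmp/out/task-3/attempt-1
    }
}
```

Once the JM decides which attempt "wins", only that attempt's directory is committed; the others can be deleted wholesale.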
In the "Rejected Alternatives -> Introduce a way to clean leaked data of Sink V2" section of the FLIP document, we discussed some of the reasons why we didn't provide an API like OutputFormat's.

To @Jing Zhang,

> I have a question about this: Speculative execution of Committer will be
> disabled.
> I agree with your point and I saw similar requirements to disable
> speculative execution for specified operators. However the requirement is
> not supported currently. I think there should be some place to describe how
> to support it.

In this FLIP design, the speculative execution of the Committer of Sink V2 will be disabled by Flink. It's not an optional operation; users can not change it. And as you said, "disable speculative execution for specified operators" is not supported in this FLIP, because it's a bit out of scope for "Sink Supports Speculative Execution For Batch Job". I think it's better to start another FLIP to discuss it. "Fine-grained control of enabling speculative execution for operators" could be the title of that FLIP, and we could discuss there how to enable or disable speculative execution for specified operators, including the Committer and pre/post-committer of Sink V2. What do you think?

Thanks,
Biao /'bɪ.aʊ/

On Wed, 28 Dec 2022 at 11:30, Jing Zhang wrote:

> Hi Biao,
>
> Thanks for driving this FLIP. It's meaningful to support speculative
> execution of sinks.
>
> I have a question about this: Speculative execution of Committer will be
> disabled.
>
> I agree with your point and I saw the similar requirements to disable
> speculative execution for specified operators.
>
> However the requirement is not supported currently. I think there should be
> some place to describe how to support it.
>
> Best,
> Jing Zhang
>
> Lijie Wang wrote on Tue, Dec 27, 2022 at 18:51:
>
> > Hi Biao,
> >
> > Thanks for driving this FLIP.
> > In this FLIP, it introduces "int getFinishedAttempt(int subtaskIndex)" for
> > OutputFormat to know which subtask attempt is the one marked as finished by
> > JM and commit the right data.
> > I have a question about this: does SinkV2 need to do the same thing? Should
> > we/users do it in the committer? If yes, how does the committer know which
> > one is the right subtask attempt?
> >
> > Best,
> > Lijie
> >
> > yuxia wrote on Tue, Dec 27, 2022 at 10:01:
> >
> > > Hi, Biao.
> > > Thanks for driving this FLIP.
> > > After a quick look at this FLIP, I have a question about "expose the
> > > attempt number which can be used to isolate data produced by speculative
> > > executions with the same subtask id".
> > > What is the sink expected to do to isolate data produced by speculative
> > > executions? IIUC, if the task fails over, it also generates a new attempt.
> > > Does it make a difference in isolating data produced?
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > ----- Original Message -----
> > > From: "Biao Liu"
> > > To: "dev"
> > > Sent: Thursday, Dec 22, 2022, 8:16:21 PM
> > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job
> > >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about making Sink support speculative
[jira] [Created] (FLINK-30520) Arguments containing '#' are wrongly split in loadYAMLResource
tanjialiang created FLINK-30520:
--------------------------------

Summary: Arguments containing '#' are wrongly split in loadYAMLResource
Key: FLINK-30520
URL: https://issues.apache.org/jira/browse/FLINK-30520
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.15.3, 1.14.6, 1.16.0
Reporter: tanjialiang

When I submit a Flink jar job in Kubernetes Application mode whose main args contain '#', the args are wrongly split by org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

For example, using flink-kubernetes-operator to submit a job in Kubernetes Application mode:

{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  serviceAccount: flink
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    args:
      - --output
      - /tmp/1#.txt
    parallelism: 2
    upgradeMode: stateless
{code}

The value is wrongly split when loading flink-conf.yaml:

!image-2022-12-28-16-30-30-645.png!

And when I entered the jobmanager's pod, I saw that the program-args in flink-conf.yaml are right:

!image-2022-12-28-16-34-25-819.png!

Maybe we should validate yaml comments more strictly?
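A minimal reproduction of the failure mode described above. This is illustrative only and not the actual loadYAMLResource implementation: if comment handling simply truncates a line at the first '#', any value containing '#' is corrupted.

```java
// Naive YAML comment stripping: everything after the first '#' is dropped,
// which corrupts legitimate values such as the argument "/tmp/1#.txt".
public class YamlCommentSketch {

    static String naiveStripComment(String line) {
        int hash = line.indexOf('#');
        return (hash >= 0 ? line.substring(0, hash) : line).trim();
    }

    public static void main(String[] args) {
        // The program argument "/tmp/1#.txt" is silently truncated to "/tmp/1".
        System.out.println(naiveStripComment("program-args: --output /tmp/1#.txt"));
    }
}
```

A stricter parser would only treat '#' as a comment start outside quoted scalars (or when preceded by whitespace, as the YAML spec requires), which is the direction the "validate more strictly" suggestion points to.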
[jira] [Created] (FLINK-30519) Add e2e tests for operator dynamic config
Gyula Fora created FLINK-30519:
-------------------------------

Summary: Add e2e tests for operator dynamic config
Key: FLINK-30519
URL: https://issues.apache.org/jira/browse/FLINK-30519
Project: Flink
Issue Type: New Feature
Components: Kubernetes Operator
Reporter: Gyula Fora
Fix For: kubernetes-operator-1.4.0

The dynamic config feature is currently not covered by e2e tests and is subject to accidental regressions, as shown in:
https://issues.apache.org/jira/browse/FLINK-30329

We should add an e2e test that covers this.
[jira] [Created] (FLINK-30518) [flink-operator] Kubernetes HA not working due to wrong jobmanager.rpc.address
Binh-Nguyen Tran created FLINK-30518:
-------------------------------------

Summary: [flink-operator] Kubernetes HA not working due to wrong jobmanager.rpc.address
Key: FLINK-30518
URL: https://issues.apache.org/jira/browse/FLINK-30518
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Environment:
```
# flinkdeployment.yaml
spec:
  flinkConfiguration:
    high-availability: kubernetes
    high-availability.storageDir: "file:///opt/flink/storage"
  ...
  jobManager:
    replicas: 3
```
Reporter: Binh-Nguyen Tran
Attachments: flink-configmap.png

Since flink-conf.yaml is mounted as a read-only configmap, the /docker-entrypoint.sh script is not able to inject the correct Pod IP into `jobmanager.rpc.address`. This leads to the same address (e.g. flink.ns-ext) being set for all Job Manager pods. This causes:

(1) flink-cluster-config-map always contains the same address for all 3 component leaders (see screenshot)

(2) Accessing the Web UI when jobmanager.replicas > 1 is not possible, with the error:
```
{"errors":["Service temporarily unavailable due to an ongoing leader election. Please refresh."]}
```
[jira] [Created] (FLINK-30517) Decrease log output interval while waiting for YARN JobManager to be allocated
Weihua Hu created FLINK-30517:
------------------------------

Summary: Decrease log output interval while waiting for YARN JobManager to be allocated
Key: FLINK-30517
URL: https://issues.apache.org/jira/browse/FLINK-30517
Project: Flink
Issue Type: Improvement
Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Weihua Hu
Attachments: image-2022-12-28-15-56-56-045.png

The Flink client retrieves the application status every 250 ms after submitting to YARN. If the JobManager does not start within 60 seconds, it logs "Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster" every 250 ms, which leads to too many log lines.

We can keep the check interval at 250 ms but log the message only every 1 minute.
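The proposed behavior could be sketched as follows (hypothetical names, not the actual YARN deployment client code):

```java
// Poll every 250 ms, but emit the "deployment took more than 60 seconds"
// message at most once per minute instead of on every poll.
public class LogThrottleSketch {
    static final long CHECK_INTERVAL_MS = 250;
    static final long LOG_INTERVAL_MS = 60_000;

    /** Should the waiting message be logged on the given poll (0-based)? */
    static boolean shouldLogAt(long pollIndex) {
        long pollsPerLog = LOG_INTERVAL_MS / CHECK_INTERVAL_MS; // 240 polls = 1 minute
        return pollIndex > 0 && pollIndex % pollsPerLog == 0;
    }

    public static void main(String[] args) {
        // 4 minutes of polling (960 polls) now produces 3 messages
        // (at the 1, 2 and 3 minute marks) instead of ~960.
        int logged = 0;
        for (long poll = 0; poll < 960; poll++) {
            if (shouldLogAt(poll)) {
                logged++;
            }
        }
        System.out.println(logged); // prints 3
    }
}
```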