Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread Yuxin Tan
Hi, Roman

Sorry, I missed one question just now.

>  if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
These two configs are usually used to work around the memory issue, but
after introducing the improvement, I think it will generally no longer be
necessary to adjust these two configurations to avoid the issue. So
I propose to deprecate them in the future, when the @Experimental
annotation of the newly added config is removed.

Best,
Yuxin


On Wed, Dec 28, 2022 at 20:10, Roman Khachatryan wrote:

> Thanks for your reply Yuxin,
>
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
>
> The motivation section says about floating buffers:
> > FloatingBuffersPerGate is within the range of
> [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
> DefaultFloatingBuffersPerGate] ...
> So my question is what value exactly in this range will it have and how and
> where will it be computed?
>
> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
> to calculate it dynamically (by linear search
> from taskmanager.network.memory.buffers-per-channel down to 0).
>
> Also, if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
>
> Besides that, wouldn't it be more clear to separate motivation from the
> proposed changes?
>
> Regards,
> Roman
>
>
> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>
> > Hi Yuxin
> >
> >
> > Thanks for the proposal, big + 1 for this FLIP.
> >
> >
> >
> > It is difficult for users to calculate the size of network memory. If the
> > setting is too small, the task cannot be started. If the setting is too
> > large, there may be a waste of resources. As far as possible, Flink
> > framework can automatically set a reasonable value, but I have a small
> > problem. network memory is not only related to the parallelism of the
> task,
> > but also to the complexity of the task DAG. The more complex a DAG is,
> > shuffle write and shuffle read require larger buffers. How can we
> determine
> > how many RS and IG a DAG has?
> >
> >
> >
> > Best
> > JasonLee
> >
> >
> >  Replied Message 
> > | From | Yuxin Tan |
> > | Date | 12/28/2022 18:29 |
> > | To |  |
> > | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory
> configurations
> > for TaskManager |
> > Hi, Roman
> >
> > Thanks for the replay.
> >
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
> >
> > 3. Each gate requires at least one buffer...
> > The timeout exception occurs when the ExclusiveBuffersPerChannel
> > can not be requested from NetworkBufferPool, which is not caused by the
> > change of this Flip. In addition, if  we have set the
> > ExclusiveBuffersPerChannel
> > to 0 when using floating buffers, which can also decrease the probability
> > of
> > this exception.
> >
> > 4. It would be great to have experimental results for jobs with different
> > exchange types.
> > Thanks for the suggestion. I have a test about different exchange types,
> > forward
> > and rescale, and the results show no differences from the all-to-all
> type,
> > which
> > is also understandable, because the network memory usage is calculated
> > with numChannels, independent of the edge type.
> >
> > Best,
> > Yuxin
> >
> >
> > On Wed, Dec 28, 2022 at 05:27, Roman Khachatryan wrote:
> >
> > Hi everyone,
> >
> > Thanks for the proposal and the discussion.
> >
> > I couldn't find much details on how exactly the values of
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> > I guess that
> > - the threshold evaluation is done on JM
> > - floating buffers calculation is done on TM based on the current memory
> > available; so it is not taking into account any future tasks submitted
> for
> > that (or other) job
> > Is that correct?
> >
> > If so, I see the following potential issues:
> >
> > 1. Each (sub)task might have different values because the actual
> > available memory might be different. E.g. some tasks might use exclusive
> > buffers and others only floating. That could lead to significant skew
> > in processing speed, and in turn to issues with checkpoints and
> watermarks.
> >
> > 2. Re-deployment of a task (e.g. on job failure) might lead to a
> completely
> > different memory configuration. That, coupled with different values per
> > subtask and operator, makes the performance analysis more difficult.
> >
> > (Regardless of whether it's done on TM or JM):
> > 3. Each gate requires at least one buffer [1]. So, in case when no memory
> > is available, TM will throw an Allocation timeout exception instead of
> > Insufficient buffers exception immediately. A delay here (allocation
> > timeout) seems like a regression.
> > Besides that, 

[DISCUSS] Adding a option for planner to decide which join reorder rule to choose

2022-12-28 Thread yh z
Hi, devs,

I'd like to start a discussion about adding an option called
"table.optimizer.bushy-join-reorder-threshold" for the planner while we try
to introduce a new bushy join reorder rule [1] into Flink.

This join reorder rule is based on dynamic programming [2], which stores
all possible intermediate results, and a cost model is used to select the
optimal join order. Compared with the existing Lopt join reorder rule, the
new rule explores more candidate plans and its result can be more accurate.
However, the search space of this rule grows very quickly as the number of
tables increases. So we should introduce an option to limit the expansion of
the search space: if the number of tables to be reordered is less than the
threshold, the new bushy join reorder rule is used (see the sketch below);
otherwise, the Lopt rule is used.
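
To make the proposed switch concrete, here is a minimal, hypothetical Java sketch of
the threshold check. The option default, class and rule names only illustrate the idea
above; this is not the actual planner code.

{code:java}
// Hypothetical sketch only: rule names mirror the proposal, not the real planner rules.
public final class JoinReorderRuleSelector {

    /** Assumed default, taken from the proposal above. */
    static final int DEFAULT_BUSHY_JOIN_REORDER_THRESHOLD = 12;

    static String chooseRule(int numTablesToReorder, int bushyThreshold) {
        if (numTablesToReorder < bushyThreshold) {
            // Small search space: cost-based dynamic-programming (bushy) reorder.
            return "BushyJoinReorderRule";
        }
        // Large search space: fall back to the existing heuristic Lopt reorder rule.
        return "LoptJoinReorderRule";
    }

    public static void main(String[] args) {
        System.out.println(chooseRule(5, DEFAULT_BUSHY_JOIN_REORDER_THRESHOLD));  // BushyJoinReorderRule
        System.out.println(chooseRule(20, DEFAULT_BUSHY_JOIN_REORDER_THRESHOLD)); // LoptJoinReorderRule
    }
}
{code}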

The default threshold is intended to be 12. One reason is that in the
TPC-DS benchmark tests, when the number of tables exceeds 12, the
optimization time becomes very long. The other reason is that related
engines, such as Spark, also recommend a setting of 12 [3].

Looking forward to your feedback.

[1]  https://issues.apache.org/jira/browse/FLINK-30376
[2]
https://courses.cs.duke.edu/compsci516/cps216/spring03/papers/selinger-etal-1979.pdf
[3]
https://spark.apache.org/docs/3.3.1/configuration.html#runtime-sql-configuration

Best regards,
Yunhong Zheng


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread Yuxin Tan
Thanks for your question, Roman

> wouldn't it be more clear to separate motivation from the
> proposed changes?
All the content in the motivation part describes the current implementation
of shuffle reading.
The FLIP has described where ExclusiveBuffersPerChannel
and FloatingBuffersPerGate come from; the description is as follows.
"ExclusiveBuffersPerChannel is determined by
taskmanager.network.memory.buffers-per-channel, which is 2 by default.
FloatingBuffersPerGate ranges from 1 to DefaultFloatingBuffersPerGate.
DefaultFloatingBuffersPerGate is determined by
taskmanager.network.memory.floating-buffers-per-gate, which is 8 by
default."
This part is the current implementation of network memory, not a change
introduced by the FLIP.
ExclusiveBuffersPerChannel is used in
SingleInputGateFactory#createInputChannel, so it is the number of exclusive
buffers in one channel. In one gate, the FloatingBuffersPerGate floating
buffers are used in SingleInputGateFactory#create to create the
LocalBufferPool of the gate. The buffer number of the LocalBufferPool ranges
from 1 to DefaultFloatingBuffersPerGate, which can be seen in
SingleInputGateFactory#createBufferPoolFactory, where
NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate controls that range.
One gate may contain multiple channels (we call the number of channels
numChannels), so the total buffers are
ExclusiveBuffersPerChannel * numChannels + FloatingBuffersPerGate, where
FloatingBuffersPerGate in the LocalBufferPool ranges from 1 to
DefaultFloatingBuffersPerGate. Hence,
"the range of TotalBuffersPerGate for each InputGate is
[ExclusiveBuffersPerChannel * numChannels + 1,
ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate]",
which is described in the FLIP. All of the above describes the current
implementation, not changes from the FLIP. Therefore, the motivation section
describes the current implementation, while the proposed changes section
describes the changes, which does separate the motivation from the proposed
changes.
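
As a small illustration of the current bounds described above (plain arithmetic,
not Flink code), using the quoted defaults buffers-per-channel = 2 and
floating-buffers-per-gate = 8:

{code:java}
// Plain arithmetic, not Flink code: bounds of TotalBuffersPerGate in the current
// implementation, as described above.
public final class CurrentGateBufferBounds {

    static int minTotalBuffersPerGate(int exclusivePerChannel, int numChannels) {
        // Exclusive buffers plus at least 1 floating buffer.
        return exclusivePerChannel * numChannels + 1;
    }

    static int maxTotalBuffersPerGate(
            int exclusivePerChannel, int numChannels, int defaultFloatingPerGate) {
        return exclusivePerChannel * numChannels + defaultFloatingPerGate;
    }

    public static void main(String[] args) {
        int numChannels = 1000;
        System.out.println(minTotalBuffersPerGate(2, numChannels));     // 2001
        System.out.println(maxTotalBuffersPerGate(2, numChannels, 8));  // 2008
    }
}
{code}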

> So my question is what value exactly in this range will it have and how
> and where will it be computed?
Based on the current implementation, a threshold is introduced:
taskmanager.memory.network.read-required-buffer.max. As described above,
when the number of exclusive buffers in a gate (ExclusiveBuffersPerChannel *
numChannels) is greater than the threshold, all read buffers will use
floating buffers, and in order to keep the number of buffers consistent with
that before the change, we will modify the creation process of the
LocalBufferPool.
This is an implementation detail, and the change is as follows. The floating
buffers in the LocalBufferPool will be in the range of
[numFloatingBufferThreashold,
ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate],
which means that when calling SingleInputGateFactory#createBufferPoolFactory,
the min buffers of the LocalBufferPool are numFloatingBufferThreashold and
the max buffers of the LocalBufferPool are
ExclusiveBuffersPerChannel * numChannels + DefaultFloatingBuffersPerGate.
The exact buffer number depends on whether the buffers in the
NetworkBufferPool are sufficient when the LocalBufferPool is used, which can
be seen in the current implementation of LocalBufferPool, and the exact
number of floating buffers in the LocalBufferPool falls within the above
range.
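
A hedged sketch of the proposed min/max computation, assuming
numFloatingBufferThreshold is the threshold expressed in buffers; this is illustrative
only and not the actual LocalBufferPool/SingleInputGateFactory change.

{code:java}
import java.util.Arrays;

// Illustrative only, not the actual Flink change.
public final class ProposedGateBufferBounds {

    /** Returns {min, max} buffers handed to the gate's buffer pool factory. */
    static int[] bufferPoolBounds(
            int exclusivePerChannel,
            int numChannels,
            int defaultFloatingPerGate,
            int numFloatingBufferThreshold) {
        int exclusivePerGate = exclusivePerChannel * numChannels;
        if (exclusivePerGate > numFloatingBufferThreshold) {
            // All read buffers become floating; the pool's bounds are widened so the
            // total stays consistent with the pre-change number of buffers.
            return new int[] {
                numFloatingBufferThreshold, exclusivePerGate + defaultFloatingPerGate
            };
        }
        // Below the threshold the current behaviour is kept: 1..default floating buffers.
        return new int[] {1, defaultFloatingPerGate};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(bufferPoolBounds(2, 1000, 8, 1000))); // [1000, 2008]
    }
}
{code}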

Best,
Yuxin


On Wed, Dec 28, 2022 at 20:10, Roman Khachatryan wrote:

> Thanks for your reply Yuxin,
>
> > ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> > configurations, which are not calculated. I have described them in the
> FLIP
> > motivation section.
>
> The motivation section says about floating buffers:
> > FloatingBuffersPerGate is within the range of
> [numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
> DefaultFloatingBuffersPerGate] ...
> So my question is what value exactly in this range will it have and how and
> where will it be computed?
>
> As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
> to calculate it dynamically (by linear search
> from taskmanager.network.memory.buffers-per-channel down to 0).
>
> Also, if the two configuration options are still in use, why does the FLIP
> propose to deprecate them?
>
> Besides that, wouldn't it be more clear to separate motivation from the
> proposed changes?
>
> Regards,
> Roman
>
>
> On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:
>
> > Hi Yuxin
> >
> >
> > Thanks for the proposal, big + 1 for this FLIP.
> >
> >
> >
> > It is difficult for users to calculate the size of network memory. If the
> > setting is too small, the task cannot be started. If the setting is too
> > large, there may be a waste of resources. As far as possible, Flink
> > framework can automatically set a reasonable value, but I have a small
> > problem. network memory is not only related to the parallelism of the
> task,

[jira] [Created] (FLINK-30532) Add benchmark for DCT, SQLTransformer and StopWordsRemover algorithm

2022-12-28 Thread Yunfeng Zhou (Jira)
Yunfeng Zhou created FLINK-30532:


 Summary: Add benchmark for DCT, SQLTransformer and 
StopWordsRemover algorithm
 Key: FLINK-30532
 URL: https://issues.apache.org/jira/browse/FLINK-30532
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: Yunfeng Zhou
 Fix For: ml-2.2.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30531) Reduce operator chain call stack depth

2022-12-28 Thread Dong Lin (Jira)
Dong Lin created FLINK-30531:


 Summary: Reduce operator chain call stack depth
 Key: FLINK-30531
 URL: https://issues.apache.org/jira/browse/FLINK-30531
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Dong Lin


Benchmark results show that Flink is more than 3X slower than Spark at executing 
simple programs. For example, if we run the following program with object 
re-use enabled and with parallelism=1, it takes roughly 120 sec on a MacBook, 
whereas it takes Spark less than 40 sec to run the same logic on the same 
machine.
{code:java}
// 'env' is a StreamExecutionEnvironment; object re-use is enabled and parallelism is 1.
env.fromSequence(1, 10L)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .map(x -> x)
        .addSink(new DiscardingSink<>());
{code}
 

It turns out that the operator chain overhead introduced by Flink is 
surprisingly high. For the above example program, the Flink runtime goes through a 
call stack of 24 functions to produce 1 element, and each extra map(...) 
operation introduces 4 extra functions in the call stack.

Here are the 24 functions in the call stack:
{code:bash}
StreamTask#processInput
StreamOneInputProcessor#processInput
StreamTaskSourceInput#emitNext
SourceOperator#emitNext
IteratorSourceReaderBase#pollNext
SourceOutputWithWatermarks#collect
AsyncDataOutputToOutput#emitRecord
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamMap#processElement
CountingOutput#collect
ChainingOutput#collect
StreamSink#processElement
{code}
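
As a rough, hypothetical illustration of why each chained map adds several frames
(these are simplified stand-ins, not Flink's ChainingOutput/StreamMap/CountingOutput
classes): every operator writes to an Output whose collect() directly calls the next
operator's processElement(), so one record traverses the whole chain through nested
virtual calls before reaching the sink.

{code:java}
import java.util.function.UnaryOperator;

// Simplified stand-ins, not Flink's real classes.
public final class ChainDepthDemo {

    interface Output<T> { void collect(T record); }

    static final class MapOperator<T> {
        private final UnaryOperator<T> fn;
        private final Output<T> downstream; // plays the role of ChainingOutput

        MapOperator(UnaryOperator<T> fn, Output<T> downstream) {
            this.fn = fn;
            this.downstream = downstream;
        }

        void processElement(T record) {
            downstream.collect(fn.apply(record));
        }
    }

    public static void main(String[] args) {
        Output<Long> sink = record -> { /* discard, like DiscardingSink */ };
        Output<Long> head = sink;
        // Build a chain of 5 map operators in front of the sink.
        for (int i = 0; i < 5; i++) {
            MapOperator<Long> op = new MapOperator<>(v -> v, head);
            head = op::processElement;
        }
        // Each record passes through 2 extra frames per map in this toy model;
        // Flink's real chain adds further wrappers (CountingOutput etc.) per operator.
        for (long value = 1; value <= 10; value++) {
            head.collect(value);
        }
    }
}
{code}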
 

Given the evidence described above, we find the following explanations for why 
Flink is slow for programs with low computation overhead:
 * The operator chain currently uses a pull-based loop, which has worse branch 
prediction than a push-based loop.
 * Java's maximum inline level is less than 18 [2]. It is easy for the operator 
chain call stack to exceed this limit and prevent Java from inlining function 
calls, which further increases the function call overhead.
 * For function calls that are not inlined, a virtual table lookup is required, 
since most of these functions are virtual functions.

 

[1] [https://arxiv.org/pdf/1610.09166.pdf]

[2] [https://bugs.openjdk.org/browse/JDK-8234863]

 

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30530) Flink configuration from user-provided ConfigMap

2022-12-28 Thread Arseniy Tashoyan (Jira)
Arseniy Tashoyan created FLINK-30530:


 Summary: Flink configuration from user-provided ConfigMap
 Key: FLINK-30530
 URL: https://issues.apache.org/jira/browse/FLINK-30530
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: 1.15.2
 Environment: Flink 1.15.2

Flink Kubernetes operator 1.2.0
Reporter: Arseniy Tashoyan


Currently the Flink configuration can be specified in the YAML descriptor of 
FlinkDeployment via the _flinkConfiguration_ setting:
{code:yaml}
flinkConfiguration: 
  taskmanager.numberOfTaskSlots: "2"
  ...
{code}
Same for the logging configuration:
{code:yaml}
logConfiguration: 
    "log4j-console.properties": |
      rootLogger.level = DEBUG
      ...{code}
This makes the YAML descriptor overloaded and huge. In addition, the Flink and 
logging configuration may differ for different applications, while the 
Kubernetes settings may be the same for all applications. Therefore it makes sense 
to extract the Flink and logging configurations from the YAML descriptor.

This can be done via a user-provided ConfigMap:
{code:yaml}
flinkConfigMap: basic-example-flink-config
{code}
In this example we have a Flink application _basic-example_. The 
_basic-example-flink-config_ ConfigMap contains all config files used by Flink: 
flink-conf.yaml, log4j.properties, and possibly other files.

Therefore we can have different Flink settings for different applications and 
the same YAML descriptor for all of them (only the value for flinkConfigMap 
differs).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30529) Use simple cancel for savepoint upgrades on failing jobs

2022-12-28 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30529:
--

 Summary: Use simple cancel for savepoint upgrades on failing jobs
 Key: FLINK-30529
 URL: https://issues.apache.org/jira/browse/FLINK-30529
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Reporter: Gyula Fora


In some cases where savepoint upgrades are required, we should consider simply 
cancelling the job and observing the last checkpoint/savepoint available.

We should think through the caveats of this approach, but it could help upgrade 
some clusters that would be stuck now.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30528) Job may be stuck in upgrade loop when last-state fallback is disabled and deployment is missing

2022-12-28 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30528:
--

 Summary: Job may be stuck in upgrade loop when last-state fallback 
is disabled and deployment is missing
 Key: FLINK-30528
 URL: https://issues.apache.org/jira/browse/FLINK-30528
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
Reporter: Gyula Fora


When last-state upgrade fallback is disabled (or we are switching Flink 
versions) and the JM deployment is missing for some reason, we get stuck in the 
upgrade loop, as the spec change/upgrade logic always takes precedence over the 
JM deployment recovery logic.

In this case the JM deployment needs to be recovered first so that the savepoint 
upgrade can be executed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30527) Last-state suspend followed by flinkVersion change may lead to state loss

2022-12-28 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30527:
--

 Summary: Last-state suspend followed by flinkVersion change may 
lead to state loss
 Key: FLINK-30527
 URL: https://issues.apache.org/jira/browse/FLINK-30527
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0, kubernetes-operator-1.2.0
Reporter: Gyula Fora


We do not check Flink version changes on recovery, so if the user suspended the 
job using last-state mode (or that was the only mode available) and 
subsequently the flinkVersion is changed to a non-HA-compatible version, the 
job would be restored using last-state and state would be lost.

In these cases we should set an error in the Flink resource instructing the 
user that changing the version is not allowed at that point.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30526) Handle failures in OpenSearch with ActionRequestFailureHandler being deprecated

2022-12-28 Thread Martijn Visser (Jira)
Martijn Visser created FLINK-30526:
--

 Summary: Handle failures in OpenSearch with 
ActionRequestFailureHandler being deprecated
 Key: FLINK-30526
 URL: https://issues.apache.org/jira/browse/FLINK-30526
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Opensearch
Reporter: Martijn Visser


{quote} Hi everyone,
I have a streaming application that has Elasticsearch sink.
I Upgraded flink version from 1.11 to 1.16 and also moved from ES 7 to 
OpenSearch 2.0, and now I'm facing some deprected issues, hope you can help me.
In the previous version I created ElasticsearchSink and added a failure 
handler, which protected the sink to not fail on some exceptions.
 final ActionRequestFailureHandler failureHandler = (action, failure, 
restStatusCode, indexer) -> {
if (ExceptionUtils.findThrowable(failure, 
EsRejectedExecutionException.class).isPresent()) {
indexer.add(action);
} else if (ExceptionUtils.findThrowable(failure, 
ElasticsearchParseException.class).isPresent()) {
log.warn("Got malformed document , action {}", action);
// malformed document; simply drop elasticsearchSinkFunction 
without failing sink
} else if (failure instanceof IOException && failure.getCause() 
instanceof NullPointerException && failure.getMessage().contains("Unable to 
parse response body")) {
//issue with ES 7 and opensearch - that does not send type - 
while response is waiting for it
//at 
org.elasticsearch.action.DocWriteResponse.(DocWriteResponse.java:127) -- 
this.type = Objects.requireNonNull(type);
log.debug("known issue format the response for ES 7.5.1 and DB 
OS (opensearch) :{}", failure.getMessage());
} else {
// for all other failures, log and don't fail the sink
log.error("Got error while trying to perform ES action {}", 
action, failure);
}
};


 final ElasticsearchSink.Builder builder = new 
ElasticsearchSink.Builder<>(transportNodes, elasticsearchSinkFunction);
In the new version the class ActionRequestFailureHandler is deprecated and 
after investigation I can't find any way to handle failures.
For all failures the sink fails.
Is there anything I didn't see?
Thanks is advance! 
{quote}

From the Apache Flink Slack channel: 
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1672122873318899



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30525) Cannot open jobmanager configuration web page

2022-12-28 Thread Weihua Hu (Jira)
Weihua Hu created FLINK-30525:
-

 Summary: Cannot open jobmanager configuration web page
 Key: FLINK-30525
 URL: https://issues.apache.org/jira/browse/FLINK-30525
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.17.0
Reporter: Weihua Hu
 Attachments: image-2022-12-28-20-37-00-825.png, 
image-2022-12-28-20-37-05-551.png

We removed the environments from the REST API in 
https://issues.apache.org/jira/browse/FLINK-30116.
The jobmanager configuration web page now throws "TypeError: Cannot read 
properties of undefined (reading 'length')".

The environment section of the jobmanager configuration web page should be deleted too.
 !image-2022-12-28-20-37-00-825.png! 
 !image-2022-12-28-20-37-05-551.png! 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread Roman Khachatryan
Thanks for your reply Yuxin,

> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the
FLIP
> motivation section.

The motivation section says about floating buffers:
> FloatingBuffersPerGate is within the range of
[numFloatingBufferThreashold, ExclusiveBuffersPerChannel * numChannels +
DefaultFloatingBuffersPerGate] ...
So my question is what value exactly in this range will it have and how and
where will it be computed?

As for the ExclusiveBuffersPerChannel, there was a proposal in the thread
to calculate it dynamically (by linear search
from taskmanager.network.memory.buffers-per-channel down to 0).

Also, if the two configuration options are still in use, why does the FLIP
propose to deprecate them?

Besides that, wouldn't it be more clear to separate motivation from the
proposed changes?

Regards,
Roman


On Wed, Dec 28, 2022 at 12:19 PM JasonLee <17610775...@163.com> wrote:

> Hi Yuxin
>
>
> Thanks for the proposal, big + 1 for this FLIP.
>
>
>
> It is difficult for users to calculate the size of network memory. If the
> setting is too small, the task cannot be started. If the setting is too
> large, there may be a waste of resources. As far as possible, Flink
> framework can automatically set a reasonable value, but I have a small
> problem. network memory is not only related to the parallelism of the task,
> but also to the complexity of the task DAG. The more complex a DAG is,
> shuffle write and shuffle read require larger buffers. How can we determine
> how many RS and IG a DAG has?
>
>
>
> Best
> JasonLee
>
>
>  Replied Message 
> | From | Yuxin Tan |
> | Date | 12/28/2022 18:29 |
> | To |  |
> | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations
> for TaskManager |
> Hi, Roman
>
> Thanks for the replay.
>
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the FLIP
> motivation section.
>
> 3. Each gate requires at least one buffer...
> The timeout exception occurs when the ExclusiveBuffersPerChannel
> can not be requested from NetworkBufferPool, which is not caused by the
> change of this Flip. In addition, if  we have set the
> ExclusiveBuffersPerChannel
> to 0 when using floating buffers, which can also decrease the probability
> of
> this exception.
>
> 4. It would be great to have experimental results for jobs with different
> exchange types.
> Thanks for the suggestion. I have a test about different exchange types,
> forward
> and rescale, and the results show no differences from the all-to-all type,
> which
> is also understandable, because the network memory usage is calculated
> with numChannels, independent of the edge type.
>
> Best,
> Yuxin
>
>
> On Wed, Dec 28, 2022 at 05:27, Roman Khachatryan wrote:
>
> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find much details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, in case when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan  wrote:
>
> Hi, Weihua
>
> Thanks for your suggestions.
>
> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> 

Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread Yuxin Tan
Hi, JasonLee

Thanks for the feedback.

> How can we determine how many RS and IG a DAG has?

Network memory is related to the parallelism and the complexity of the task
DAG, which I think is correct. However, this FLIP can only address part of
the overall issue, mainly focusing on memory optimization of shuffle
reading. We can only limit the total read buffers in one InputGate, but
cannot determine the number of RS and IG. The good news is that this is an
independent problem; maybe we could try to optimize and solve that
problem later.


Best,
Yuxin


On Wed, Dec 28, 2022 at 19:18, JasonLee <17610775...@163.com> wrote:

> Hi Yuxin
>
>
> Thanks for the proposal, big + 1 for this FLIP.
>
>
>
> It is difficult for users to calculate the size of network memory. If the
> setting is too small, the task cannot be started. If the setting is too
> large, there may be a waste of resources. As far as possible, Flink
> framework can automatically set a reasonable value, but I have a small
> problem. network memory is not only related to the parallelism of the task,
> but also to the complexity of the task DAG. The more complex a DAG is,
> shuffle write and shuffle read require larger buffers. How can we determine
> how many RS and IG a DAG has?
>
>
>
> Best
> JasonLee
>
>
>  Replied Message 
> | From | Yuxin Tan |
> | Date | 12/28/2022 18:29 |
> | To |  |
> | Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations
> for TaskManager |
> Hi, Roman
>
> Thanks for the replay.
>
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
> configurations, which are not calculated. I have described them in the FLIP
> motivation section.
>
> 3. Each gate requires at least one buffer...
> The timeout exception occurs when the ExclusiveBuffersPerChannel
> can not be requested from NetworkBufferPool, which is not caused by the
> change of this Flip. In addition, if  we have set the
> ExclusiveBuffersPerChannel
> to 0 when using floating buffers, which can also decrease the probability
> of
> this exception.
>
> 4. It would be great to have experimental results for jobs with different
> exchange types.
> Thanks for the suggestion. I have a test about different exchange types,
> forward
> and rescale, and the results show no differences from the all-to-all type,
> which
> is also understandable, because the network memory usage is calculated
> with numChannels, independent of the edge type.
>
> Best,
> Yuxin
>
>
> On Wed, Dec 28, 2022 at 05:27, Roman Khachatryan wrote:
>
> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find much details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, in case when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan  wrote:
>
> Hi, Weihua
>
> Thanks for your suggestions.
>
> 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> total buffer is not enough?
>
> I think it's a good idea. Will try and check the results in PoC. Before
> all
> read buffers use floating buffers, I will try to use
> (ExclusiveBuffersPerChannel - i)
> buffers per channel first. For example, if the user has configured
> ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> are sufficient from 

[jira] [Created] (FLINK-30524) Unify the style of the logs page

2022-12-28 Thread Yongming Zhang (Jira)
Yongming Zhang created FLINK-30524:
--

 Summary: Unify the style of the logs page
 Key: FLINK-30524
 URL: https://issues.apache.org/jira/browse/FLINK-30524
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Affects Versions: 1.16.0
Reporter: Yongming Zhang
 Fix For: 1.17.0


There are certain differences in the layout of the "Logs/Stdout" and "Log List" 
pages.
!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672225951771-e7d1c4b8-d02d-495b-a390-2fd0a47cc122.png|width=1160,id=ufb596fa9!
 
!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672227240081-94627c11-852c-40a2-b1de-853a3a61c050.png|width=1153,id=u22bf640f!
I want to unify the style of the logs page so that users can view the log in full 
screen on the Logs/Stdout page.
!https://intranetproxy.alipay.com/skylark/lark/0/2022/png/336411/1672226099286-f7ea36e3-2c79-43dc-b2ac-ebb400482038.png|width=1146,id=u079e5309!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30523) Refine benchmark of vectorAssembler

2022-12-28 Thread weibo zhao (Jira)
weibo zhao created FLINK-30523:
--

 Summary: Refine benchmark of vectorAssembler
 Key: FLINK-30523
 URL: https://issues.apache.org/jira/browse/FLINK-30523
 Project: Flink
  Issue Type: Improvement
  Components: Library / Machine Learning
Reporter: weibo zhao


Refine benchmark of vectorAssembler



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread JasonLee
Hi Yuxin


Thanks for the proposal, big + 1 for this FLIP.



It is difficult for users to calculate the size of network memory. If the 
setting is too small, the task cannot be started; if the setting is too large, 
there may be a waste of resources. As far as possible, the Flink framework should 
automatically set a reasonable value, but I have a small question: network 
memory is related not only to the parallelism of the task, but also to the 
complexity of the task DAG. The more complex a DAG is, the larger the buffers that 
shuffle write and shuffle read require. How can we determine how many RS and IG a 
DAG has?



Best
JasonLee


 Replied Message 
| From | Yuxin Tan |
| Date | 12/28/2022 18:29 |
| To |  |
| Subject | Re: [DISCUSS] FLIP-266: Simplify network memory configurations for 
TaskManager |
Hi, Roman

Thanks for the replay.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
configurations, which are not calculated. I have described them in the FLIP
motivation section.

3. Each gate requires at least one buffer...
The timeout exception occurs when the ExclusiveBuffersPerChannel
can not be requested from NetworkBufferPool, which is not caused by the
change of this Flip. In addition, if  we have set the
ExclusiveBuffersPerChannel
to 0 when using floating buffers, which can also decrease the probability
of
this exception.

4. It would be great to have experimental results for jobs with different
exchange types.
Thanks for the suggestion. I have a test about different exchange types,
forward
and rescale, and the results show no differences from the all-to-all type,
which
is also understandable, because the network memory usage is calculated
with numChannels, independent of the edge type.

Best,
Yuxin


On Wed, Dec 28, 2022 at 05:27, Roman Khachatryan wrote:

Hi everyone,

Thanks for the proposal and the discussion.

I couldn't find much details on how exactly the values of
ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
I guess that
- the threshold evaluation is done on JM
- floating buffers calculation is done on TM based on the current memory
available; so it is not taking into account any future tasks submitted for
that (or other) job
Is that correct?

If so, I see the following potential issues:

1. Each (sub)task might have different values because the actual
available memory might be different. E.g. some tasks might use exclusive
buffers and others only floating. That could lead to significant skew
in processing speed, and in turn to issues with checkpoints and watermarks.

2. Re-deployment of a task (e.g. on job failure) might lead to a completely
different memory configuration. That, coupled with different values per
subtask and operator, makes the performance analysis more difficult.

(Regardless of whether it's done on TM or JM):
3. Each gate requires at least one buffer [1]. So, in case when no memory
is available, TM will throw an Allocation timeout exception instead of
Insufficient buffers exception immediately. A delay here (allocation
timeout) seems like a regression.
Besides that, the regression depends on how much memory is actually
available and how much it is contended, doesn't it?
Should there still be a lower threshold of available memory, below which
the job (task) isn't accepted?
4. The same threshold for all types of shuffles will likely result in using
exclusive buffers
for point-wise connections and floating buffers for all-to-all ones. I'm
not sure if that's always optimal. It would be great to have experimental
results for jobs with different exchange types, WDYT?

[1]
https://issues.apache.org/jira/browse/FLINK-24035

Regards,
Roman


On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan  wrote:

Hi, Weihua

Thanks for your suggestions.

1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
total buffer is not enough?

I think it's a good idea. Will try and check the results in PoC. Before
all
read buffers use floating buffers, I will try to use
(ExclusiveBuffersPerChannel - i)
buffers per channel first. For example, if the user has configured
ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
all channels is 1 and all read buffers are insufficient, all read buffers
will use floating buffers.
If the test results prove better, the FLIP will use this method.

2. Do we really need to change the default value of
'taskmanager.memory.network.max'?

Changing taskmanager.memory.network.max will indeed affect some
users, but the user only is affected when the 3 conditions are fulfilled.
1) Flink total TM memory is larger than 10g (because the network memory
ratio is 0.1).
2) taskmanager.memory.network.max was not initially configured.
3) Other memory, such as managed memory or heap memory, is insufficient.
I think the number of jobs fulfilling the conditions is small because
when
TM
uses such a large amount of memory, the network memory requirement may
also be large. 

[jira] [Created] (FLINK-30522) `SHOW TBLPROPERTIES` can't read properties of table in Spark3

2022-12-28 Thread yuzelin (Jira)
yuzelin created FLINK-30522:
---

 Summary: `SHOW TBLPROPERTIES` can't read properties of table in 
Spark3
 Key: FLINK-30522
 URL: https://issues.apache.org/jira/browse/FLINK-30522
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-266: Simplify network memory configurations for TaskManager

2022-12-28 Thread Yuxin Tan
Hi, Roman

Thanks for the reply.

ExclusiveBuffersPerChannel and FloatingBuffersPerGate are obtained from
configuration options; they are not calculated. I have described them in the FLIP
motivation section.

> 3. Each gate requires at least one buffer...
The timeout exception occurs when the ExclusiveBuffersPerChannel buffers
cannot be requested from the NetworkBufferPool, which is not caused by the
changes of this FLIP. In addition, we have set ExclusiveBuffersPerChannel
to 0 when using floating buffers, which can also decrease the probability of
this exception.

> 4. It would be great to have experimental results for jobs with different
> exchange types.
Thanks for the suggestion. I ran a test with different exchange types,
forward and rescale, and the results show no difference from the all-to-all
type, which is also understandable, because the network memory usage is
calculated from numChannels, independent of the edge type.

Best,
Yuxin


On Wed, Dec 28, 2022 at 05:27, Roman Khachatryan wrote:

> Hi everyone,
>
> Thanks for the proposal and the discussion.
>
> I couldn't find much details on how exactly the values of
> ExclusiveBuffersPerChannel and FloatingBuffersPerGate are calculated.
> I guess that
> - the threshold evaluation is done on JM
> - floating buffers calculation is done on TM based on the current memory
> available; so it is not taking into account any future tasks submitted for
> that (or other) job
> Is that correct?
>
> If so, I see the following potential issues:
>
> 1. Each (sub)task might have different values because the actual
> available memory might be different. E.g. some tasks might use exclusive
> buffers and others only floating. That could lead to significant skew
> in processing speed, and in turn to issues with checkpoints and watermarks.
>
> 2. Re-deployment of a task (e.g. on job failure) might lead to a completely
> different memory configuration. That, coupled with different values per
> subtask and operator, makes the performance analysis more difficult.
>
> (Regardless of whether it's done on TM or JM):
> 3. Each gate requires at least one buffer [1]. So, in case when no memory
> is available, TM will throw an Allocation timeout exception instead of
> Insufficient buffers exception immediately. A delay here (allocation
> timeout) seems like a regression.
> Besides that, the regression depends on how much memory is actually
> available and how much it is contended, doesn't it?
> Should there still be a lower threshold of available memory, below which
> the job (task) isn't accepted?
> 4. The same threshold for all types of shuffles will likely result in using
> exclusive buffers
> for point-wise connections and floating buffers for all-to-all ones. I'm
> not sure if that's always optimal. It would be great to have experimental
> results for jobs with different exchange types, WDYT?
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-24035
>
> Regards,
> Roman
>
>
> On Tue, Dec 27, 2022 at 4:12 AM Yuxin Tan  wrote:
>
> > Hi, Weihua
> >
> > Thanks for your suggestions.
> >
> > > 1. How about reducing ExclusiveBuffersPerChannel to 1 first when the
> > total buffer is not enough?
> >
> > I think it's a good idea. Will try and check the results in PoC. Before
> all
> > read buffers use floating buffers, I will try to use
> > (ExclusiveBuffersPerChannel - i)
> > buffers per channel first. For example, if the user has configured
> > ExclusiveBuffersPerChannel to 4, it will check whether all read buffers
> > are sufficient from 4 to 1. Only when ExclusiveBuffersPerChannel of
> > all channels is 1 and all read buffers are insufficient, all read buffers
> > will use floating buffers.
> > If the test results prove better, the FLIP will use this method.
> >
> > > 2. Do we really need to change the default value of
> > 'taskmanager.memory.network.max'?
> >
> > Changing taskmanager.memory.network.max will indeed affect some
> > users, but the user only is affected when the 3 conditions are fulfilled.
> > 1) Flink total TM memory is larger than 10g (because the network memory
> > ratio is 0.1).
> > 2) taskmanager.memory.network.max was not initially configured.
> > 3) Other memory, such as managed memory or heap memory, is insufficient.
> > I think the number of jobs fulfilling the conditions is small because
> when
> > TM
> > uses such a large amount of memory, the network memory requirement may
> > also be large. And when encountering the issue, the rollback method is
> very
> > simple,
> > configuring taskmanager.memory.network.max as 1g or other values.
> > In addition, the reason for modifying the default value is to simplify
> the
> > network
> > configurations in most scenarios. This change does affect a few usage
> > scenarios,
> > but we should admit that setting the default to any value may not meet
> > the requirements of all scenarios.
> >
> > Best,
> > Yuxin
> >
> >
> > On Mon, Dec 26, 2022 at 20:35, Weihua Hu wrote:
> >
> > > Hi Yuxin,
> > > Thanks for the proposal.
> > >
> > > "Insufficient 

[jira] [Created] (FLINK-30521) Improve `Altering Tables` of Doc

2022-12-28 Thread yuzelin (Jira)
yuzelin created FLINK-30521:
---

 Summary: Improve `Altering Tables` of Doc
 Key: FLINK-30521
 URL: https://issues.apache.org/jira/browse/FLINK-30521
 Project: Flink
  Issue Type: Improvement
  Components: Table Store
Affects Versions: table-store-0.4.0
Reporter: yuzelin


Add more syntax description in the section `Altering Tables`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch Job

2022-12-28 Thread Biao Liu
Thanks for all your feedback!

To @Yuxia,

> What the sink expect to do to isolate data produced by speculative
> executions?  IIUC, if the taks failover, it also generate a new attempt.
> Does it make difference in isolating data produced?


Yes, there is something different from the task failover scenario. The
attempt number is more necessary for speculative execution than for failover,
because in the failover scenario only one instance of a subtask can be
running at the same time.

Let's take FileSystemOutputFormat as an example. In the failover scenario,
the temporary directory to store the produced data can be something like
"$root_dir/task-$taskNumber/". At the initialization phase, the subtask deletes
and re-creates the temporary directory.

However, in the speculative execution scenario, this does not work because
there might be several attempts of the same subtask running at the same time.
These attempts might delete, re-create and write the same temporary directory
at the same time. The correct temporary directory should be something like
"$root_dir/task-$taskNumber/attempt-$attemptNumber". So it's necessary to
expose the attempt number to the Sink implementation to do the data
isolation.
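
A minimal sketch of the path layout mentioned above, illustrative only and not the
actual FileSystemOutputFormat code:

{code:java}
import java.nio.file.Path;
import java.nio.file.Paths;

// Illustrative only: attempt-scoped staging directories for speculative execution.
public final class SpeculativeTempDirs {

    /** Failover-only layout: one directory per subtask. */
    static Path taskScopedDir(Path rootDir, int taskNumber) {
        return rootDir.resolve("task-" + taskNumber);
    }

    /** Speculative-execution layout: one directory per (subtask, attempt). */
    static Path attemptScopedDir(Path rootDir, int taskNumber, int attemptNumber) {
        return rootDir.resolve("task-" + taskNumber).resolve("attempt-" + attemptNumber);
    }

    public static void main(String[] args) {
        Path root = Paths.get("/tmp/sink-staging");
        System.out.println(taskScopedDir(root, 3));        // /tmp/sink-staging/task-3
        System.out.println(attemptScopedDir(root, 3, 1));  // /tmp/sink-staging/task-3/attempt-1
    }
}
{code}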


To @Lijie,

> I have a question about this: does SinkV2 need to do the same thing?


Actually, yes.

Should we/users do it in the committer? If yes, how does the commiter know
> which one is the right subtask attempt?


Yes, we/users should do it in the committer.

In the current design, the Committer of Sink V2 should get the "which one
is the right subtask attempt" information from the "committable data"
produced by the SinkWriter. Let's take the FileSink as an example: the
"committable data" sent to the Committer contains the full paths of the
files produced by the SinkWriter. Users could also pass the attempt number
through the "committable data" from the SinkWriter to the Committer.

In the "Rejected Alternatives -> Introduce a way to clean leaked data of
Sink V2" section of the FLIP document, we discussed some of the reasons
that we didn't provide the API like OutputFormat.

To @Jing Zhang

> I have a question about this: Speculative execution of Committer will be
> disabled.
>
> I agree with your point and I saw the similar requirements to disable
> speculative execution for specified operators.
>
> However the requirement is not supported currently. I think there should be
> some place to describe how to support it.


In the design of this FLIP, the speculative execution of the Committer of Sink V2
will be disabled by Flink. It's not an optional behavior; users cannot change
it. And as you said, "disable speculative execution for specified operators" is
not supported in this FLIP, because it's a bit out of scope: "Sink Supports
Speculative Execution For Batch Job". I think it's better to start another
FLIP to discuss it. "Fine-grained control of enabling speculative execution
for operators" could be the title of that FLIP, and we can discuss there how
to enable or disable speculative execution for specified operators,
including the Committer and pre/post-committer of Sink V2.

What do you think?

Thanks,
Biao /'bɪ.aʊ/



On Wed, 28 Dec 2022 at 11:30, Jing Zhang  wrote:

> Hi Biao,
>
> Thanks for driving this FLIP. It's meaningful to support speculative
> execution
> of sinks is important.
>
> I have a question about this: Speculative execution of Committer will be
> disabled.
>
> I agree with your point and I saw the similar requirements to disable
> speculative execution for specified operators.
>
> However the requirement is not supported currently. I think there should be
> some place to describe how to support it.
>
> Best,
> Jing Zhang
>
> On Tue, Dec 27, 2022 at 18:51, Lijie Wang wrote:
>
> > Hi Biao,
> >
> > Thanks for driving this FLIP.
> > In this FLIP, it introduces "int getFinishedAttempt(int subtaskIndex)"
> for
> > OutputFormat to know which subtask attempt is the one marked as finished
> by
> > JM and commit the right data.
> > I have a question about this: does SinkV2 need to do the same thing?
> Should
> > we/users do it in the committer? If yes, how does the commiter know which
> > one is the right subtask attempt?
> >
> > Best,
> > Lijie
> >
> > On Tue, Dec 27, 2022 at 10:01, yuxia wrote:
> >
> > > HI, Biao.
> > > Thanks for driving this FLIP.
> > > After quick look of this FLIP, I have a question about "expose the
> > attempt
> > > number which can be used to isolate data produced by speculative
> > executions
> > > with the same subtask id".
> > > What the sink expect to do to isolate data produced by speculative
> > > executions?  IIUC, if the taks failover, it also generate a new
> attempt.
> > > Does it make difference in isolating data produced?
> > >
> > > Best regards,
> > > Yuxia
> > >
> > > - Original Message -
> > > From: "Biao Liu" 
> > > To: "dev" 
> > > Sent: Thursday, December 22, 2022, 8:16:21 PM
> > > Subject: [DISCUSS] FLIP-281 Sink Supports Speculative Execution For Batch
> Job
> > >
> > > Hi everyone,
> > >
> > > I would like to start a discussion about making Sink support
> speculative
> > > 

[jira] [Created] (FLINK-30520) Arguments contains with '#' will error split in loadYAMLResource

2022-12-28 Thread tanjialiang (Jira)
tanjialiang created FLINK-30520:
---

 Summary: Arguments contains with '#' will error split in 
loadYAMLResource
 Key: FLINK-30520
 URL: https://issues.apache.org/jira/browse/FLINK-30520
 Project: Flink
  Issue Type: Bug
  Components: API / Core
Affects Versions: 1.15.3, 1.14.6, 1.16.0
Reporter: tanjialiang


When I submit a Flink jar job in Kubernetes application mode whose main args 
contain '#', the args are split incorrectly by 
org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource.

 

For example, I use the flink-kubernetes-operator to submit a job in Kubernetes 
application mode:

 
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: word-count
spec:
  image: apache/flink:1.16.0-scala_2.12-java8
  flinkVersion: v1_16
  flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
  jobManager:
resource:
  memory: "2048m"
  cpu: 1
  taskManager:
resource:
  memory: "2048m"
  cpu: 1
  serviceAccount: flink
  job:
jarURI: local:///opt/flink/examples/streaming/WordCount.jar
args:
  - --output
  - /tmp/1#.txt
parallelism: 2
upgradeMode: stateless
{code}
 

 

The value is split incorrectly when loading flink-conf.yaml:

!image-2022-12-28-16-30-30-645.png!

 

And when I entered the jobmanager's pod, I saw that the program-args in 
flink-conf.yaml is correct:

!image-2022-12-28-16-34-25-819.png!

 

Maybe we should have stricter validation for YAML comments?
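
For illustration only (this is not the real loadYAMLResource code), a naive
"strip everything after '#'" comment pass shows the symptom described above: a value
such as /tmp/1#.txt loses its suffix. The config key below is hypothetical.

{code:java}
// Illustration only, not GlobalConfiguration#loadYAMLResource.
public final class HashCommentSplit {

    static String naiveStripComment(String line) {
        int hash = line.indexOf('#');
        return (hash >= 0 ? line.substring(0, hash) : line).trim();
    }

    public static void main(String[] args) {
        // Hypothetical flink-conf.yaml line carrying the job's program args.
        String line = "program.args: --output /tmp/1#.txt";
        // Prints "program.args: --output /tmp/1" -- the "#.txt" suffix is lost.
        System.out.println(naiveStripComment(line));
    }
}
{code}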

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30519) Add e2e tests for operator dynamic config

2022-12-28 Thread Gyula Fora (Jira)
Gyula Fora created FLINK-30519:
--

 Summary: Add e2e tests for operator dynamic config
 Key: FLINK-30519
 URL: https://issues.apache.org/jira/browse/FLINK-30519
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Gyula Fora
 Fix For: kubernetes-operator-1.4.0


The dynamic config feature is currently not covered by e2e tests and is subject 
to accidental regressions, as shown in:

https://issues.apache.org/jira/browse/FLINK-30329

We should add an e2e test that covers this



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30518) [flink-operator] Kubernetes HA not working due to wrong jobmanager.rpc.address

2022-12-28 Thread Binh-Nguyen Tran (Jira)
Binh-Nguyen Tran created FLINK-30518:


 Summary: [flink-operator] Kubernetes HA not working due to wrong 
jobmanager.rpc.address
 Key: FLINK-30518
 URL: https://issues.apache.org/jira/browse/FLINK-30518
 Project: Flink
  Issue Type: Bug
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.0
 Environment: ~ flinkdeployment.yaml ~

```
spec:
  flinkConfiguration:
    high-availability: kubernetes
    high-availability.storageDir: "file:///opt/flink/storage"
    ...
  jobManager:
    replicas: 3
```
Reporter: Binh-Nguyen Tran
 Attachments: flink-configmap.png

Since flink-conf.yaml is mounted as a read-only ConfigMap, the 
/docker-entrypoint.sh script is not able to inject the correct Pod IP into 
`jobmanager.rpc.address`. This leads to the same address (e.g. flink.ns-ext) being 
set for all JobManager pods. This causes:

(1) flink-cluster-config-map always contains the same address for all 3 component 
leaders (see screenshot)

(2) Accessing the Web UI when jobmanager.replicas > 1 is not possible; it fails 
with the error:

```
{"errors":["Service temporarily unavailable due to an ongoing leader election. 
Please refresh."]}
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30517) Decrease log output interval while waiting for YARN JobManager be allocated

2022-12-28 Thread Weihua Hu (Jira)
Weihua Hu created FLINK-30517:
-

 Summary: Decrease log output interval while waiting for YARN 
JobManager be allocated
 Key: FLINK-30517
 URL: https://issues.apache.org/jira/browse/FLINK-30517
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / YARN
Affects Versions: 1.16.0
Reporter: Weihua Hu
 Attachments: image-2022-12-28-15-56-56-045.png

The Flink client retrieves the application status every 250 ms after submitting 
to YARN. 
If the JobManager does not start within 60 seconds, it logs "Deployment took more 
than 60 seconds. Please check if the requested resources are available in the 
YARN cluster" every 250 ms. This leads to too many log lines. 

We can keep the check interval at 250 ms but log the message only once per minute.
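
A small sketch of the suggested behaviour (illustrative names, not the actual YARN
client code): keep polling every 250 ms, but rate-limit the warning to at most once
per minute.

{code:java}
import java.util.function.BooleanSupplier;

// Illustrative only: rate-limited warning while waiting for the JobManager.
public final class DeploymentWaitLogger {

    private static final long POLL_INTERVAL_MS = 250;
    private static final long LOG_INTERVAL_MS = 60_000;

    static void waitForJobManager(BooleanSupplier jmStarted) throws InterruptedException {
        long start = System.currentTimeMillis();
        long lastLog = start;
        while (!jmStarted.getAsBoolean()) {
            long now = System.currentTimeMillis();
            if (now - start > 60_000 && now - lastLog >= LOG_INTERVAL_MS) {
                System.out.println(
                        "Deployment took more than 60 seconds. Please check if the requested "
                                + "resources are available in the YARN cluster");
                lastLog = now;
            }
            Thread.sleep(POLL_INTERVAL_MS); // keep the 250 ms status-check interval
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 2_000;
        waitForJobManager(() -> System.currentTimeMillis() > deadline); // toy condition
    }
}
{code}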



--
This message was sent by Atlassian Jira
(v8.20.10#820010)