Re: Questions about the client synchronously obtaining task execution results

2023-11-19 Thread 刘峻池
Sorry, I forgot to add the version information: the version is 1.17.

刘峻池 wrote on Mon, Nov 20, 2023 at 13:59:

> Hi Flink Community
>
> When I run the command `flink run-application -t yarn-application -sae
> mainClass somejar` to submit a batch task on YARN in Application
> Mode, my shell client always terminates as soon as the task is submitted.
> The dispatcher then receives no client heartbeat for a long time and
> shuts down the task. When I remove `-sae` from the command line, the task
> is not cancelled by the dispatcher, but the client still terminates right
> after submission. I want the client to obtain the task execution results
> before exiting.
>
> So, is there a way for the client to synchronously obtain the task
> execution results and then exit? Any answers would be helpful.
>
> Thank you,
> Junchi-Liu
>


Questions about the client synchronously obtaining task execution results

2023-11-19 Thread 刘峻池
Hi Flink Community

When I run the command `flink run-application -t yarn-application -sae
mainClass somejar` to submit a batch task on YARN in Application
Mode, my shell client always terminates as soon as the task is submitted.
The dispatcher then receives no client heartbeat for a long time and
shuts down the task. When I remove `-sae` from the command line, the task
is not cancelled by the dispatcher, but the client still terminates right
after submission. I want the client to obtain the task execution results
before exiting.

So, is there a way for the client to synchronously obtain the task
execution results and then exit? Any answers would be helpful.

Thank you,
Junchi-Liu


Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-19 Thread Junrui Lee
Hi Jing,

Thank you for your feedback. I understand your concerns regarding putting
all methods into the RuntimeContext flat.

I would like to share some of my thoughts on this matter.
Firstly, this FLIP only proposes the addition of three additional methods,
which should not impose too much extra burden on users. Secondly, I agree
that it is important to make it clearer for users to use the
RuntimeContext. However, reorganizing the RuntimeContext to achieve this
requires further discussion. We should focus on a more specific and unified
reorganization of the RuntimeContext interface in future work, rather than
implementing a temporary solution now. Therefore, I prefer not to add a
separate abstraction layer for these three methods in this FLIP.

Please feel free to share any further thoughts.

Best regards,
Junrui

Jing Ge wrote on Mon, Nov 20, 2023 at 05:46:

> Hi Junrui,
>
> Thanks for bringing this to our attention. First of all, it makes sense to
> deprecate RuntimeContext#getExecutionConfig.
>
> Afaic, this is an issue of how we design API with clean concepts/aspects.
> There are two issues mentioned in the FLIP:
>
> 1. short of user-facing abstraction - we just exposed ExecutionConfig
> which mixed methods for users with methods that should only be used
> internally.
> 2. mutable vs immutable - do we want users to be able to modify configs
> during job execution?
>
> An immutable user-facing abstraction design can solve both issues. All
> execution related configs are still consolidated into the abstraction class
> and easy to access. This is another design decision: flat vs. hierarchical.
> Current FLIP removed the execution config abstraction and put all methods
> into RuntimeContext flat, which will end up with more than 30 methods
> offered flat by the RuntimeContext. I am not sure if this could help users
> find the right method in the context of execution config better than
> before.
>
> I might be missing something and look forward to your thoughts. Thanks!
>
> Best regards,
> Jing
>
> On Sat, Nov 18, 2023 at 11:21 AM Junrui Lee  wrote:
>
>> Hello Wencong,
>>
>> Thank you for your valuable feedback and suggestions. I want to clarify
>> that reviewing existing methods in the ExecutionConfig is not directly
>> related to the proposal in this FLIP. The main focus of this FLIP is to
>> deprecate the specific method RuntimeContext#getExecutionConfig(). I
>> believe it is important to keep the scope of this FLIP limited. However,
>> your suggestion can certainly be considered as a separate FLIP in the
>> future.
>>
>> Best regards,
>> Junrui
>>
>> Wencong Liu wrote on Fri, Nov 17, 2023 at 22:08:
>>
>>> Hello Junrui,
>>>
>>>
>>> Thanks for the effort. I agree with the proposal to deprecate the
>>> getExecutionConfig() method in the RuntimeContext class. Exposing
>>> the complex ExecutionConfig to user-defined functions can lead to
>>> unnecessary complexity and risks.
>>>
>>>
>>> I also have a suggestion. We could consider reviewing the existing
>>>  methods in ExecutionConfig. If there are methods that are defined
>>>  in ExecutionConfig but currently have no callers, we could consider
>>>  annotating  them as @Internal or directly removing them. Since
>>> users are no longer able to access and invoke these methods,
>>> it would be beneficial to clean up the codebase.
>>>
>>>
>>> +1 (non-binding).
>>>
>>>
>>> Best,
>>> Wencong
>>>
>>> At 2023-11-15 16:51:15, "Junrui Lee"  wrote:
>>> >Hi all,
>>> >
>>> >I'd like to start a discussion of FLIP-391: Deprecate
>>> >RuntimeContext#getExecutionConfig[1].
>>> >
>>> >Currently, the FLINK RuntimeContext is important for connecting user
>>> >functions to the underlying runtime details. It provides users with
>>> >necessary runtime information during job execution.
>>> >However, the current implementation of the FLINK RuntimeContext exposes
>>> >the ExecutionConfig to users, resulting in two issues:
>>> >Firstly, the ExecutionConfig contains much unrelated information that
>>> >can confuse users and complicate management.
>>> >Secondly, exposing the ExecutionConfig allows users to modify it during
>>> >job execution, which can cause inconsistencies and problems, especially
>>> >with operator chaining.
>>> >
>>> >Therefore, we propose deprecating the RuntimeContext#getExecutionConfig
>>> >in the FLINK RuntimeContext. In the upcoming FLINK-2.0 version, we plan to
>>> >completely remove the RuntimeContext#getExecutionConfig method. Instead,
>>> >we will introduce alternative getter methods that enable users to access
>>> >specific information without exposing unnecessary runtime details. These
>>> >getter methods will include:
>>> >
>>> >1. @PublicEvolving <T> TypeSerializer<T>
>>> >createSerializer(TypeInformation<T> typeInformation);
>>> >2. @PublicEvolving Map<String, String> getGlobalJobParameters();
>>> >3. @PublicEvolving boolean isObjectReuseEnabled();
>>> >
>>> >Looking forward to your feedback and suggestions, thanks.

Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-19 Thread Rui Fan
Hi David and Mason,

Thanks for your feedback!

To David:

> Given that the new default feels more complex than the current behavior,
> if we decide to do this I think it will be important to include the
> rationale you've shared in the documentation.

That makes sense to me. I will add the related documentation if we
update the default strategy.

To Mason:

> I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs, it seems most people are relying on Kubernetes to
> deploy Flink and the restart strategy has a large dependency on how well
> Kubernetes can scale to requests to redeploy the job.

Sorry, I didn't understand what type of benchmarking
we should do. Could you elaborate on it? Thanks a lot.

Best,
Rui

On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:

> Hi Rui,
>
> I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs, it seems most people are relying on Kubernetes to
> deploy Flink and the restart strategy has a large dependency on how well
> Kubernetes can scale to requests to redeploy the job.
>
> Best,
> Mason
>
> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
> wrote:
>
>> Rui,
>>
>> I don't have any direct experience with this topic, but given the
>> motivation you shared, the proposal makes sense to me. Given that the new
>> default feels more complex than the current behavior, if we decide to do
>> this I think it will be important to include the rationale you've shared in
>> the documentation.
>>
>> David
>>
>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>>
>>> Hi dear flink users and devs:
>>>
>>> FLIP-364[1] intends to make some improvements to restart-strategy
>>> and discuss updating some of the default values of exponential-delay,
>>> and whether exponential-delay can be used as the default
>>> restart-strategy.
>>> After discussing on the dev mailing list[2], we hope to collect more
>>> feedback from Flink users.
>>>
>>> # Why does the default restart-strategy need to be updated?
>>>
>>> If checkpointing is enabled, the default value is fixed-delay with
>>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>>> the job will restart infinitely with high frequency when a job
>>> continues to fail.
>>>
>>> When the Kafka cluster fails, a large number of Flink jobs will be
>>> restarted frequently. After the Kafka cluster has recovered, a large
>>> number of high-frequency restarts of Flink jobs may cause the
>>> Kafka cluster to avalanche again.
>>>
>>> We are considering exponential-delay as the default strategy for
>>> several reasons:
>>>
>>> - The exponential-delay can reduce the restart frequency when
>>>   a job continues to fail.
>>> - It can restart a job quickly when a job fails occasionally.
>>> - The restart-strategy.exponential-delay.jitter-factor can avoid
>>>   restarting multiple jobs at the same time. It's useful to prevent
>>>   avalanches.
>>>
>>> # What are the current default values[4] of exponential-delay?
>>>
>>> restart-strategy.exponential-delay.initial-backoff : 1s
>>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>>> restart-strategy.exponential-delay.jitter-factor : 0.1
>>> restart-strategy.exponential-delay.max-backoff : 5 min
>>> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>>>
>>> backoff-multiplier=2 means that the delay time of each restart
>>> will be doubled. The delay times are:
>>> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>>>
>>> The delay time increases rapidly, which affects the recovery
>>> time of Flink jobs.
>>>
>>> # Option improvements
>>>
>>> We think the backoff-multiplier between 1 and 2 is more sensible,
>>> such as:
>>>
>>> restart-strategy.exponential-delay.backoff-multiplier : 1.2
>>> restart-strategy.exponential-delay.max-backoff : 1 min
>>>
>>> After updating, the delay times are:
>>>
>>> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
>>> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
>>> 22.186s, 26.623s, 31.948s, 38.337s, etc
>>>
>>> They achieve the following goals:
>>> - When restarts are infrequent in a short period of time, Flink can
>>>   quickly restart the job. (For example: the retry delay time when
>>>   restarting 5 times is 2.073s)
>>> - When restarting frequently in a short period of time, Flink can
>>>   slightly reduce the restart frequency to prevent avalanches.
>>>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>>>   and the retry delay time when retrying 20 times is 38s, which is not
>>>   very large.)
>>>
>>> As @Mingliang Liu mentioned on the dev mailing list: one-size-fits-all
>>> default values do not exist. So our goal is that the default values
>>> can be suitable for most jobs.
>>>
>>> Looking forward to your thoughts and feedback, thanks~
>>>
>>> [1] 
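
The two delay sequences quoted in the proposal above can be reproduced with
a few lines of arithmetic. The following is only a sketch, not Flink's
implementation: `backoff_delays` and its parameter names are made up for
illustration, and jitter-factor and reset-backoff-threshold are deliberately
left out.

```python
def backoff_delays(initial_s, multiplier, max_s, attempts):
    """Return the first `attempts` restart delays in seconds.

    Hypothetical helper: each delay is the previous one times `multiplier`,
    capped at `max_s`. Jitter and backoff reset are not modelled.
    """
    delays = []
    delay = initial_s
    for _ in range(attempts):
        delays.append(min(delay, max_s))
        delay *= multiplier
    return delays


# Current defaults from the thread: 1s initial backoff, x2.0, capped at 5 min.
current = backoff_delays(1.0, 2.0, 300.0, 11)

# Proposed values from the thread: x1.2, capped at 1 min.
proposed = backoff_delays(1.0, 1.2, 60.0, 21)

print(current)
print([round(d, 3) for d in proposed])
```

Under these assumptions the 5th and 10th proposed delays come out at about
2.07s and 5.16s, in line with the figures quoted above, while 38.337s is the
21st entry of the sequence (the 20th is about 31.9s).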

Re: The generated schema is not correct when using filesystem connector and avro format

2023-11-19 Thread Hang Ruan
Hi, julia.

I have read the code for this part. The problem, as you said, is that the
RowType passed to the avro-confluent format is nullable, which causes a
union with null in the schema.
I think FLINK-30438 is the same problem as yours. But I find that the RowType
passed to the avro-confluent format in the Kafka connector is not nullable
(from `getCatalogTable().getResolvedSchema().toPhysicalRowDataType()`).

For your case, you have to modify the code in FileSystemTableSink.java#L299
to provide a non-nullable RowType. Or you could raise a fix for the filesystem
connector.

Best,
Hang


julia bogdan wrote on Wed, Nov 15, 2023 at 01:10:

> Hi!
>
> I'm facing an issue with the output schema for FileSystemTableSink.
> In FileSystemTableSink#createWriter (FileSystemTableSink.java#L299)
> the original nullability of the underlying logical data type is not
> preserved, which introduces an unnecessary union with null in the schema,
> i.e. for avro, it generates [null, {"type":"record", "fields": ...}]
> instead of {"type":"record", "fields": ...}.
> https://issues.apache.org/jira/browse/FLINK-30438 describes the same
> problem, but I am not sure if the root cause is the same.
> We use Flink 1.16.0, but the issue is also relevant for newer versions.
>
>
> Looking at the source code, the issue exists because DataType::ROW
> instantiates RowType with isNullable = true by default (constructor here).
> Similar DataType creation is followed by a nullability check and a call to
> .notNull() in DataTypeUtils.
>
> I wonder whether someone had the same issue and whether there is a
> workaround.
>
>
> Thank you,
> Yuliya
>
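
The effect described in this report can be illustrated without Flink. The
sketch below is not Flink's Avro schema converter; `avro_schema_for_row` is a
hypothetical helper that only mimics the reported behavior: a nullable row
type is wrapped in a union with "null", a non-nullable one is emitted as a
plain record.

```python
import json


def avro_schema_for_row(name, fields, nullable):
    """Build an Avro-style schema for a row type (illustrative only).

    `fields` is a list of (field_name, avro_type) pairs. When `nullable`
    is True the record is wrapped in a union with "null", which is the
    shape the report calls unnecessary for a top-level row.
    """
    record = {
        "type": "record",
        "name": name,
        "fields": [{"name": n, "type": t} for n, t in fields],
    }
    return ["null", record] if nullable else record


# Hypothetical example columns, not taken from the thread.
columns = [("id", "long"), ("name", "string")]

print(json.dumps(avro_schema_for_row("record", columns, nullable=True)))
print(json.dumps(avro_schema_for_row("record", columns, nullable=False)))
```

The first schema has the union shape the reporter observed; the second is the
shape they expected, which is what passing a non-nullable RowType is meant to
achieve.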


Re:Re: Does Flink's SQL Gateway support custom UDFs?

2023-11-19 Thread RS
Hi,
I tested the ADD JAR approach and it works as well. Thanks a lot!


Thanks





At 2023-11-01 17:34:48, "Xuyang" wrote:
>Hi,
>Do you mean uploading the jar of the custom UDF via ADD JAR on the SQL Gateway [1]?
>
>
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>At 2023-11-01 14:21:04, "RS" wrote:
>>Hi
>>Does Flink's SQL Gateway support custom UDFs, including Java and Python ones? Are there any examples I can refer to?
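
As a rough illustration of the ADD JAR route discussed in this thread, the
sketch below only builds the SQL text one might submit through a SQL Gateway
session. It does not talk to a gateway, and the jar path, function name, and
class name are placeholders that do not come from the thread. It assumes a
Java UDF and does no quoting or escaping of its inputs.

```python
def udf_registration_statements(jar_path, function_name, class_name):
    """Return SQL statements that load a jar and register a Java UDF.

    Hypothetical helper: inputs are inserted as-is, so they must not
    contain single quotes.
    """
    return [
        f"ADD JAR '{jar_path}'",
        f"CREATE TEMPORARY FUNCTION {function_name} "
        f"AS '{class_name}' LANGUAGE JAVA",
    ]


# Placeholder values for illustration only.
for statement in udf_registration_statements(
    "/tmp/my-udfs.jar", "my_upper", "com.example.udf.MyUpper"
):
    print(statement + ";")
```

Each statement would be submitted separately in the same session, so that the
jar is on the session classpath before the function is created.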


Re:Re: Does Flink's SQL Gateway support custom UDFs?

2023-11-19 Thread RS
Hi,
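Yes. We have quite a few custom UDFs, and some are implemented differently, so I would like to load them separately at load time.
sql-client has a parameter that supports this: -j
Why doesn't the SQL Gateway provide one?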


Thanks





At 2023-11-01 17:34:48, "Xuyang" wrote:
>Hi,
>Do you mean uploading the jar of the custom UDF via ADD JAR on the SQL Gateway [1]?
>
>
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>At 2023-11-01 14:21:04, "RS" wrote:
>>Hi
>>Does Flink's SQL Gateway support custom UDFs, including Java and Python ones? Are there any examples I can refer to?


Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig

2023-11-19 Thread Jing Ge via user
Hi Junrui,

Thanks for bringing this to our attention. First of all, it makes sense to
deprecate RuntimeContext#getExecutionConfig.

Afaic, this is an issue of how we design API with clean concepts/aspects.
There are two issues mentioned in the FLIP:

1. short of user-facing abstraction - we just exposed ExecutionConfig which
mixed methods for users with methods that should only be used internally.
2. mutable vs immutable - do we want users to be able to modify configs
during job execution?

An immutable user-facing abstraction design can solve both issues. All
execution related configs are still consolidated into the abstraction class
and easy to access. This is another design decision: flat vs. hierarchical.
Current FLIP removed the execution config abstraction and put all methods
into RuntimeContext flat, which will end up with more than 30 methods
offered flat by the RuntimeContext. I am not sure if this could help users
find the right method in the context of execution config better than
before.

I might be missing something and look forward to your thoughts. Thanks!

Best regards,
Jing

On Sat, Nov 18, 2023 at 11:21 AM Junrui Lee  wrote:

> Hello Wencong,
>
> Thank you for your valuable feedback and suggestions. I want to clarify
> that reviewing existing methods in the ExecutionConfig is not directly
> related to the proposal in this FLIP. The main focus of this FLIP is to
> deprecate the specific method RuntimeContext#getExecutionConfig(). I
> believe it is important to keep the scope of this FLIP limited. However,
> your suggestion can certainly be considered as a separate FLIP in the
> future.
>
> Best regards,
> Junrui
>
> Wencong Liu wrote on Fri, Nov 17, 2023 at 22:08:
>
>> Hello Junrui,
>>
>>
>> Thanks for the effort. I agree with the proposal to deprecate the
>> getExecutionConfig() method in the RuntimeContext class. Exposing
>> the complex ExecutionConfig to user-defined functions can lead to
>> unnecessary complexity and risks.
>>
>>
>> I also have a suggestion. We could consider reviewing the existing
>>  methods in ExecutionConfig. If there are methods that are defined
>>  in ExecutionConfig but currently have no callers, we could consider
>>  annotating  them as @Internal or directly removing them. Since
>> users are no longer able to access and invoke these methods,
>> it would be beneficial to clean up the codebase.
>>
>>
>> +1 (non-binding).
>>
>>
>> Best,
>> Wencong
>>
>>
>> At 2023-11-15 16:51:15, "Junrui Lee"  wrote:
>> >Hi all,
>> >
>> >I'd like to start a discussion of FLIP-391: Deprecate
>> >RuntimeContext#getExecutionConfig[1].
>> >
>> >Currently, the FLINK RuntimeContext is important for connecting user
>> >functions to the underlying runtime details. It provides users with
>> >necessary runtime information during job execution.
>> >However, the current implementation of the FLINK RuntimeContext exposes
>> >the ExecutionConfig to users, resulting in two issues:
>> >Firstly, the ExecutionConfig contains much unrelated information that can
>> >confuse users and complicate management.
>> >Secondly, exposing the ExecutionConfig allows users to modify it during
>> >job execution, which can cause inconsistencies and problems, especially
>> >with operator chaining.
>> >
>> >Therefore, we propose deprecating the RuntimeContext#getExecutionConfig
>> >in the FLINK RuntimeContext. In the upcoming FLINK-2.0 version, we plan to
>> >completely remove the RuntimeContext#getExecutionConfig method. Instead,
>> >we will introduce alternative getter methods that enable users to access
>> >specific information without exposing unnecessary runtime details. These
>> >getter methods will include:
>> >
>> >1. @PublicEvolving <T> TypeSerializer<T>
>> >createSerializer(TypeInformation<T> typeInformation);
>> >2. @PublicEvolving Map<String, String> getGlobalJobParameters();
>> >3. @PublicEvolving boolean isObjectReuseEnabled();
>> >
>> >Looking forward to your feedback and suggestions, thanks.
>> >
>> >[1]
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278465937
>> >
>> >Best regards,
>> >Junrui
>>
>
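
The flat vs. hierarchical trade-off raised in this thread can be sketched
outside of Flink. The code below is not Flink code and not the FLIP's design:
`ExecutionView`, `HierarchicalContext`, and `FlatContext` are hypothetical
names, and only two of the three proposed getters are modelled. It shows one
way to keep the user-facing settings read-only in both layouts.

```python
from dataclasses import dataclass
from types import MappingProxyType
from typing import Mapping


@dataclass(frozen=True)
class ExecutionView:
    """Hypothetical read-only view of execution settings."""

    global_job_parameters: Mapping[str, str]
    object_reuse_enabled: bool


class HierarchicalContext:
    """Groups the settings behind a single immutable accessor."""

    def __init__(self, params, object_reuse):
        self._view = ExecutionView(MappingProxyType(dict(params)), object_reuse)

    def get_execution_view(self):
        return self._view


class FlatContext:
    """Exposes each setting through its own getter on the context."""

    def __init__(self, params, object_reuse):
        self._params = MappingProxyType(dict(params))
        self._object_reuse = object_reuse

    def get_global_job_parameters(self):
        return self._params

    def is_object_reuse_enabled(self):
        return self._object_reuse


# Placeholder parameter values for illustration only.
hierarchical = HierarchicalContext({"env": "test"}, True)
flat = FlatContext({"env": "test"}, True)

print(hierarchical.get_execution_view().object_reuse_enabled)
print(flat.get_global_job_parameters()["env"])
```

In both variants user code cannot mutate the settings. The difference is only
where the getters live: on one grouped object, or directly on the context
next to its other methods.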