Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-22 Thread Yang Wang
Config options set via -D parameters should take effect, and -D is also the
recommended way to pass them instead of CLI options (e.g. --fromSavepoint).
The problem is not limited to K8s application mode; it also affects YARN
application and YARN per-job modes.
I believe it is indeed a bug in the current implementation, and I have
created a ticket for it[1].

Once that is fixed, you should be able to start the Flink K8s application
with the following command:

    $FLINK_HOME/bin/flink run-application -t kubernetes-application \
        -Dkubernetes.cluster-id=$CLUSTER_ID \
        -Dkubernetes.namespace=$NAMESPACE \
        -Dkubernetes.container.image=$IMAGE \
        -Dexecution.savepoint.ignore-unclaimed-state=true \
        -Dexecution.savepoint.path=oss://flink-debug-yiqi/flink-ha \
        local:///opt/flink/examples/streaming/StateMachineExample.jar


If you still want to use the CLI options, I expect you need to set at least
"--fromSavepoint" in addition to "--allowNonRestoredState".
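The CLI-option route can be sketched as a small wrapper script that builds the submission command and, by default, only prints it so it can be inspected first. It passes "--fromSavepoint" and "--allowNonRestoredState" together, the combination discussed in this thread. The cluster ID, namespace, image tag, savepoint path and the DRY_RUN switch are hypothetical placeholders for illustration, not values or features from Flink itself; the jar path is the example one from the command above.

```shell
#!/usr/bin/env bash
# Sketch: submit a K8s application-mode job that restores from a savepoint
# while dropping the state of operators that no longer exist in the program.
# All defaults below are hypothetical placeholders; override them via env vars.
FLINK_HOME="${FLINK_HOME:-/opt/flink}"
CLUSTER_ID="${CLUSTER_ID:-flink-test}"
NAMESPACE="${NAMESPACE:-default}"
IMAGE="${IMAGE:-flink:1.14.3}"
SAVEPOINT_PATH="${SAVEPOINT_PATH:-s3://my-bucket/savepoints/savepoint-xxxx}"
JOB_JAR="${JOB_JAR:-local:///opt/flink/examples/streaming/StateMachineExample.jar}"

# In the version discussed here (1.14.3), --allowNonRestoredState only took
# effect when it was combined with --fromSavepoint.
submit_cmd=(
  "$FLINK_HOME/bin/flink" run-application
  -t kubernetes-application
  --fromSavepoint "$SAVEPOINT_PATH"
  --allowNonRestoredState
  "-Dkubernetes.cluster-id=$CLUSTER_ID"
  "-Dkubernetes.namespace=$NAMESPACE"
  "-Dkubernetes.container.image=$IMAGE"
  "$JOB_JAR"
)

# Hypothetical switch: DRY_RUN=1 (the default) only prints the command,
# DRY_RUN=0 actually runs it.
if [[ "${DRY_RUN:-1}" == "1" ]]; then
  printf '%s\n' "${submit_cmd[*]}"
else
  "${submit_cmd[@]}"
fi
```

Keeping the command in a bash array preserves each argument as a single word, so paths with special characters are not split when the command is finally executed.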

[1]. https://issues.apache.org/jira/browse/FLINK-26316


Best,
Yang

Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-22 Thread Andrey Bulgakov
Thank you, Yang. That was it! Specifying "--fromSavepoint" and
"--allowNonRestoredState" for "run-application" together did the trick.

I was a bit confused, because "flink run-application --help" only tells
you about the "--executor" and "--target" options, so I assumed I should
pass everything else as -D params. I had only tried passing
"--allowNonRestoredState" on the CLI as a last resort, and didn't think to
combine it with "--fromSavepoint".

Thanks again!

-- 
With regards,
Andrey Bulgakov


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-20 Thread Yang Wang
By design, we should support arbitrary config keys via the CLI when using
generic CLI mode.

Have you also specified "--fromSavepoint" along with
"--allowNonRestoredState" when submitting the Flink job via "flink
run-application"?

From the current code base, it seems that the CLI options (e.g.
--fromSavepoint, --allowNonRestoredState) have higher priority than Flink
config options. As a result, the savepoint-related config options get
wrongly overwritten. Refer to the implementation[1].
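The precedence described above can be modelled as a small shell function. This is a hypothetical, simplified sketch based on this thread's description, not Flink's actual code (which is the Java class linked in [1]): config options such as -D are applied first, the CLI's own savepoint settings are written over them afterwards, and without --fromSavepoint those CLI settings carry "false". The savepoint path in the calls is made up.

```shell
#!/usr/bin/env bash
# Hypothetical model of how the client resolves
# execution.savepoint.ignore-unclaimed-state in Flink 1.14.x, per this thread.
#
# Arguments:
#   $1  value passed via -Dexecution.savepoint.ignore-unclaimed-state ("" if absent)
#   $2  path passed via --fromSavepoint ("" if absent)
#   $3  "true" if --allowNonRestoredState was passed, otherwise "false"
# Prints the value that ends up in the cluster configuration.
effective_ignore_unclaimed_state() {
  local dynamic_value="$1" from_savepoint="$2" allow_non_restored="$3"

  # 1. Config options (-D, flink-conf.yaml) are loaded first.
  local value="${dynamic_value:-false}"

  # 2. The CLI builds its own savepoint settings. Without --fromSavepoint
  #    they are empty, so --allowNonRestoredState on its own is dropped.
  local cli_allow=false
  if [[ -n "$from_savepoint" && "$allow_non_restored" == "true" ]]; then
    cli_allow=true
  fi

  # 3. The CLI settings are then written into the configuration
  #    unconditionally, replacing whatever step 1 produced.
  value="$cli_allow"

  echo "$value"
}

effective_ignore_unclaimed_state true "" false              # -D only; prints: false
effective_ignore_unclaimed_state "" "" true                 # --allowNonRestoredState only; prints: false
effective_ignore_unclaimed_state "" "s3://bucket/sp-1" true # both CLI options; prints: true
```

In this model the -D value can never win, which matches the "false" value reported in the pod's ConfigMap.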

[1].
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181


Best,
Yang


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Andrey Bulgakov
Hi Austin,

Thanks for the reply! Yeah, the docs aren't super explicit about this.

But for what it's worth, I'm setting a few options unrelated to Kubernetes
this way and they all take effect:

    -Dstate.checkpoints.num-retained=100 \
    -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
    -Dio.tmp.dirs=/data/flink-local-data \
    -Dqueryable-state.enable=true \

The only one I'm having problems with is
"execution.savepoint.ignore-unclaimed-state".

-- 
With regards,
Andrey Bulgakov


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Austin Cawley-Edwards
Hi Andrey,

It's unclear to me from the docs[1] whether the Flink native Kubernetes
integration supports setting arbitrary config keys via the CLI. I'm cc'ing
Yang Wang, who has worked a lot in this area and can hopefully help us out.

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes


No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-18 Thread Andrey Bulgakov
Hey all,

I'm working on migrating our Flink job away from Hadoop session mode to K8S
application mode.
It's been going great so far but I'm hitting a wall with this seemingly
simple thing.

In the first phase of the migration I want to remove some operators (their
state can be discarded) and focus on getting the primary pipeline running
first.
For that I have to start the cluster from a savepoint with the
"allowNonRestoredState" parameter turned on.

The problem is that I can't set it in any way that I'm aware of. I tried 4
ways separately and simultaneously:

1) Adding --allowNonRestoredState to flink run-application
-t kubernetes-application
2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
run-application -t kubernetes-application
3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
flink-conf.yaml where I'm running flink run-application
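4) Overriding it in the application code:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions

    // env is the job's StreamExecutionEnvironment
    val sigh = new Configuration()
    sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
    env.configure(sigh)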

Every time, the resulting pod ends up with a "false" value for this setting
in its ConfigMap:

    $ kc describe cm/flink-config-flink-test | grep ignore
    execution.savepoint.ignore-unclaimed-state: false

And I get the exception:

    java.lang.IllegalStateException: Failed to rollback to
    checkpoint/savepoint . Cannot map checkpoint/savepoint state for operator
    68895e9129981bfc6d96d1dad715298e to the new program, because the operator
    is not available in the new program. If you want to allow to skip this,
    you can set the --allowNonRestoredState option on the CLI.
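For context on the exception above: on restore, each operator state in the savepoint must be matched to an operator in the new program, and allowing non-restored state turns an unmatched entry from a hard failure into a skip. The shell function below is a hypothetical, simplified model of that check, not Flink's implementation. Apart from the operator ID copied from the exception, the IDs (op-a, op-b) are made up.

```shell
#!/usr/bin/env bash
# Hypothetical, simplified model of the savepoint-to-program operator mapping
# check that raises "Cannot map checkpoint/savepoint state for operator ...".
#
# Arguments:
#   $1  "true" if --allowNonRestoredState is in effect, otherwise "false"
#   $2  space-separated IDs of operators that have state in the savepoint
#   $3  space-separated IDs of operators present in the new program
# Returns 1 (with an error on stderr) if some state cannot be mapped and
# skipping is not allowed; otherwise prints "ok".
check_restore() {
  local allow_non_restored="$1"
  local -a savepoint_ids program_ids
  read -r -a savepoint_ids <<< "$2"
  read -r -a program_ids <<< "$3"

  # Lookup set of operator IDs that exist in the new program (bash 4+).
  local -A in_program=()
  local id
  for id in "${program_ids[@]}"; do
    in_program["$id"]=1
  done

  for id in "${savepoint_ids[@]}"; do
    if [[ -z "${in_program[$id]:-}" ]]; then
      if [[ "$allow_non_restored" == "true" ]]; then
        # The removed operator's state is discarded instead of failing.
        echo "skipping state of operator $id"
      else
        echo "Cannot map state for operator $id to the new program" >&2
        return 1
      fi
    fi
  done
  echo "ok"
}

# Without the flag the restore fails; with it, the orphaned state is skipped.
check_restore false "68895e9129981bfc6d96d1dad715298e op-a" "op-a op-b" \
  || echo "restore failed"
check_restore true "68895e9129981bfc6d96d1dad715298e op-a" "op-a op-b"
```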

It seems like something overrides it to false and it never has any effect.

Could this be a bug, or am I doing something wrong?

For context, the savepoint is produced by Flink 1.8.2 and the version I'm
trying to run on K8S is 1.14.3.

-- 
With regards,
Andrey Bulgakov