By design, we should support arbitrary config keys via the CLI when using
generic CLI mode.

Have you also specified "--fromSavepoint" along with
"--allowNonRestoredState" when submitting the Flink job via "flink
run-application"?

From the current code base, it seems that the CLI options (e.g.
--fromSavepoint, --allowNonRestoredState) take priority over the Flink
config options.
As a result, the savepoint-related config options are overwritten
incorrectly. Refer to the implementation[1].

[1].
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
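
To illustrate the precedence described above, here is a minimal sketch. It
is a simplified model and not Flink's actual code: the class and method
names are hypothetical, and the assumption that the CLI-derived savepoint
settings are written last and unconditionally is my reading of the linked
implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the suspected behaviour, NOT Flink's real classes.
// Every name below is hypothetical and exists only for illustration.
final class SavepointPrecedenceSketch {

    static final String IGNORE_UNCLAIMED_STATE =
            "execution.savepoint.ignore-unclaimed-state";
    static final String SAVEPOINT_PATH = "execution.savepoint.path";

    /**
     * Builds the effective configuration from -D style properties and the
     * dedicated CLI options.
     *
     * @param dynamicProperties values passed as -Dkey=value
     * @param fromSavepoint value of --fromSavepoint, or null when absent
     * @param allowNonRestoredState true only if --allowNonRestoredState is set
     */
    static Map<String, String> effectiveConfig(
            Map<String, String> dynamicProperties,
            String fromSavepoint,
            boolean allowNonRestoredState) {
        // Start from whatever the user supplied via -D.
        Map<String, String> config = new LinkedHashMap<>(dynamicProperties);

        // Assumption: the CLI flag is applied afterwards and unconditionally,
        // so an absent flag (false) replaces a -D value of "true".
        config.put(IGNORE_UNCLAIMED_STATE, String.valueOf(allowNonRestoredState));

        // The savepoint path is only written when the CLI option is present.
        if (fromSavepoint != null) {
            config.put(SAVEPOINT_PATH, fromSavepoint);
        }
        return config;
    }
}
```

In this model, passing -Dexecution.savepoint.ignore-unclaimed-state=true
without --allowNonRestoredState still yields "false" for that key, which
matches the value observed in the configmap.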


Best,
Yang

Andrey Bulgakov <m...@andreiko.ru> wrote on Sat, Feb 19, 2022 at 08:30:

> Hi Austin,
>
> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>
> But for what it's worth, I'm setting a few options unrelated to kubernetes
> this way and they all have effect:
>     -Dstate.checkpoints.num-retained=100 \
>     -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
>     -Dio.tmp.dirs=/data/flink-local-data \
>     -Dqueryable-state.enable=true \
>
> The only one I'm having problems with is
> "execution.savepoint.ignore-unclaimed-state".
>
> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Andrey,
>>
>> It's unclear to me from the docs[1] if the flink native-kubernetes
>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>
>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> wrote:
>>
>>> Hey all,
>>>
>>> I'm working on migrating our Flink job away from Hadoop session mode to
>>> K8S application mode.
>>> It's been going great so far but I'm hitting a wall with this seemingly
>>> simple thing.
>>>
>>> In the first phase of the migration I want to remove some operators
>>> (their state can be discarded) and focus on getting the primary pipeline
>>> running first.
>>> For that I have to start the cluster from a savepoint with the
>>> "allowNonRestoredState" parameter turned on.
>>>
>>> The problem is that I can't set it in any way that I'm aware of. I tried
>>> 4 ways separately and simultaneously:
>>>
>>> 1) Adding --allowNonRestoredState to flink run-application
>>> -t kubernetes-application
>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>> run-application -t kubernetes-application
>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
>>> flink-conf.yaml where I'm running flink run-application
>>> 4) Overriding it in the application code:
>>>     val sigh = new Configuration()
>>>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>>     env.configure(sigh)
>>>
>>> Every time the resulting pod ends up with "false" value for this setting
>>> in its configmap:
>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>> execution.savepoint.ignore-unclaimed-state: false
>>>
>>> And I get the exception:
>>> java.lang.IllegalStateException: Failed to rollback to
>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>> operator is not available in the new program. If you want to allow to skip
>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>
>>> It seems like something overrides it to false and it never has any
>>> effect.
>>>
>>> Can this be a bug or am I doing something wrong?
>>>
>>> For context, the savepoint is produced by Flink 1.8.2 and the version
>>> I'm trying to run on K8S is 1.14.3.
>>>
>>> --
>>> With regards,
>>> Andrey Bulgakov
>>>
>>>
>
> --
> With regards,
> Andrey Bulgakov
>
