By design, we should support arbitrary config keys via the CLI when using generic CLI mode.
Did you also specify "--fromSavepoint" along with "--allowNonRestoredState" when submitting the Flink job via "flink run-application"?

From the current code base, it seems that the CLI options (e.g. --fromSavepoint, --allowNonRestoredState) have higher priority than the Flink config options. As a result, the savepoint-related config options are wrongly overwritten. Refer to the implementation[1].

[1]. https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181

Best,
Yang

On Sat, Feb 19, 2022 at 08:30, Andrey Bulgakov <m...@andreiko.ru> wrote:

> Hi Austin,
>
> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>
> But for what it's worth, I'm setting a few options unrelated to kubernetes
> this way and they all have effect:
> -Dstate.checkpoints.num-retained=100 \
> -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
> -Dio.tmp.dirs=/data/flink-local-data \
> -Dqueryable-state.enable=true \
>
> The only one I'm having problems with is
> "execution.savepoint.ignore-unclaimed-state".
>
> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Andrey,
>>
>> It's unclear to me from the docs[1] if the flink native-kubernetes
>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>
>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> wrote:
>>
>>> Hey all,
>>>
>>> I'm working on migrating our Flink job away from Hadoop session mode to
>>> K8S application mode.
>>> It's been going great so far but I'm hitting a wall with this seemingly
>>> simple thing.
>>>
>>> In the first phase of the migration I want to remove some operators
>>> (their state can be discarded) and focus on getting the primary pipeline
>>> running first.
>>> For that I have to start the cluster from a savepoint with the
>>> "allowNonRestoredState" parameter turned on.
>>>
>>> The problem is that I can't set it in any way that I'm aware of. I tried
>>> 4 ways separately and simultaneously:
>>>
>>> 1) Adding --allowNonRestoredState to flink run-application
>>>    -t kubernetes-application
>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>>    run-application -t kubernetes-application
>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
>>>    flink-conf.yaml where I'm running flink run-application
>>> 4) Overriding it in the application code:
>>>    val sigh = new Configuration()
>>>    sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>>>    env.configure(sigh)
>>>
>>> Every time the resulting pod ends up with "false" value for this setting
>>> in its configmap:
>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>> execution.savepoint.ignore-unclaimed-state: false
>>>
>>> And I get the exception:
>>> java.lang.IllegalStateException: Failed to rollback to
>>> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>> operator is not available in the new program. If you want to allow to skip
>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>
>>> It seems like something overrides it to false and it never has any
>>> effect.
>>>
>>> Can this be a bug or am I doing something wrong?
>>>
>>> For context, the savepoint is produced by Flink 1.8.2 and the version
>>> I'm trying to run on K8S is 1.14.3.
>>>
>>> --
>>> With regards,
>>> Andrey Bulgakov
>
> --
> With regards,
> Andrey Bulgakov
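
For illustration, here is a minimal sketch of how an override to "false" like the one described in this thread could come about. It is a toy model built on plain maps, not Flink's actual code: the class SavepointPrecedenceSketch and the method applyCliOptions are hypothetical, and the assumption that the CLI layer honors --allowNonRestoredState only when --fromSavepoint is also given, and then always writes the result into the configuration, is a guess at the behavior rather than something confirmed here.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not Flink code) of CLI options taking priority
// over config options for the savepoint settings.
public class SavepointPrecedenceSketch {
    static final String IGNORE_UNCLAIMED = "execution.savepoint.ignore-unclaimed-state";

    // Assumption: the CLI layer runs after -D / flink-conf.yaml values are loaded
    // and unconditionally writes its own view of the savepoint settings.
    static Map<String, String> applyCliOptions(
            Map<String, String> configFromDynamicProps,
            String fromSavepoint,             // null when --fromSavepoint is absent
            boolean allowNonRestoredState) {  // true when --allowNonRestoredState is given
        Map<String, String> effective = new HashMap<>(configFromDynamicProps);
        // Assumption: the flag only counts when a savepoint path was passed on the CLI.
        boolean ignoreUnclaimed = fromSavepoint != null && allowNonRestoredState;
        // The unconditional write is what clobbers the user's config value.
        effective.put(IGNORE_UNCLAIMED, String.valueOf(ignoreUnclaimed));
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(IGNORE_UNCLAIMED, "true"); // as set via -D or flink-conf.yaml

        // No --fromSavepoint on the CLI: the config value is replaced.
        System.out.println(applyCliOptions(props, null, true).get(IGNORE_UNCLAIMED));
        // Both CLI options given: the setting survives.
        System.out.println(applyCliOptions(props, "<URL>", true).get(IGNORE_UNCLAIMED));
    }
}
```

Under these assumptions, the first call yields "false" even though both the config key and the CLI flag ask for "true", which would match the configmap output quoted above; passing the savepoint path on the CLI together with the flag is the combination that keeps the setting.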