Hi Andrey,

It's unclear to me from the docs[1] if the flink native-kubernetes
integration supports setting arbitrary config keys via the CLI. I'm cc'ing
Yang Wang, who has worked a lot in this area and can hopefully help us out.

Best,
Austin

[1]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes

On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov <m...@andreiko.ru> wrote:

> Hey all,
>
> I'm working on migrating our Flink job away from Hadoop session mode to
> K8S application mode.
> It's been going great so far but I'm hitting a wall with this seemingly
> simple thing.
>
> In the first phase of the migration I want to remove some operators (their
> state can be discarded) and focus on getting the primary pipeline running
> first.
> For that I have to start the cluster from a savepoint with the
> "allowNonRestoredState" parameter turned on.
>
> The problem is that I can't set it in any way that I'm aware of. I tried 4
> ways separately and simultaneously:
>
> 1) Adding --allowNonRestoredState to flink run-application
> -t kubernetes-application
> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
> run-application -t kubernetes-application
> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
> flink-conf.yaml where I'm running flink run-application
> 4) Overriding it in the application code:
>     val sigh = new Configuration()
>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
> true)
>     env.configure(sigh)
>
> Every time the resulting pod ends up with "false" value for this setting
> in its configmap:
> $ kc describe cm/flink-config-flink-test | grep ignore
> execution.savepoint.ignore-unclaimed-state: false
>
> And I get the exception:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint <URL>. Cannot map checkpoint/savepoint state for
> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
> operator is not available in the new program. If you want to allow to skip
> this, you can set the --allowNonRestoredState option on the CLI.
>
> It seems like something overrides it to false and it never has any effect.
>
> Can this be a bug or am I doing something wrong?
>
> For context, the savepoint is produced by Flink 1.8.2 and the version I'm
> trying to run on K8S is 1.14.3.
>
> --
> With regards,
> Andrey Bulgakov
>
>

Reply via email to