Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
The config options set via -D parameters should take effect. That is also the recommended way, rather than CLI options (e.g. --fromSavepoint). The problem is not limited to K8s application mode: it also does not work for YARN application mode and YARN per-job mode. I believe it is indeed a bug in the current implementation, and I have created a ticket for it [1].

Once that is fixed, you could start the Flink K8s application with the following command:

    $FLINK_HOME/bin/flink run-application -t kubernetes-application \
        -Dkubernetes.cluster-id=$CLUSTER_ID \
        -Dkubernetes.namespace=$NAMESPACE \
        -Dkubernetes.container.image=$IMAGE \
        -Dexecution.savepoint.ignore-unclaimed-state=true \
        -Dexecution.savepoint.path=oss://flink-debug-yiqi/flink-ha \
        local:///opt/flink/examples/streaming/StateMachineExample.jar

If you still want to use the CLI options, then I expect you need to set at least "--fromSavepoint".

[1] https://issues.apache.org/jira/browse/FLINK-26316

Best,
Yang

On Wed, Feb 23, 2022 at 04:09, Andrey Bulgakov wrote:

> Thank you, Yang. That was it! Specifying "--fromSavepoint" and
> "--allowNonRestoredState" for "run-application" together did the trick.
>
> I was a bit confused, because when you run "flink run-application --help",
> it only tells you about the "--executor" and "--target" options. So I
> assumed I should pass everything else as -D params. I had only tried
> passing "--allowNonRestoredState" on the CLI as the last resort but didn't
> think to do it together with "--fromSavepoint".
>
> Thanks again!
Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
Thank you, Yang. That was it! Specifying "--fromSavepoint" and "--allowNonRestoredState" for "run-application" together did the trick.

I was a bit confused, because when you run "flink run-application --help", it only tells you about the "--executor" and "--target" options. So I assumed I should pass everything else as -D params. I had only tried passing "--allowNonRestoredState" on the CLI as the last resort but didn't think to do it together with "--fromSavepoint".

Thanks again!

On Sun, Feb 20, 2022 at 9:49 PM Yang Wang wrote:

> By design, we should support arbitrary config keys via the CLI when using
> generic CLI mode.
>
> Did you also specify "--fromSavepoint" along with
> "--allowNonRestoredState" when submitting the Flink job via "flink
> run-application"?
>
> From the current code base, it seems that the CLI options (e.g.
> --fromSavepoint, --allowNonRestoredState) have higher priority than Flink
> config options. As a result, the savepoint-related config options are
> wrongly overwritten. Refer to the implementation [1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
>
> Best,
> Yang

--
With regards,
Andrey Bulgakov
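For reference, the combination reported to work above can be sketched as a small shell helper. This is only a sketch: it assembles and prints the command instead of running it. The function name, the /opt/flink fallback for FLINK_HOME, and the choice of positional arguments are assumptions made for illustration. The flags "--fromSavepoint" and "--allowNonRestoredState" are the ones named in this thread.

```shell
# Hypothetical helper: prints a `flink run-application` command line, one
# argument per line, so it can be reviewed before anything is executed.
# Usage: print_run_application_cmd SAVEPOINT_PATH CLUSTER_ID IMAGE JAR
print_run_application_cmd() {
  savepoint_path="$1"
  cluster_id="$2"
  image="$3"
  jar="$4"
  # The /opt/flink fallback is an assumption; set FLINK_HOME to override it.
  printf '%s\n' \
    "${FLINK_HOME:-/opt/flink}/bin/flink" \
    run-application \
    -t kubernetes-application \
    --fromSavepoint "$savepoint_path" \
    --allowNonRestoredState \
    "-Dkubernetes.cluster-id=$cluster_id" \
    "-Dkubernetes.container.image=$image" \
    "$jar"
}
```

Piping the output through `tr '\n' ' '` gives a single line that can be inspected and pasted into a terminal. Both savepoint flags are passed together here because, per the report above, "--allowNonRestoredState" alone had no effect.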
Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
By design, we should support arbitrary config keys via the CLI when using generic CLI mode.

Did you also specify "--fromSavepoint" along with "--allowNonRestoredState" when submitting the Flink job via "flink run-application"?

From the current code base, it seems that the CLI options (e.g. --fromSavepoint, --allowNonRestoredState) have higher priority than Flink config options. As a result, the savepoint-related config options are wrongly overwritten. Refer to the implementation [1].

[1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181

Best,
Yang

On Sat, Feb 19, 2022 at 08:30, Andrey Bulgakov wrote:

> Hi Austin,
>
> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>
> But for what it's worth, I'm setting a few options unrelated to kubernetes
> this way and they all have effect:
>     -Dstate.checkpoints.num-retained=100 \
>     -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
>     -Dio.tmp.dirs=/data/flink-local-data \
>     -Dqueryable-state.enable=true \
>
> The only one I'm having problems with is
> "execution.savepoint.ignore-unclaimed-state".
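The precedence described above, combined with what was reported elsewhere in this thread, can be modeled with a toy shell function. This is a simplified model and not Flink's actual code: the function name and its yes/no arguments are hypothetical, and the rule that the allow flag only counts when "--fromSavepoint" is present is an assumption inferred from the thread, not read from ProgramOptions.java.

```shell
# Toy model (NOT Flink code) of the precedence described in this thread.
# Usage: effective_ignore_unclaimed_state CONFIG_VALUE HAS_FROM_SAVEPOINT HAS_ALLOW_FLAG
#   CONFIG_VALUE        true|false, as set via -D or flink-conf.yaml
#   HAS_FROM_SAVEPOINT  yes|no, whether --fromSavepoint was passed
#   HAS_ALLOW_FLAG      yes|no, whether --allowNonRestoredState was passed
effective_ignore_unclaimed_state() {
  config_value="$1"
  has_from_savepoint="$2"
  has_allow_flag="$3"

  # Step 1: derive a value from the CLI flags alone. Assumption: without
  # --fromSavepoint the CLI yields "no savepoint" settings, i.e. false.
  if [ "$has_from_savepoint" = "yes" ] && [ "$has_allow_flag" = "yes" ]; then
    cli_value=true
  else
    cli_value=false
  fi

  # Step 2: the CLI-derived value overwrites the configured one, so
  # config_value never influences the result. That is the reported problem.
  effective="$config_value"
  effective="$cli_value"
  printf '%s\n' "$effective"
}
```

Under this model, `effective_ignore_unclaimed_state true no no` (only the -D option) and `effective_ignore_unclaimed_state false no yes` (only "--allowNonRestoredState") both print false, while `effective_ignore_unclaimed_state false yes yes` prints true, which matches the outcomes reported in the thread.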
Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
Hi Austin,

Thanks for the reply! Yeah, the docs aren't super explicit about this.

But for what it's worth, I'm setting a few options unrelated to kubernetes this way and they all have effect:

    -Dstate.checkpoints.num-retained=100 \
    -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
    -Dio.tmp.dirs=/data/flink-local-data \
    -Dqueryable-state.enable=true \

The only one I'm having problems with is "execution.savepoint.ignore-unclaimed-state".

On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hi Andrey,
>
> It's unclear to me from the docs[1] if the flink native-kubernetes
> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>
> Best,
> Austin
>
> [1]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes

--
With regards,
Andrey Bulgakov
Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
Hi Andrey,

It's unclear to me from the docs[1] if the flink native-kubernetes integration supports setting arbitrary config keys via the CLI. I'm cc'ing Yang Wang, who has worked a lot in this area and can hopefully help us out.

Best,
Austin

[1]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes

On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov wrote:

> Hey all,
>
> I'm working on migrating our Flink job away from Hadoop session mode to
> K8S application mode.
> It's been going great so far but I'm hitting a wall with this seemingly
> simple thing.
>
> In the first phase of the migration I want to remove some operators (their
> state can be discarded) and focus on getting the primary pipeline running
> first.
> For that I have to start the cluster from a savepoint with the
> "allowNonRestoredState" parameter turned on.
>
> The problem is that I can't set it in any way that I'm aware of. I tried 4
> ways separately and simultaneously:
>
> 1) Adding --allowNonRestoredState to flink run-application
>    -t kubernetes-application
> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>    run-application -t kubernetes-application
> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
>    flink-conf.yaml where I'm running flink run-application
> 4) Overriding it in the application code:
>     val sigh = new Configuration()
>     sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
>     env.configure(sigh)
>
> Every time the resulting pod ends up with "false" value for this setting
> in its configmap:
>     $ kc describe cm/flink-config-flink-test | grep ignore
>     execution.savepoint.ignore-unclaimed-state: false
>
> And I get the exception:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint . Cannot map checkpoint/savepoint state for
> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
> operator is not available in the new program. If you want to allow to skip
> this, you can set the --allowNonRestoredState option on the CLI.
>
> It seems like something overrides it to false and it never has any effect.
>
> Can this be a bug or am I doing something wrong?
>
> For context, the savepoint is produced by Flink 1.8.2 and the version I'm
> trying to run on K8S is 1.14.3.
>
> --
> With regards,
> Andrey Bulgakov
No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode
Hey all,

I'm working on migrating our Flink job away from Hadoop session mode to K8S application mode. It's been going great so far but I'm hitting a wall with this seemingly simple thing.

In the first phase of the migration I want to remove some operators (their state can be discarded) and focus on getting the primary pipeline running first. For that I have to start the cluster from a savepoint with the "allowNonRestoredState" parameter turned on.

The problem is that I can't set it in any way that I'm aware of. I tried 4 ways separately and simultaneously:

1) Adding --allowNonRestoredState to flink run-application -t kubernetes-application
2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink run-application -t kubernetes-application
3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local flink-conf.yaml where I'm running flink run-application
4) Overriding it in the application code:

    val sigh = new Configuration()
    sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
    env.configure(sigh)

Every time the resulting pod ends up with "false" value for this setting in its configmap:

    $ kc describe cm/flink-config-flink-test | grep ignore
    execution.savepoint.ignore-unclaimed-state: false

And I get the exception:

    java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint . Cannot map checkpoint/savepoint state for operator 68895e9129981bfc6d96d1dad715298e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

It seems like something overrides it to false and it never has any effect.

Can this be a bug or am I doing something wrong?

For context, the savepoint is produced by Flink 1.8.2 and the version I'm trying to run on K8S is 1.14.3.

--
With regards,
Andrey Bulgakov
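The configmap check in this message can be wrapped in a small filter that extracts just the value, which is handy when comparing several submission attempts. A minimal sketch, assuming that "kc" above is an alias for kubectl and that the key appears on a single "key: value" line as in the output shown; the function name is hypothetical.

```shell
# Hypothetical helper: reads ConfigMap text on stdin and prints the value of
# execution.savepoint.ignore-unclaimed-state, taken as the last field of the
# first matching line. Prints nothing if the key is absent.
ignore_unclaimed_state_value() {
  awk '/execution\.savepoint\.ignore-unclaimed-state:/ { print $NF; exit }'
}
```

It could be used as `kubectl describe cm/flink-config-flink-test | ignore_unclaimed_state_value`, where the configmap name is the one from the message above.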