Re: Flink 1.17 upgrade issue when using azure storage account for checkpoints/savepoints
On Sat, Mar 25, 2023 at 02:01:24PM +0530, Jessy Ping wrote:
> Root cause: Caused by: java.util.concurrent.CompletionException:
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found

We hit a similar error with Google Cloud Storage; there is a workaround in this Slack thread:
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1679320815257449

--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B

signature.asc Description: PGP signature
Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15
On Tue, Jun 07, 2022 at 03:26:43PM +0800, czc...@gmail.com wrote:
> On Mon, Jun 06, 2022 at 10:42:08AM +0800, Shengkai Fang wrote:
> > Hi. In my experience, the steps to debug classloading problems are as
> > follows:
>
> Thanks for the help. We get the following log when using `-verbose:class`:
>
> [2.074s][info][class,load] org.apache.hadoop.fs.FileSystem source:
> file:/opt/flink/opt/flink-s3-fs-hadoop-1.15.0.jar
> ...
> [8.094s][info][class,load] com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem source:
> file:/opt/flink/opt/flink-gs-fs-hadoop-1.15.0.jar
>
> It looks like the application loads org.apache.hadoop.fs.FileSystem from
> flink-s3-fs-hadoop-1.15.0.jar and GoogleHadoopFileSystem from
> flink-gs-fs-hadoop-1.15.0.jar, and they are incompatible. Since we run
> Flink in both AWS and GCP, our base image contains both plugins at the
> same time. Any idea how to work around it?
>
> We also tried setting `classloader.resolve-order: parent-first`. However,
> we got another error caused by a library conflict between Flink and our
> application:
>
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala
> module 2.11.3 requires Jackson Databind version >= 2.11.0 and < 2.12.0

We solved the problem by moving the plugins into the correct plugins directory. Thanks for the help from Slack.
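For reference, "the correct plugins directory" means each filesystem jar sits in its own subdirectory under /opt/flink/plugins, so each plugin gets its own isolated classloader instead of sharing /opt/flink/lib. A sketch of the layout (jar versions taken from the logs above):

```
/opt/flink/plugins/
├── s3-fs-hadoop/
│   └── flink-s3-fs-hadoop-1.15.0.jar
└── gs-fs-hadoop/
    └── flink-gs-fs-hadoop-1.15.0.jar
```

With this layout both filesystems can coexist in one image, since neither leaks its Hadoop classes into the other's classloader.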
Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15
On Thu, Jun 02, 2022 at 06:23:20PM +0800, Qingsheng Ren wrote:
> Thanks for the input ChangZhuo.
>
> Could you check if the configuration "classloader.resolve-order" is set
> to "parent-first" in your Flink 1.14 cluster? I didn't notice any changes
> related to the user code classloader in Flink 1.15. If my assumption is
> correct, you package the gcs-connector into your job JAR but the Hadoop
> FS dependencies are not included, so org.apache.hadoop.fs.FileSystem is
> loaded by the app classloader from flink-s3-fs-hadoop.jar under the lib
> of Flink, but GoogleHadoopFileSystem is loaded by the user code
> classloader from the job JAR. Setting the resolve order to "parent-first"
> could bypass the issue [1] so I assume you have this config in 1.14 but
> not in 1.15. Please forgive me if I understand incorrectly!

No, we do not configure classloader.resolve-order in either the 1.14 or the 1.15 setup. We will check whether "parent-first" solves the problem, thanks for the advice.

Also, in 1.14, we included the following jars in /opt/flink/lib to support GCS:

* flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
* gcs-connector-hadoop3-2.2.2-shaded.jar

In 1.15, we add flink-gs-fs-hadoop-1.15.0.jar to /opt/flink/lib to support GCS. Maybe this difference causes the problem?
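The two-classloader situation described above can be reproduced with nothing but the JDK: the same class file loaded through two different loaders yields two distinct runtime classes, so an instanceof check or cast between them fails exactly like the GoogleHadoopFileSystem-to-FileSystem cast. A minimal Scala sketch (`Payload` is a hypothetical stand-in for org.apache.hadoop.fs.FileSystem):

```scala
import java.net.URLClassLoader

// Payload is a hypothetical stand-in for org.apache.hadoop.fs.FileSystem.
class Payload

object LoaderDemo {
  // Load Payload again through a loader with no parent, so the class is
  // resolved from the classpath URL itself instead of being delegated to
  // the application classloader -- analogous to a child-first plugin loader.
  def loadIsolated(): Class[_] = {
    val cp = classOf[Payload].getProtectionDomain.getCodeSource.getLocation
    new URLClassLoader(Array(cp), null).loadClass("Payload")
  }

  def main(args: Array[String]): Unit = {
    val cls = loadIsolated()
    val obj = cls.getDeclaredConstructor().newInstance()
    // Same bytecode, different loader: the runtime classes are distinct,
    // so the check fails just like the Hadoop FileSystem cast.
    println(obj.isInstanceOf[Payload])
    println(cls == classOf[Payload])
  }
}
```

Both prints are false. The fix discussed in this thread amounts to making sure only one loader ever provides org.apache.hadoop.fs.FileSystem to the job.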
Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15
15.0.jar:1.15.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
... 13 more
Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at .listPath() ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
... 13 more
2022-06-02 00:25:57,830 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..

> Best regards,
> Qingsheng
>
> > On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬) wrote:
> >
> > Hi,
> >
> > We use GCS as storage, and have the following function to list files in
> > a GCS path for Flink batch mode to build states:
> >
> > def listPath(p: String): Seq[String] = {
> >   val path = new Path(p)
> >   val fs = path.getFileSystem(new Configuration())
> >   fs.listStatus(path) match {
> >     case null => Seq()
> >     case xs => xs.map(_.getPath.toString)
> >   }
> > }
> >
> > This function works fine in Flink 1.14. However, in Flink 1.15, we get
> > the following exception:
> >
> > Caused by: java.lang.ClassCastException: class
> > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to
> > class org.apache.hadoop.fs.FileSystem
> > (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and
> > org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> > at .listPath() ~[?:?]
> >
> > We found a similar issue in Spark [0]. However, we are not sure if it is
> > related, and if it is, how we can apply the fix. Any help is welcome.
> >
> > [0] https://issues.apache.org/jira/browse/SPARK-9206
Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15
Hi,

We use GCS as storage, and have the following function to list files in a GCS path for Flink batch mode to build states:

def listPath(p: String): Seq[String] = {
  val path = new Path(p)
  val fs = path.getFileSystem(new Configuration())
  fs.listStatus(path) match {
    case null => Seq()
    case xs => xs.map(_.getPath.toString)
  }
}

This function works fine in Flink 1.14. However, in Flink 1.15, we get the following exception:

Caused by: java.lang.ClassCastException: class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class org.apache.hadoop.fs.FileSystem (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at .listPath() ~[?:?]

We found a similar issue in Spark [0]. However, we are not sure if it is related, and if it is, how we can apply the fix. Any help is welcome.

[0] https://issues.apache.org/jira/browse/SPARK-9206
Re: Prometheus metrics does not work in 1.15.0 taskmanager
On Wed, May 04, 2022 at 01:53:01PM +0200, Chesnay Schepler wrote:
> Disabling the kafka metrics _should_ work.

Is there any way to disable Kafka metrics when using the low-level process function?
Re: Prometheus metrics does not work in 1.15.0 taskmanager
On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
> Hi!
>
> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
> coming from 1.14.4. I already consulted the mailing list:
> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
> I have not found the underlying problem or a solution to it.
>
> Actually, after re-checking, I see the same log WARNINGS as ChangZhou
> described.
>
> As I described, it seems to be an issue with my job. If no job, or an
> example job, runs on the taskmanager the basic metrics work just fine.
> Maybe ChangZhou can confirm this?
>
> @ChangZhou what's your job setup? I am running a streaming SQL job, but
> also using the data streams API to create the streaming environment and
> from that the table environment, and finally using a StatementSet to
> execute multiple SQL statements in one job.

We are running a streaming application using the low-level API, deployed with the Kubernetes operator FlinkDeployment.
Re: Prometheus metrics does not work in 1.15.0 taskmanager
On Tue, May 03, 2022 at 10:28:18AM +0200, Chesnay Schepler wrote:
> Is there any warning in the logs containing "Error while handling metric"?

No, we don't find any "Error while handling metric".
Re: Prometheus metrics does not work in 1.15.0 taskmanager
On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:
> Hi ChangZhou,
>
> The warning log indicates that the metric was previously defined and so
> the runtime is handling the "duplicate" metric by ignoring it. This is
> typically a benign message unless you rely on this metric. Is it possible
> that you are using the same task name for different tasks? It would be
> defined by the `.name(...)` API in your job graph instantiation.
>
> Can you clarify what it means that your endpoint isn't working--some
> metrics missing, endpoint is timing out, etc.? Also, can you confirm from
> logs that the PrometheusReporter was created properly?

"Endpoint isn't working" means we get an empty reply from the Prometheus endpoint. The following is our test against the taskmanager Prometheus endpoint:

$ curl localhost:9249
curl: (52) Empty reply from server

We have the following log lines in the taskmanager, so the PrometheusReporter was created properly:

2022-05-03 01:48:16,678 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: metrics.reporter.prom.class, org.apache.flink.metrics.prometheus.PrometheusReporter
...
2022-05-03 01:48:23,665 INFO org.apache.flink.metrics.prometheus.PrometheusReporter [] - Started PrometheusReporter HTTP server on port 9249.
2022-05-03 01:48:23,669 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl [] - Reporting metrics for reporter prom of type org.apache.flink.metrics.prometheus.PrometheusReporter.
Prometheus metrics does not work in 1.15.0 taskmanager
Hi,

We found that the taskmanager Prometheus endpoint does not work after upgrading from 1.14.3 to 1.15.0. The jobmanager Prometheus endpoint is fine in 1.15.0, so we think the problem is not in the image we use. Any idea how to fix this problem?

Also, we found the following log lines in the taskmanager, but not the jobmanager. Not sure if they are related to this issue.

2022-05-03 01:48:32,839 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocal'. Metric will not be reported.[10.210.47.134, taskmanager, , , , 8, Shuffle, Netty, Input]
2022-05-03 01:48:32,839 WARN org.apache.flink.metrics.MetricGroup [] - Name collision: Group already contains a Metric with the name 'numBytesInLocalPerSecond'. Metric will not be reported.[10.210.47.134, taskmanager, , , , 8, Shuffle, Netty, Input]
...
flink operator sometimes cannot start jobmanager after upgrading
/flink-deployment-name] JobManager is being deployed
2022-04-29 09:41:35,756 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:40,759 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:40,760 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: DEPLOYING
2022-04-29 09:41:40,864 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager is being deployed
2022-04-29 09:41:40,864 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:45,867 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:45,868 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: DEPLOYING
2022-04-29 09:41:45,870 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment port is ready, waiting for the Flink REST API...
2022-04-29 09:41:45,870 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:55,901 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: DEPLOYED_NOT_READY
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment is ready
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing job status
2022-04-29 09:41:56,294 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] No job found on cluster yet
2022-04-29 09:41:56,294 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:58,443 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:58,445 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing job status
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver [ERROR][namespace/flink-deployment-name] Exception while listing jobs
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: READY
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:10,490 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:42:25,521 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:42:25,522 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: MISSING
2022-04-29 09:42:25,522 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:25,522 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:42:40,526 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:42:40,527 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: MISSING
2022-04-29 09:42:40,527 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:40,527 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed
...
2022-04-29 10:00:55,862 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 10:00:55,863 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] Observing JobManager deployment. Previous status: MISSING
2022-04-29 10:00:55,863 o.a.f.k.o.o.JobObserver [INFO ][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 10:00:55,863 o.a.f.k.o.c.FlinkDeploymentController [INFO ][namespace/flink-deployment-name] Reconciliation successfully completed

[0] https://github.com/apache/flink-kubernetes-operator
Re: how to setup working dir in Flink operator
On Mon, Apr 25, 2022 at 05:15:58PM +0800, Yang Wang wrote:
> Using the pod template to configure the local SSD (via host-path or local
> PV) is the correct way. After that, either "java.io.tmpdir" or
> "process.taskmanager.working-dir" in CR should take effect.
>
> Maybe you need to share the complete pod yaml and logs of the failed
> TaskManager.

In our test results, `process.working-dir` and `process.taskmanager.working-dir` seem to be ignored. Only `io.tmp.dirs` changes where Flink stores `flink-io-*`, `localState/`, etc. The following is our test environment:

- configuration: io.tmp.dirs: /srv/working-dir
  result: flink-io-*, localState/ are in /srv/working-dir
- configuration: process.working-dir: /srv/working-dir
  result: flink-io-*, localState/ are in /tmp
- configuration: process.taskmanager.working-dir: /srv/working-dir
  result: flink-io-*, localState/ are in /tmp

All other configuration is the same.

> nit: if the TaskManager pod crashed and was deleted too fast, you could
> kill the JobManager first, then you will have enough time to get the logs
> and yamls.

Thanks for the tip.
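For anyone hitting the same issue, the combination that worked in the tests above can be sketched as a pod-template fragment plus the one option that took effect. The paths and the hostPath location are assumptions specific to our setup, not something the thread confirms for every environment:

```yaml
# flink-conf.yaml fragment: the only option that moved flink-io-*/localState for us
io.tmp.dirs: /srv/working-dir
```

```yaml
# pod template fragment: mount the node-local SSD at that path
spec:
  containers:
    - name: flink-main-container
      volumeMounts:
        - name: local-ssd
          mountPath: /srv/working-dir
  volumes:
    - name: local-ssd
      hostPath:
        path: /mnt/disks/ssd0   # assumption: where the local SSD is mounted on the node
```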
how to setup working dir in Flink operator
Hi,

We are trying to migrate our application from `Flink on standalone Kubernetes` to `Application mode on Flink operator`. However, we cannot successfully configure it to use the local SSD for RocksDB state. Any thoughts?

Details:

In the original `Flink on standalone Kubernetes`:
- set `io.tmp.dirs` to the local SSD, and Flink uses the local SSD for its data.

In the new `Application mode on Flink operator`:
- setting `io.tmp.dirs` to the local SSD causes a taskmanager crashloop. We are still trying to get the exact error message since it disappears very fast.
- setting `workingDir` in the pod template does not work. Flink still uses /tmp to store its data.
- setting `process.taskmanager.working-dir` does not work. Flink still uses /tmp to store its data.
Re: Questions about checkpoint retention
On Fri, Jan 28, 2022 at 02:43:11PM +0800, Caizhi Weng wrote:
> Chen-Che Huang wrote on Thu, Jan 27, 2022 at 11:10:
> > We have two questions about checkpoint retention.
> >
> > 1. When our cron job creates a savepoint called SP, it seems the
> > checkpoints created earlier than SP still cannot be deleted. We thought
> > new checkpoints are generated based on SP and thus old checkpoints
> > before SP would be useless. However, the checkpoint mechanism doesn't
> > seem to work as we thought. Is what we thought correct?
> > 2. To save storage cost, we'd like to know which checkpoints can be
> > deleted. Currently, each version of our app has 10 checkpoints. We
> > wonder whether we can delete checkpoints generated for previous
> > versions of our apps?

Some details below:

* We have two GCS buckets to store checkpoints and savepoints:
  * gs://flink-checkpoints has no retention configuration.
  * gs://flink-savepoints has a 5-day retention policy.
* The checkpoint configuration is:
  * state.backend.incremental: true
  * RETAIN_ON_CANCELLATION
* We create a savepoint every 4 hours for recovery.
* The business requires up to 180 days of historical data.

The questions are:

* We want to set retention on gs://flink-checkpoints to reduce storage cost. However, Flink sometimes cannot restore from a checkpoint due to missing data when retention is configured on gs://flink-checkpoints. Is there any way to configure retention safely for Flink?
* We don't use DELETE_ON_CANCELLATION, to avoid deleting state data by accident.
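A likely explanation for the failed restores (an assumption, not confirmed in this thread): with state.backend.incremental: true, a new checkpoint still references shared files written by much older checkpoints, so a time-based bucket retention policy can delete files that a live checkpoint depends on. Letting Flink bound retained checkpoints itself avoids this, because Flink's own cleanup tracks which shared files are still referenced:

```yaml
# flink-conf.yaml fragment: let Flink prune checkpoints instead of a GCS lifecycle rule
state.backend.incremental: true
state.checkpoints.num-retained: 3   # keep only the last 3 completed checkpoints
```

The 180-day history requirement would then be served by the periodic savepoints (which are self-contained), not by retained checkpoints.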
Re: Time different between checkpoint and savepoint restoration in GCS
On Mon, Oct 25, 2021 at 05:19:32PM +0800, JING ZHANG wrote:
> Hi,
>
> > We wonder if this is expected behavior or not?
>
> I think it's expected. You could find more information in document [1].
> Checkpoints and Savepoints differ in their implementation. Checkpoints
> are designed to be lightweight and fast. They might (but don't
> necessarily have to) make use of different features of the underlying
> state backend and try to restore data as fast as possible. As an
> example, incremental Checkpoints with the RocksDB State backend use
> RocksDB's internal format instead of Flink's native format. This is used
> to speed up the checkpointing process of RocksDB, which makes them the
> first instance of a more lightweight Checkpointing mechanism. On the
> contrary, Savepoints are designed to focus more on the portability of
> data and support any changes made to the job, which makes them slightly
> more expensive to produce and restore.
>
> Besides, the Savepoint binary format is different from the checkpoint
> format. The Flink savepoint binary format is unified across all state
> backends [2]. That means you can take a Savepoint with one state backend
> and then restore it using another. So when restoring from a Savepoint
> file, the job needs to read from the unified binary format and write
> into the format of the underlying state backend. When restoring from a
> checkpoint file, this step may be easier, for example, loading the files
> directly into the underlying state backend.
>
> [1] https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
> [2] https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

Thanks for the information.
Time different between checkpoint and savepoint restoration in GCS
Hi,

We found that our application's savepoint restoration time (~40 mins) is much slower than its checkpoint restoration time (~4 mins). We wonder if this is expected behavior or not?

Some details about the environment:

* Flink version: 1.14.0
* Persistent storage is GCS, via the following jars:
  * flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
  * gcs-connector-hadoop3-2.2.2-shaded.jar
* Unaligned checkpoint is enabled.
* The network ingress for checkpoint restoration (~750 MiB/s) is much faster than for savepoint restoration (~50 MiB/s).
* Checkpoints and savepoints use different GCS buckets; not sure if this affects the throughput of GCS.
Re: Flink 1.14.0 reactive mode cannot rescale
On Tue, Oct 19, 2021 at 11:51:44AM +0200, David Morávek wrote:
> Hi ChangZhuo,
>
> this seems to be a current limitation of the unaligned checkpoints [1],
> are you using any broadcasted streams in your application?
>
> [1] https://issues.apache.org/jira/browse/FLINK-22815

* Yes, we do have broadcasted streams for configuration. We can switch to aligned checkpoints to see if that is okay.
* [0] is marked as fixed in version 1.14.0, so maybe there are other parts that need to be fixed?

[0] https://issues.apache.org/jira/browse/FLINK-22815
Flink 1.14.0 reactive mode cannot rescale
) ~[?:?]
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[?:?]
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) ~[?:?]
at org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:138) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.TaskStateAssignment.getOutputMapping(TaskStateAssignment.java:369) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeResultSubpartitionStates(StateAssignmentOperation.java:379) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:195) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:140) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1548) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1460) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:993) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:983) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-10-18 09:31:14,426 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics null.
2021-10-18 09:31:14,427 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
Re: Inconsistent parallelism in web UI when using reactive mode
On Tue, Oct 12, 2021 at 10:41:24AM +0200, Chesnay Schepler wrote:
> This is a known and documented limitation of the AdaptiveScheduler:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/#limitations-1
> There is no concrete date yet for when it will be fixed.

Thanks for the information.
Inconsistent parallelism in web UI when using reactive mode
Hi,

We found that the parallelism values in the web UI are inconsistent when using reactive mode. As in the attachment, in the overview page all parallelism values are 1, which is not correct. When clicking an operator for detailed information, the parallelism shown there is the correct one.

Is it possible to fix this inconsistency so that it does not confuse engineers when deploying Flink applications?
Re: 1.13.1 jobmanager annotations by pod template does not work
On Tue, Jun 15, 2021 at 04:40:00PM +0800, Yang Wang wrote:
> Yes. It is the by-design behavior. Because the pod template is only
> applicable to the "pod", not other resources (e.g. deployment, configmap).
>
> Currently, the JobManager pod is managed by a deployment and the naked
> TaskManager pods are managed by the Flink ResourceManager.
> This is the root cause which makes the difference.

Thanks for the clarification.
Re: 1.13.1 jobmanager annotations by pod template do not work
On Tue, Jun 15, 2021 at 04:22:07PM +0800, Yang Wang wrote: > The annotations, and labels in the pod template will only apply to the > JobManager pod, not the JobManager deployment. Thanks for the information. Is this behavior by design? In the documentation, it looks like there is no difference between jobmanager and taskmanager in how annotations and labels are handled [0]. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
Re: 1.13.1 jobmanager annotations by pod template do not work
On Fri, Jun 11, 2021 at 11:19:09PM +0800, Yang Wang wrote: > Could you please share your pod template and the value of > kubernetes.jobmanager.annotations? > > Usually the annotations of pod template and flink config options will be > merged. And the flink config > options has higher priority if you are specifying same name annotation. > > I have verified in minikube and it could take effect as expected. Hi, There are some other findings for this issue: * For jobmanager: * annotations and labels in the pod template do not work. * annotations and labels in -Dkubernetes.jobmanager.* work. * For taskmanager: * annotations and labels in the pod template work. The following is the jobmanager pod template: apiVersion: v1 kind: Pod metadata: labels: app: jobmanager helm.sh/chart: app.kubernetes.io/name: app.kubernetes.io/instance: app.kubernetes.io/version: app.kubernetes.io/managed-by: Helm annotations: rt.prometheus.io/scrape: 'true' rt.prometheus.io/path: '/' rt.prometheus.io/port: '9249' The following is the pod created as jobmanager: Name: Namespace: Priority: 200 Priority Class Name: medium Node: Start Time: Fri, 11 Jun 2021 23:38:52 +0800 Labels: app= component=jobmanager pod-template-hash=55846fd8f7 type=flink-native-kubernetes Annotations:
Re: NPE when restoring from savepoint in Flink 1.13.1 application
On Thu, Jun 10, 2021 at 07:10:45PM +0200, Roman Khachatryan wrote: > Hi ChangZhuo, > > Thanks for reporting, it looks like a bug. > I've opened a ticket for that [1]. > > [1] > https://issues.apache.org/jira/browse/FLINK-22966 Thanks for the help.
1.13.1 jobmanager annotations by pod template do not work
Hi, We found that jobmanager annotations defined in the pod template do not work; however, annotations defined by kubernetes.jobmanager.annotations [0] do work. This behavior differs from the documentation [1]. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-annotations [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
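As a stopgap, the working path can be made explicit in flink-conf.yaml. A sketch, assuming Prometheus-style annotations like the ones posted elsewhere in this thread:

```yaml
# flink-conf.yaml sketch: jobmanager annotations set here do take effect,
# unlike the pod-template ones reported above. Map-type options are
# written as comma-separated key:value pairs.
kubernetes.jobmanager.annotations: rt.prometheus.io/scrape:true,rt.prometheus.io/path:/,rt.prometheus.io/port:9249
```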
NPE when restoring from savepoint in Flink 1.13.1 application
(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?] ... 6 more Caused by: java.util.concurrent.CompletionException: java.lang.NullPointerException at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?] at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?] ... 6 more Caused by: java.lang.NullPointerException at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reAssignSubKeyedStates(StateAssignmentOperation.java:300) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.lambda$reDistributeKeyedStates$0(StateAssignmentOperation.java:260) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at java.util.HashMap.forEach(HashMap.java:1336) ~[?:?] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeKeyedStates(StateAssignmentOperation.java:252) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:196) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.scheduler.SchedulerBase.(SchedulerBase.java:190) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.scheduler.DefaultScheduler.(DefaultScheduler.java:120) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:317) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.12-1.13.1.jar:1.13.1] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.12-1.13.1.jar:1.13.1] ... 7 more 2021-06-09 13:08:59,852 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application. -- ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org http://czchen.info/ Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B signature.asc Description: PGP signature
Re: Custom operator in BATCH execution mode
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote: > Hi, > > No there is no API in the operator to know which mode it works in. We > aim to have separate operators for both modes if required. You can check > e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1]. Thanks for the information. We implemented this following Piotrek's suggestion. > > Yes, it should be possible to register a timer for Long.MAX_WATERMARK if > you want to apply a transformation at the end of each key. You could > also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode. According to [0], the timer's timestamp is irrelevant since timers are triggered at the end of input, right? If that is the case, we can use the same code for both streaming and batch mode. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/ > > A side note, I don't fully get what you mean by "build state for our > streaming application". Bear in mind though you cannot take a savepoint > from a job running in the BATCH execution mode. Moreover it uses a > different kind of StateBackend. Actually a dummy one, which just > imitates a real state backend. What we plan to do here is: 1. Load configuration from a broadcast event (custom source backed by a REST API). 2. Load historical events as batch-mode input (from GCS). 3. Use a timer to trigger output so that the following happens: a. Serialize keyed states into JSON. b. Output them to Kafka. c. The streaming application consumes the data from Kafka and updates its keyed states accordingly. We hope that in this way we can rebuild our state with almost the same code as in streaming.
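A stdlib-only sketch of the control flow in this plan, not the Flink API (run_keyed_batch, MAX_WATERMARK, and the JSON flush are illustrative names): in BATCH mode the registered timestamp does not matter, because every pending timer fires once the bounded input is exhausted, so the same "flush keyed state in the timer callback" code can serve both modes.

```python
import json
from collections import defaultdict

MAX_WATERMARK = 2**63 - 1  # Long.MAX_VALUE, the "end of input" timestamp

def run_keyed_batch(events):
    """Process (key, value) pairs, then fire per-key timers at end of input."""
    state = defaultdict(list)
    timers = {}
    for key, value in events:
        state[key].append(value)
        timers[key] = MAX_WATERMARK  # the registered time is irrelevant here
    # All bounded input consumed: every pending timer now fires, once per key.
    return [(key, json.dumps(state[key])) for key in timers]

# Each key's accumulated state is serialized (step 3a above) only after
# all of that key's elements have been processed.
output = run_keyed_batch([("a", 1), ("b", 2), ("a", 3)])
```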
Custom operator in BATCH execution mode
Hi, Currently, we want to use batch execution mode [0] and historical data to build state for our streaming application. Because of the differences between batch and streaming modes, we want to check the current execution mode in a custom operator. So our questions are: * Is there any API for a custom operator to know the current execution mode (batch or streaming)? * If we want to emit output after all elements of one specific key have been processed, can we just use a timer, since timers are triggered at the end of input [0]? [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
Re: Flink 1.13.0 reactive mode: Job stops and cannot restore from checkpoint
On Mon, May 17, 2021 at 01:22:16PM +0200, Arvid Heise wrote: > Hi ChangZhuo, > > This looks indeed like a bug. I created FLINK-22686 [1] to track it. It > looks unrelated to reactive mode to me and more related to unaligned > checkpoints. So, you can try out reactive mode with aligned checkpoints. > > If you can provide us with the topology, we can also fix it soonish: 1.13.1 > is on the horizon because of a license issue. > > [1] https://issues.apache.org/jira/browse/FLINK-22686 Hi Arvid, We have uploaded the topology to [0]; please check whether it is enough for debugging, thanks. [0] https://issues.apache.org/jira/browse/FLINK-22686
Re: Flink-pod-template-issue
On Mon, May 17, 2021 at 01:29:55PM +0530, Priyanka Manickam wrote: > Hi All, > > Do we required to add any image for flink-main-container in > pod-template.yaml file because it giving an error saying > "spec.containers(0).image value required. > > > Could anyone help with this please Hi, You need to specify your image via the `kubernetes.container.image` configuration as described in [0]. The image should be the official Flink image plus your application jar, so that you can specify your jar path as local:/// when submitting to Kubernetes. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
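A minimal sketch of the two pieces (image name and metadata are placeholders): the pod template names the main container flink-main-container and leaves the image to configuration:

```yaml
# pod-template.yaml (sketch): the main container must be literally named
# flink-main-container; its image is then taken from configuration.
apiVersion: v1
kind: Pod
metadata:
  name: jobmanager-pod-template
spec:
  containers:
    - name: flink-main-container

# flink-conf.yaml (sketch): placeholder image built from the official
# Flink image plus the application jar.
# kubernetes.container.image: registry.example.com/my-flink-app:1.0
```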
Re: How to setup HA properly with Kubernetes Standalone Application Cluster
On Fri, May 14, 2021 at 02:00:41PM +0200, Fabian Paul wrote: > Hi Chen, > > Can you tell us a bit more about the job you are using? > The intended behaviour you are seeking can only be achieved > If the Kubernetes HA Services are enabled [1][2]. > Otherwise the job cannot recall past checkpoints. Hi Fabian, Thanks for the response; the following is our cluster setup. The HA settings are the same ones we used in the session cluster, so HA should work. The following are the settings we use for HA: high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: gs://a-bucket/recovery kubernetes.cluster-id: a-cluster-id kubernetes.context: a-context kubernetes.namespace: a-namespace The following is the Job manifest we use for the standalone application cluster. The content is almost the same as what we used in the session cluster, except that it is a Job, not a Deployment, and the args are different: apiVersion: batch/v1 kind: Job spec: template: spec: restartPolicy: OnFailure containers: - name: jobmanager env: - name: FLINK_CONF_FILE value: /opt/flink/conf/flink-conf.yaml - name: HADOOP_CLASSPATH value: /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar - name: HADOOP_CONF_DIR value: /opt/flink/conf - name: GOOGLE_APPLICATION_CREDENTIALS value: /opt/flink/conf/gcp-service-account.json args: - standalone-job - --fromSavepoint - gs://a-savepoint - --job-classname - com.example.my.application
How to setup HA properly with Kubernetes Standalone Application Cluster
Hi, Recently, we changed our deployment to a Kubernetes Standalone Application Cluster for reactive mode. According to [0], we use a Kubernetes Job with --fromSavepoint to upgrade our application without losing state. The Job config is identical to the one in the document. However, we found that in this setup, if the jobmanager fails, Kubernetes restarts it from the original savepoint specified in `--fromSavepoint` instead of the latest checkpoint, which is a problem for a long-running job. Any idea how to make Flink restore from the latest checkpoint on jobmanager failure in a Kubernetes Standalone Application Cluster? [0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster
Flink 1.13.0 reactive mode: Job stops and cannot restore from checkpoint
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
Re: How to specify key serializer
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote: > You can try using TypeInfo annotations to specify a TypeInformationFactory > for your key class [1]. > This allows you to "plug-in" the TypeInformation extracted by Flink for a > given class. In that custom TypeInformation, you should let it return the > correct serializer. Hi Gordon, Thanks for the tip. We have solved the problem by specifying the TypeInformation in readKeyedState.
How to specify key serializer
Hi, Currently we use sbt-avrohugger [0] to generate the key class for our keyed state. The key class generated by sbt-avrohugger is both a case class and an Avro specific record. However, in the following scenarios, Flink uses different serializers: * In the streaming application, Flink uses CaseClassSerializer for the key class. * In the state processor API application, Flink uses AvroSerializer for the key class. Since they use different serializers for the key, they are not compatible. Is there any way to specify the key serializer so that both applications use the same one? [0] https://github.com/julianpeeters/sbt-avrohugger
Re: Checkpoint fail due to timeout
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote: > In my opinion looks similar. Were you able to tune-up Flink to make it work? > I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to > rescale I need to take savepoint, which never completes (at least takes > longer than 3 hours). You can use an aligned checkpoint to scale your job: restarting from the checkpoint with the same jar file and a new parallelism shall do the trick.
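A command-line sketch of that trick (checkpoint path, job id, parallelism, and jar name below are placeholders); a retained, aligned checkpoint can be passed to -s just like a savepoint:

```shell
# Restart the same jar from a retained checkpoint with a new parallelism.
flink run -s gs://a-bucket/checkpoints/my-job-id/chk-42 -p 12 my-application.jar
```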
Re: Checkpoint fail due to timeout
On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote: > Hi Roman, > I took thread dump: > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on > java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: > digital-itx-eastus2 -> Filter (6/6)#0" Id=202 > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) > - blocked on java.lang.Object@5366a0e2 > at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 > WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef > at sun.misc.Unsafe.park(Native Method) > - waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707) > at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) > at > java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319) > at > 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291) > > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple > threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on > different Objects. Hi, This call stack is similar to our case as described in [0]. Maybe they are the same issue? [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html -- ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org http://czchen.info/ Key fingerprint = BA04 346D C2E1 FE63 C790 8793 CC65 B0CD EC27 5D5B signature.asc Description: PGP signature
Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote: > Hi ChangZhuo, > > Did you upgrade to Flink 1.12.2 and change the settings at the time? If so, > could you maybe reset the settings to the old values on Flink 1.12.2 and > check if the job still gets stuck? Especially, turning off unaligned > checkpoints (UC) should clarify if it's a general issue in Flink 1.12.2 or > with UC. > > If it's indeed an issue with UC, then it would help to get the debug logs > in particular for the package > org.apache.flink.streaming.runtime.io.checkpointing. You could add the > following to your log4js.properties (set general log level to INFO). > > logger.checkpointing.name = > org.apache.flink.streaming.runtime.io.checkpointing > logger.checkpointing.level = DEBUG * Thanks for this information; we are working on it and will reply once we get the log. * Also, we got the stack trace when the checkpoint was stuck; please let us know if you need the full trace. * The stuck task in the UI is KafkaProducer -> ProcessFunction 128 * The following is the BLOCKED thread for Source: KafkaProducer -> ProcessFunction (129/140)#2 "Source: KafkaProducer -> ProcessFunction (129/140)#2" #66336 prio=5 os_prio=0 cpu=582.01ms elapsed=5079.15s tid=0x7feb32717000 nid=0x9696 waiting for monitor entry [0x7feb28b61000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92) - waiting to lock <0x00058e8c5070> (a java.lang.Object) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(java.base@11.0.8/Thread.java:834) ps: * The original UIDs are redacted and replaced by their underlying types. * It looks like the subtask id in the UI is off by one compared with the stack trace.
How to debug checkpoint/savepoint stuck in Flink 1.12.2
Hi, We have updated our Flink applications to 1.12.2, along with the following modifications to improve performance: - Use unaligned checkpoints - Change the following fs config - state.backend.fs.memory-threshold: 1048576 - state.backend.fs.write-buffer-size: 4194304 However, our Flink applications now occasionally get stuck when taking an unaligned checkpoint or a savepoint. The following are the operators that were stuck in our cases: - Kafka source connector. - BroadcastProcessFunction with data input and broadcasted configuration. Also, when it is stuck, Flink stops consuming any data. Since these operators do not have much data to store in a checkpoint/savepoint, how can we debug this problem?
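For reference, the modifications above as flink-conf.yaml entries (option names as in Flink 1.12); reverting the unaligned flag is the quickest way to check whether unaligned checkpoints are the trigger:

```yaml
# The changed settings from this report.
execution.checkpointing.unaligned: true
state.backend.fs.memory-threshold: 1048576   # 1 MiB
state.backend.fs.write-buffer-size: 4194304  # 4 MiB
# To isolate the issue, flip unaligned checkpoints back off:
# execution.checkpointing.unaligned: false
```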
Re: Cannot connect to queryable state proxy
On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote: > Hi, > > We have problem connecting to queryable state client proxy as described > in [0]. Any help is appreciated. > > * The port 6125 is opened in taskmanager pod. > > ``` > root@-654b94754d-2vknh:/tmp# ss -tlp > StateRecv-Q Send-Q Local > Address:Port Peer Address:Port Process > LISTEN 01024 > 0.0.0.0:46561 0.0.0.0:* > LISTEN 03 > 0.0.0.0:9249 0.0.0.0:* > LISTEN 01024 > 0.0.0.0:6122 0.0.0.0:* > LISTEN 01024 > 10.200.11.3:9067 0.0.0.0:* > LISTEN 01024 > 10.200.11.3:6125 0.0.0.0:* > LISTEN 01024 > 0.0.0.0:38607 0.0.0.0:* > ``` The problem is that Flink only listens on 10.200.11.3:6125 for the queryable state client proxy, so we need to connect to it via the correct network. Is there any way to make Flink listen on 0.0.0.0 for the queryable state client proxy?
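The effect can be reproduced with a stdlib-only sketch (127.0.0.2 stands in for the pod IP 10.200.11.3): a server bound to one specific address refuses connections addressed to localhost, which is exactly why the client below fails against localhost:6125.

```python
import socket

def can_connect(host, port):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=1):
            return True
    except OSError:
        return False

# Bind to one specific loopback alias, not the 0.0.0.0 wildcard -- the
# same situation as the proxy listening only on the pod IP above.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.2", 0))
server.listen(1)
port = server.getsockname()[1]

via_bound_addr = can_connect("127.0.0.2", port)  # reachable on the bound address
via_localhost = can_connect("127.0.0.1", port)   # refused: nothing bound there
server.close()
```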
Cannot connect to queryable state proxy
Hi, We have problem connecting to queryable state client proxy as described in [0]. Any help is appreciated. The following is our setup: * Flink 1.12.1 * Standalone Kubernetes * Related config in flink-conf.yaml ``` queryable-state.enable: true queryable-state.proxy.ports: 6125 ``` * taskmanager log ``` 2021-02-04 03:22:57,650 INFO org.apache.flink.runtime.io.network.netty.NettyServer[] - Successful initialization (took 35 ms). Listening on SocketAddress /0.0.0.0:43665. 2021-02-04 03:22:57,656 INFO org.apache.flink.runtime.taskexecutor.KvStateService [] - Starting the kvState service and its components. 2021-02-04 03:22:57,672 INFO org.apache.flink.queryablestate.server.KvStateServerImpl [] - Started Queryable State Server @ /10.200.18.4:9067. 2021-02-04 03:22:57,679 INFO org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl [] - Started Queryable State Proxy Server @ /10.200.18.4:6125. 2021-02-04 03:22:57,698 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 . ``` * The port 6125 is opened in taskmanager pod. 
``` root@-654b94754d-2vknh:/tmp# ss -tlp StateRecv-Q Send-Q Local Address:Port Peer Address:Port Process LISTEN 01024 0.0.0.0:46561 0.0.0.0:* LISTEN 03 0.0.0.0:9249 0.0.0.0:* LISTEN 01024 0.0.0.0:6122 0.0.0.0:* LISTEN 01024 10.200.11.3:9067 0.0.0.0:* LISTEN 01024 10.200.11.3:6125 0.0.0.0:* LISTEN 01024 0.0.0.0:38607 0.0.0.0:* ``` * However, we always get the following error when using queryable API: ``` Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125 at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) at com.appier.rt.short_term_score.QueryCalculateUserST$.printMapState(QueryCalculateUserST.scala:44) at com.appier.rt.short_term_score.QueryCalculateUserST$.main(QueryCalculateUserST.scala:82) at com.appier.rt.short_term_score.QueryCalculateUserST.main(QueryCalculateUserST.scala) Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125 Caused by: java.net.ConnectException: Connection refused at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:834) ``` * `nc` in the taskmanager itself also gets the same error: ``` root@:/tmp# nc -vz localhost 6125 nc: connect to localhost port 6125 (tcp) failed: Connection refused nc: connect to localhost port 6125 (tcp) failed: Cannot assign requested address ``` [0] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
Cannot start from savepoint using Flink 1.12 in standalone Kubernetes + Kubernetes HA
Hi, We cannot start a job from a savepoint (created by Flink 1.12, Standalone Kubernetes + ZooKeeper HA) in Flink 1.12 with Standalone Kubernetes + Kubernetes HA. The following is the exception that stops the job: Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap name-51e5afd90227d537ff442403d1b279da-jobmanager-leader because it does not exist. The cluster can start a new job from scratch, so we think the cluster configuration is good. The following is the HA-related config: high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: gs://some/path/recovery kubernetes.cluster-id: cluster-name kubernetes.context: kubernetes-context kubernetes.namespace: kubernetes-namespace