Re: Flink 1.17 upgrade issue when using azure storage account for checkpoints/savepoints

2023-03-25 Thread
On Sat, Mar 25, 2023 at 02:01:24PM +0530, Jessy Ping wrote:
> Root cause:  Caused by: java.util.concurrent.CompletionException:
> java.lang.RuntimeException:  java.lang.ClassNotFoundException: Class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found

We have a similar error with Google Cloud Storage, and there is a workaround
in this Slack thread:
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1679320815257449


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-07 Thread
On Tue, Jun 07, 2022 at 03:26:43PM +0800, czc...@gmail.com wrote:
> On Mon, Jun 06, 2022 at 10:42:08AM +0800, Shengkai Fang wrote:
> > Hi. In my experience, the steps to debug classloading problems are as
> > follows:
> 
> Thanks for the help. We get the following log when using
> `-verbose:class`:
> 
>   [2.074s][info][class,load] org.apache.hadoop.fs.FileSystem source: 
> file:/opt/flink/opt/flink-s3-fs-hadoop-1.15.0.jar
>   ...
>   [8.094s][info][class,load] 
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem source: 
> file:/opt/flink/opt/flink-gs-fs-hadoop-1.15.0.jar
> 
> 
> It looks like the application loads org.apache.hadoop.fs.FileSystem from
> flink-s3-fs-hadoop-1.15.0.jar but GoogleHadoopFileSystem from
> flink-gs-fs-hadoop-1.15.0.jar, and the two are incompatible. Since we run
> Flink in both AWS and GCP, our base image contains both plugins at the
> same time. Any idea how to work around it?
> 
> We also tried to set `classloader.resolve-order: parent-first`. However,
> we got another error caused by a library conflict between Flink and our
> application:
> 
>   Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala 
> module 2.11.3 requires Jackson Databind version >= 2.11.0 and < 2.12.0

We solved the problem by moving the plugins into the correct plugins
directory. Thanks for the help from Slack.
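
For reference, the layout that works is one subdirectory per filesystem
plugin under /opt/flink/plugins, roughly like the following (a sketch; the
jar names must match the versions in your image):

  mkdir -p /opt/flink/plugins/s3-fs-hadoop /opt/flink/plugins/gs-fs-hadoop
  mv /opt/flink/opt/flink-s3-fs-hadoop-1.15.0.jar /opt/flink/plugins/s3-fs-hadoop/
  mv /opt/flink/opt/flink-gs-fs-hadoop-1.15.0.jar /opt/flink/plugins/gs-fs-hadoop/

Each plugin directory gets its own isolated classloader, so the Hadoop
classes bundled in the s3 and gs jars no longer see each other.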



-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-04 Thread
On Thu, Jun 02, 2022 at 06:23:20PM +0800, Qingsheng Ren wrote:
> Thanks for the input ChangZhuo.
> 
> Could you check if the configuration "classloader.resolve-order” is
> set to “parent-first” in your Flink 1.14 cluster? I didn’t notice any
> changes related to the user code classloader in Flink 1.15. If my
> assumption is correct, you package the gcs-connector into your job JAR
> but the Hadoop FS dependencies are not included, so
> org.apache.hadoop.fs.FileSystem is loaded by app classloader from
> flink-s3-fs-hadoop.jar under the lib of Flink, but
> GoogleHadoopFileSystem is loaded by user code classloader from job
> JAR. Setting the resolve order to "parent-first" could bypass the
> issue [1] so I assume you have this config in 1.14 but not in 1.15.
> Please forgive me if I understand incorrectly!

No, we do not configure classloader.resolve-order in either the 1.14 or
the 1.15 setup. We will check whether "parent-first" can solve the
problem, thanks for the advice.


Also, in 1.14, we include the following jars into /opt/flink/lib to
support GCS:

* flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
* gcs-connector-hadoop3-2.2.2-shaded.jar

In 1.15, we add flink-gs-fs-hadoop-1.15.0.jar to /opt/flink/lib to
support GCS. Maybe this difference causes the problem?


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-01 Thread
15.0.jar:1.15.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
 ~[flink-dist-1.15.0.jar:1.15.0]
... 13 more
Caused by: java.lang.ClassCastException: class 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class 
org.apache.hadoop.fs.FileSystem 
(com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
.listPath() ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist-1.15.0.jar:1.15.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291)
 ~[flink-dist-1.15.0.jar:1.15.0]
... 13 more
2022-06-02 00:25:57,830 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..

> 
> Best regards, 
> 
> Qingsheng
> 
> > On Jun 2, 2022, at 09:08, ChangZhuo Chen (陳昌倬)  wrote:
> > 
> > Hi,
> > 
> > We use GCS as storage, and have the following function to list files in
> > a GCS path for Flink batch mode to build state:
> > 
> > 
> >   def listPath(p: String): Seq[String] = {
> >     val path = new Path(p)
> >     val fs = path.getFileSystem(new Configuration())
> >     fs.listStatus(path) match {
> >       case null => Seq()
> >       case xs => xs.map(_.getPath.toString)
> >     }
> >   }
> > 
> > This function works fine in Flink 1.14. However, in Flink 1.15, we have
> > the following exception:
> > 
> >  Caused by: java.lang.ClassCastException: class 
> > com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to 
> > class org.apache.hadoop.fs.FileSystem 
> > (com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
> > org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
> >  at 
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at 
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) 
> > ~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
> >  at .listPath() ~[?:?]
> > 
> > We found a similar issue in Spark [0]. However, we are not sure whether
> > it is related, and if it is, how we can apply the fix. Any help is
> > welcome.
> > 
> > 
> > [0] https://issues.apache.org/jira/browse/SPARK-9206
> > 
> > 
> > -- 
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
> 

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Cannot cast GoogleHadoopFileSystem to hadoop.fs.FileSystem to list file in Flink 1.15

2022-06-01 Thread
Hi,

We use GCS as storage, and have the following function to list files in
a GCS path for Flink batch mode to build state:


  def listPath(p: String): Seq[String] = {
    val path = new Path(p)
    val fs = path.getFileSystem(new Configuration())
    fs.listStatus(path) match {
      case null => Seq()
      case xs => xs.map(_.getPath.toString)
    }
  }

This function works fine in Flink 1.14. However, in Flink 1.15, we have
the following exception:

  Caused by: java.lang.ClassCastException: class 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to class 
org.apache.hadoop.fs.FileSystem 
(com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem and 
org.apache.hadoop.fs.FileSystem are in unnamed module of loader 'app')
  at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) 
~[flink-s3-fs-hadoop-1.15.0.jar:1.15.0]
  at .listPath() ~[?:?]

We found a similar issue in Spark [0]. However, we are not sure whether
it is related, and if it is, how we can apply the fix. Any help is
welcome.


[0] https://issues.apache.org/jira/browse/SPARK-9206
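
One alternative we are considering is to list files through Flink's own
filesystem abstraction instead of Hadoop's, since it resolves the URI
scheme through the filesystem plugin mechanism rather than loading
Hadoop's FileSystem class directly. A sketch of the same function
(untested):

  import org.apache.flink.core.fs.{FileSystem, Path}

  def listPath(p: String): Seq[String] = {
    val path = new Path(p)
    // Resolves gs:// (or s3://, ...) via Flink's pluggable filesystems.
    val fs: FileSystem = path.getFileSystem
    fs.listStatus(path) match {
      case null => Seq()
      case xs   => xs.map(_.getPath.toString)
    }
  }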


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-04 Thread
On Wed, May 04, 2022 at 01:53:01PM +0200, Chesnay Schepler wrote:
> Disabling the kafka metrics _should_ work.

Is there any way to disable Kafka metrics when using the low-level
process functions?


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread
On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
> Hi!
> 
> I also discovered problems with the PrometheusReporter on Flink 1.15.0,
> coming from 1.14.4. I already consulted the mailing list:
> https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
> I have not found the underlying problem or a solution to it.
> 
> Actually, after re-checking, I see the same log WARNINGS as
> ChangZhuo described.
> 
> As I described, it seems to be an issue with my job. If no job, or an
> example job runs on the taskmanager the basic metrics work just fine. Maybe
> ChangZhuo can confirm this?
> 
> @ChangZhuo what's your job setup? I am running a streaming SQL job, but
> also using data streams API to create the streaming environment and from
> that the table environment and finally using a StatementSet to execute
> multiple SQL statements in one job.


We are running a streaming application built on the low-level API,
deployed with the Kubernetes operator's FlinkDeployment.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread
On Tue, May 03, 2022 at 10:28:18AM +0200, Chesnay Schepler wrote:
> Is there any warning in the logs containing "Error while handling metric"?

No, we cannot find any "Error while handling metric" in the logs.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-03 Thread
On Tue, May 03, 2022 at 01:00:42AM -0700, Mason Chen wrote:
> Hi ChangZhou,
> 
> The warning log indicates that the metric was previously defined and so the
> runtime is handling the "duplicate" metric by ignoring it. This is
> typically a benign message unless you rely on this metric. Is it possible
> that you are using the same task name for different tasks? It would be
> defined by the `.name(...)` API in your job graph instantiation.
> 
> Can you clarify what it means that your endpoint isn't working--some
> metrics missing, endpoint is timing out, etc.? Also, can you confirm from
> logs that the PrometheusReporter was created properly?

"Endpoint isn't working" means we get an empty reply from the Prometheus
endpoint. The following is our test against the taskmanager Prometheus
endpoint:

  curl localhost:9249
  curl: (52) Empty reply from server

We have the following log in the taskmanager, so the PrometheusReporter
was created properly:

2022-05-03 01:48:16,678 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
...
2022-05-03 01:48:23,665 INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter   [] - Started 
PrometheusReporter HTTP server on port 9249.
2022-05-03 01:48:23,669 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl  [] - Reporting 
metrics for reporter prom of type 
org.apache.flink.metrics.prometheus.PrometheusReporter.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Prometheus metrics does not work in 1.15.0 taskmanager

2022-05-02 Thread
Hi,

We found that the taskmanager Prometheus endpoint does not work after
upgrading from 1.14.3 to 1.15.0. The jobmanager Prometheus endpoint is
okay in 1.15.0, so we think the problem is not in the image we use. Any
idea how to fix this problem?


Also, we found the following logs in the taskmanager, but not in the
jobmanager. Not sure if they are related to this issue.

2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name 'numBytesInLocal'. Metric will not be reported.[10.210.47.134, 
taskmanager, , , , 8, Shuffle, Netty, Input]
2022-05-03 01:48:32,839 WARN  org.apache.flink.metrics.MetricGroup  
   [] - Name collision: Group already contains a Metric with the 
name 'numBytesInLocalPerSecond'. Metric will not be reported.[10.210.47.134, 
taskmanager, , , , 8, Shuffle, Netty, Input]
...


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


flink operator sometimes cannot start jobmanager after upgrading

2022-04-29 Thread
/flink-deployment-name] JobManager is being deployed
2022-04-29 09:41:35,756 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:40,759 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:40,760 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: DEPLOYING
2022-04-29 09:41:40,864 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager is being deployed
2022-04-29 09:41:40,864 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:45,867 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:45,868 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: DEPLOYING
2022-04-29 09:41:45,870 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment port is ready, waiting 
for the Flink REST API...
2022-04-29 09:41:45,870 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:55,901 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: DEPLOYED_NOT_READY
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment is ready
2022-04-29 09:41:55,902 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing job status
2022-04-29 09:41:56,294 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] No job found on cluster yet
2022-04-29 09:41:56,294 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:41:58,443 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:41:58,445 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing job status
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver
[ERROR][namespace/flink-deployment-name] Exception while listing jobs
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: READY
2022-04-29 09:42:10,489 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:10,490 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:42:25,521 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:42:25,522 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: MISSING
2022-04-29 09:42:25,522 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:25,522 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
2022-04-29 09:42:40,526 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 09:42:40,527 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: MISSING
2022-04-29 09:42:40,527 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 09:42:40,527 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed
...

2022-04-29 10:00:55,862 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Starting reconciliation
2022-04-29 10:00:55,863 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] Observing JobManager deployment. Previous 
status: MISSING
2022-04-29 10:00:55,863 o.a.f.k.o.o.JobObserver[INFO 
][namespace/flink-deployment-name] JobManager deployment does not exist
2022-04-29 10:00:55,863 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][namespace/flink-deployment-name] Reconciliation successfully completed


[0] https://github.com/apache/flink-kubernetes-operator


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc

Re: how to setup working dir in Flink operator

2022-04-25 Thread
On Mon, Apr 25, 2022 at 05:15:58PM +0800, Yang Wang wrote:
> Using the pod template to configure the local SSD(via host-path or local
> PV) is the correct way.
> After that, either "java.io.tmpdir" or "process.taskmanager.working-dir" in
> CR should take effect.
> 
> Maybe you need to share the complete pod yaml and logs of failed
> TaskManager.

In our test results, `process.working-dir` and `process.taskmanager.working-dir`
seem to be ignored. Only `io.tmp.dirs` changes where Flink stores
`flink-io-*`, `localState/`, etc.


The following is our test environment:

- configuration:
    io.tmp.dirs: /srv/working-dir
  result:
    flink-io-*, localState/ are in /srv/working-dir

- configuration:
    process.working-dir: /srv/working-dir
  result:
    flink-io-*, localState/ are in /tmp

- configuration:
    process.taskmanager.working-dir: /srv/working-dir
  result:
    flink-io-*, localState/ are in /tmp

All other configuration is the same.
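
For context, we mount the local SSD into the pods with a pod template
fragment roughly like the following (hostPath and mount path are
illustrative for our setup):

  spec:
    containers:
      - name: flink-main-container
        volumeMounts:
          - name: local-ssd
            mountPath: /srv/working-dir
    volumes:
      - name: local-ssd
        hostPath:
          path: /mnt/disks/ssd0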


> nit: if the TaskManager pod crashed and was deleted too fast, you could
> kill the JobManager first, then you will have enough time to get the logs
> and yamls.

Thanks for the tip.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


how to setup working dir in Flink operator

2022-04-24 Thread
Hi,

We are trying to migrate our application from `Flink on standalone
Kubernetes` to `Application mode on Flink operator`. However, we cannot
successfully configure it to use the local SSD for RocksDB state. Any
thoughts?


Detail:

In original `Flink on standalone Kubernetes`:
- set `io.tmp.dirs` to local SSD and Flink uses local SSD for its data.

In new `Application mode on Flink operator`:
- setting `io.tmp.dirs` to the local SSD causes a taskmanager
  crashloop. We are still trying to get the exact error message since it
  disappears very fast.
- setting `workingDir` in the pod template does not work. Flink still
  uses /tmp to store its data.
- setting `process.taskmanager.working-dir` does not work. Flink still
  uses /tmp to store its data.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Questions about checkpoint retention

2022-02-05 Thread
On Fri, Jan 28, 2022 at 02:43:11PM +0800, Caizhi Weng wrote:
> Chen-Che Huang  wrote on Thu, Jan 27, 2022 at 11:10:
> > We have two questions for checkpoint retention.
> >
> > >    1. When our cron job creates a savepoint called SP, it seems those
> > >    checkpoints created earlier than SP still cannot be deleted. We
> > >    thought the new checkpoints are generated based on SP and thus old
> > >    checkpoints before SP will be useless. However, it seems the
> > >    checkpoint mechanism doesn't work as we thought. Is what we thought
> > >    correct?
> > >    2. To save storage cost, we'd like to know which checkpoints can be
> > >    deleted. Currently, each version of our app has 10 checkpoints. We
> > >    wonder whether we can delete checkpoints generated for previous
> > >    versions of our apps?

Some details below:

* We have two GCS buckets to store checkpoints and savepoints, like the
  following:

  * gs://flink-checkpoints has no retention configuration.
  * gs://flink-savepoints has retention 5 days.

* The checkpoint configuration is:
  * state.backend.incremental: true
  * RETAIN_ON_CANCELLATION
* We create a savepoint every 4 hours for recovery.
* The business requires keeping up to 180 days of historical data.

The questions are:

* We want to set retention on gs://flink-checkpoints to reduce storage
  cost. However, Flink sometimes cannot restore from a checkpoint due to
  missing data when retention is configured on gs://flink-checkpoints
  (we suspect incremental checkpoints still reference shared files
  written for earlier checkpoints, which the bucket retention deletes).
  Is there any way to configure retention safely for Flink?

* We don't use DELETE_ON_CANCELLATION, to avoid deleting state data by
  accident.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Time difference between checkpoint and savepoint restoration in GCS

2021-10-25 Thread
On Mon, Oct 25, 2021 at 05:19:32PM +0800, JING ZHANG wrote:
> Hi,
> > We wonder if this is expected behavior or not?
> I think it's expected. You could find more information in document [1].
> Checkpoints and Savepoints differ in their implementation. Checkpoints are
> designed to be lightweight and fast. They might (but don’t necessarily have
> to) make use of different features of the underlying state backend and try
> to restore data as fast as possible. As an example, incremental Checkpoints
> with the RocksDB State backend use RocksDB’s internal format instead of
> Flink’s native format. This is used to speed up the checkpointing process
> of RocksDB that makes them the first instance of a more lightweight
> Checkpointing mechanism. On the contrary, Savepoints are designed to focus
> more on the portability of data and support any changes made to the job
> that make them slightly more expensive to produce and restore.
> Besides, Savepoints binary format is different from checkpoint format.
> Flink savepoint binary format is unified across all state backends. [2]
> That means you can take a Savepoint with one state backend and then restore
> it using another. So when restore from Savepoint file, the job need to read
> from unified binary format and write into format based on the underlying
> state backend. When restore from checkpoint file, this step maybe easier,
> for example, load the files directly into underlying state backend.
> [1]
> https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint

Thanks for the information.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Time difference between checkpoint and savepoint restoration in GCS

2021-10-25 Thread
Hi,

We found that our application's savepoint restoration time (~40 mins) is
much longer than its checkpoint restoration time (~4 mins). We wonder if
this is expected behavior or not?


Some detail about the environment:

* Flink version: 1.14.0
* Persistent storage is GCS, via the following jars:
* flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar
* gcs-connector-hadoop3-2.2.2-shaded.jar
* Unaligned checkpoint is enabled.
* The network ingress for checkpoint restoration (~ 750 MiB/s) is much
  faster than savepoint restoration (~ 50 MiB/s)
* Checkpoint and savepoint uses different GCS buckets, not sure if this
  will affect the throughput of GCS.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Flink 1.14.0 reactive mode cannot rescale

2021-10-19 Thread
On Tue, Oct 19, 2021 at 11:51:44AM +0200, David Morávek wrote:
> Hi ChangZhuo,
> 
> this seems to be a current limitation of the unaligned checkpoints [1], are
> you using any broadcasted streams in your application?
> 
> [1] https://issues.apache.org/jira/browse/FLINK-22815

* Yes, we do have broadcasted streams for configuration. We can change
  to aligned checkpoints to see if that is okay.

* [0] is marked as fixed in version 1.14.0, so maybe there are other
  parts that need to be fixed?

[0] https://issues.apache.org/jira/browse/FLINK-22815


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Flink 1.14.0 reactive mode cannot rescale

2021-10-18 Thread
) ~[?:?]
at 
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
 ~[?:?]
at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517) ~[?:?]
at 
org.apache.flink.runtime.checkpoint.RescaleMappings.of(RescaleMappings.java:138)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.getNewToOldSubtasksMapping(SubtaskStateMapper.java:198)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.TaskStateAssignment.getOutputMapping(TaskStateAssignment.java:369)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeResultSubpartitionStates(StateAssignmentOperation.java:379)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:195)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:140)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1548)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1460)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.createExecutionGraphAndRestoreState(AdaptiveScheduler.java:993)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$createExecutionGraphAndRestoreStateAsync$25(AdaptiveScheduler.java:983)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.scheduler.adaptive.BackgroundTask.lambda$new$0(BackgroundTask.java:57)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 ~[?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
~[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
~[?:?]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
2021-10-18 09:31:14,426 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting 
StandaloneApplicationClusterEntryPoint down with application status FAILED. 
Diagnostics null.
2021-10-18 09:31:14,427 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Inconsistent parallelism in web UI when using reactive mode

2021-10-12 Thread
On Tue, Oct 12, 2021 at 10:41:24AM +0200, Chesnay Schepler wrote:
> This is a known and documented 
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/#limitations-1>
> limitation of the AdaptiveScheduler. There is no concrete date yet for when
> it will be fixed.

Thanks for the information.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Inconsistent parallelism in web UI when using reactive mode

2021-10-11 Thread
Hi,

We found that the parallelism shown in the web UI is inconsistent when
using reactive mode. As in the attachment, the overview page shows a
parallelism of 1 for every operator, which is not the correct value.
When clicking an operator for detailed information, the parallelism in
the detail view is the correct one.

Is it possible to fix this inconsistency so that it does not confuse
engineers when deploying Flink applications?


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: 1.13.1 jobmanager annotations by pod template does not work

2021-06-15 Thread
On Tue, Jun 15, 2021 at 04:40:00PM +0800, Yang Wang wrote:
> Yes. It is the by-design behavior. Because the pod template is only
> applicable to the "pod", not other resources(e.g. deployment, configmap).
> 
> Currently, the JobManager pod is managed by deployment and the naked
> TaskManager pods are managed by Flink ResourceManager.
> This is the root cause which makes the difference.

Thanks for the clarification.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: 1.13.1 jobmanager annotations by pod template does not work

2021-06-15 Thread
On Tue, Jun 15, 2021 at 04:22:07PM +0800, Yang Wang wrote:
> The annotations, and labels in the pod template will only apply to the
> JobManager pod, not the JobManager deployment.

Thanks for the information.

Is this behavior by design? In the documentation, it looks like there is
no difference between jobmanager and taskmanager in how annotations and
labels are handled [0].


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: 1.13.1 jobmanager annotations by pod template does not work

2021-06-11 Thread
On Fri, Jun 11, 2021 at 11:19:09PM +0800, Yang Wang wrote:
> Could you please share your pod template and the value of
> kubernetes.jobmanager.annotations?
>
> Usually the annotations of pod template and flink config options will be
> merged. And the flink config
> options has higher priority if you are specifying same name annotation.
>
> I have verified in minikube and it could take effect as expected.

Hi,

There are other findings for this issue:

* For jobmanager:
  * annotations and labels in the pod template do not work.
  * annotations and labels in -Dkubernetes.jobmanager.* work.

* For taskmanager:
  * annotations and labels in the pod template work.

The following is jobmanager pod template:

apiVersion: v1
kind: Pod
metadata:
  labels:
app: jobmanager
helm.sh/chart: 
app.kubernetes.io/name: 
app.kubernetes.io/instance: 
app.kubernetes.io/version: 
app.kubernetes.io/managed-by: Helm
  annotations:
rt.prometheus.io/scrape: 'true'
rt.prometheus.io/path: '/'
rt.prometheus.io/port: '9249'


The following is pod created as jobmanager:


Name: 
Namespace:
Priority: 200
Priority Class Name:  medium
Node: 
Start Time:   Fri, 11 Jun 2021 23:38:52 +0800
Labels:   app=
  component=jobmanager
  pod-template-hash=55846fd8f7
  type=flink-native-kubernetes
Annotations:  


--
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-11 Thread
On Thu, Jun 10, 2021 at 07:10:45PM +0200, Roman Khachatryan wrote:
> Hi ChangZhuo,
> 
> Thanks for reporting, it looks like a bug.
> I've opened a ticket for that [1].
> 
> [1]
> https://issues.apache.org/jira/browse/FLINK-22966

Thanks for the help.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


1.13.1 jobmanager annotations by pod template does not work

2021-06-11 Thread

Hi,

We found that jobmanager annotations defined in the pod template do not
work. However, annotations defined via kubernetes.jobmanager.annotations
[0] do work.

This behavior is different from the documentation [1].


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#kubernetes-jobmanager-annotations
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


NPE when restoring from savepoint in Flink 1.13.1 application

2021-06-09 Thread
(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
 ~[?:?]
... 6 more
Caused by: java.util.concurrent.CompletionException: 
java.lang.NullPointerException
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
 ~[?:?]
... 6 more
Caused by: java.lang.NullPointerException
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reAssignSubKeyedStates(StateAssignmentOperation.java:300)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.lambda$reDistributeKeyedStates$0(StateAssignmentOperation.java:260)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at java.util.HashMap.forEach(HashMap.java:1336) ~[?:?]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.reDistributeKeyedStates(StateAssignmentOperation.java:252)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:196)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:139)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1642)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:163)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:138)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
 ~[flink-dist_2.12-1.13.1.jar:1.13.1]
... 7 more
2021-06-09 13:08:59,852 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception 
occurred in REST handler: Could not execute application.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Custom operator in BATCH execution mode

2021-05-26 Thread
On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> Hi,
> 
> No there is no API in the operator to know which mode it works in. We
> aim to have separate operators for both modes if required. You can check
> e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].

Thanks for the information. We implemented this according to Piotrek's
suggestion.

> 
> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> you want to apply a transformation at the end of each key. You could
> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.

According to [0], the timer's timestamp is irrelevant since timers will
be triggered at the end of input, right? If that is the case, we can use
the same code for both streaming and batch mode.

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/


> 
> A side note, I don't fully get what you mean by "build state for our
> streaming application". Bear in mind though you cannot take a savepoint
> from a job running in the BATCH execution mode. Moreover it uses a
> different kind of StateBackend. Actually a dummy one, which just
> imitates a real state backend.

What we plan to do here is:

1. Load configuration from a broadcast event (custom source backed by a
   REST API).
2. Load historical events as batch mode input (from GCS).
3. Use a timer to trigger output so that the following happens:
   a. Serialize keyed states into JSON.
   b. Output to Kafka.
   c. The streaming application consumes the data from Kafka and updates
      its keyed states accordingly.

We hope that in this way we can rebuild our state with almost the same
code as in streaming.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Custom operator in BATCH execution mode

2021-05-25 Thread
Hi,

Currently, we want to use batch execution mode [0] with historical data
to build state for our streaming application. Due to differences between
batch and streaming mode, we want to check the current execution mode in
a custom operator. So our questions are:


* Is there any API for a custom operator to know the current execution
  mode (batch or streaming)?

* If we want to output after all elements of one specific key are
  processed, can we just use a timer, since timers are triggered at the
  end of input [0]? (A sketch of what we have in mind follows the link
  below.)


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
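
The kind of operator we have in mind, as a rough sketch (the names and
state layout are illustrative, not our real job; in BATCH mode the final
watermark is Long.MaxValue, so an event-time timer registered at
Long.MaxValue fires once per key after all of that key's input):

  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  import org.apache.flink.util.Collector

  // Counts elements per key and emits one record per key once all of the
  // key's input has been processed.
  class EmitAtEndOfKey extends KeyedProcessFunction[String, String, (String, Long)] {
    @transient private var count: ValueState[Long] = _

    override def open(parameters: Configuration): Unit =
      count = getRuntimeContext.getState(
        new ValueStateDescriptor[Long]("count", classOf[Long]))

    override def processElement(
        value: String,
        ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
        out: Collector[(String, Long)]): Unit = {
      count.update(count.value() + 1)
      // Registering the same timestamp repeatedly is deduplicated by Flink.
      ctx.timerService().registerEventTimeTimer(Long.MaxValue)
    }

    override def onTimer(
        timestamp: Long,
        ctx: KeyedProcessFunction[String, String, (String, Long)]#OnTimerContext,
        out: Collector[(String, Long)]): Unit =
      out.collect((ctx.getCurrentKey, count.value()))
  }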

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

2021-05-17 Thread
On Mon, May 17, 2021 at 01:22:16PM +0200, Arvid Heise wrote:
> Hi ChangZhuo,
> 
> This looks indeed like a bug. I created FLINK-22686 [1] to track it. It
> looks unrelated to reactive mode to me and more related to unaligned
> checkpoints. So, you can try out reactive mode with aligned checkpoints.
> 
> If you can provide us with the topology, we can also fix it soonish: 1.13.1
> is on the horizon because of a license issue.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-22686

Hi Arvid,

We have uploaded the topology to [0]. Please check whether that is
enough for debugging, thanks.


[0] https://issues.apache.org/jira/browse/FLINK-22686


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Flink-pod-template-issue

2021-05-17 Thread
On Mon, May 17, 2021 at 01:29:55PM +0530, Priyanka Manickam wrote:
> Hi All,
> 
> Do we need to add an image for the flink-main-container in the
> pod-template.yaml file? It is giving an error saying
> "spec.containers(0).image value required".
> 
> 
> Could anyone help with this please

Hi,

You need to specify your image via the `kubernetes.container.image`
configuration, as described in [0]. The image should be the official
Flink image plus your application jar, so that you can specify your jar
path as local:/// when submitting to Kubernetes.

[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#pod-template
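
For example, a submission along these lines (image name and jar path are
illustrative):

  flink run-application \
    --target kubernetes-application \
    -Dkubernetes.container.image=registry.example.com/my-flink-app:1.0 \
    -Dkubernetes.pod-template-file=pod-template.yaml \
    local:///opt/flink/usrlib/my-app.jar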


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: How to setup HA properly with Kubernetes Standalone Application Cluster

2021-05-14 Thread
On Fri, May 14, 2021 at 02:00:41PM +0200, Fabian Paul wrote:
> Hi Chen,
> 
> Can you tell us a bit more about the job you are using? 
> The intended behaviour you are seeking can only be achieved 
> If the Kubernetes HA Services are enabled [1][2].
> Otherwise the job cannot recall past checkpoints.

Hi Fabian,

Thanks for the response, the following is our cluster setup.

The HA settings we use are the same ones we used in the session cluster,
so HA should work. The following are the settings we use for HA:

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://a-bucket/recovery
kubernetes.cluster-id: a-cluster-id
kubernetes.context: a-context
kubernetes.namespace: a-namespace

The following is the Job we use for the standalone application cluster.
The content is almost the same as what we used in the session cluster,
except that it is a Job, not a Deployment, and the args are different:

apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          env:
            - name: FLINK_CONF_FILE
              value: /opt/flink/conf/flink-conf.yaml
            - name: HADOOP_CLASSPATH
              value: /opt/flink/conf:/opt/hadoop-3.1.1/share/hadoop/common/lib/*:/opt/hadoop-3.1.1/share/hadoop/common/*:/opt/hadoop-3.1.1/share/hadoop/hdfs:/opt/hadoop-3.1.1/share/hadoop/hdfs/lib/*:/opt/hadoop-3.1.1/share/hadoop/hdfs/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/lib/*:/opt/hadoop-3.1.1/share/hadoop/mapreduce/*:/opt/hadoop-3.1.1/share/hadoop/yarn:/opt/hadoop-3.1.1/share/hadoop/yarn/lib/*:/opt/hadoop-3.1.1/share/hadoop/yarn/*:/contrib/capacity-scheduler/*.jar
            - name: HADOOP_CONF_DIR
              value: /opt/flink/conf
            - name: GOOGLE_APPLICATION_CREDENTIALS
              value: /opt/flink/conf/gcp-service-account.json
          args:
            - standalone-job
            - --fromSavepoint
            - gs://a-savepoint
            - --job-classname
            - com.example.my.application

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


How to setup HA properly with Kubernetes Standalone Application Cluster

2021-05-14 Thread
Hi,

Recently, we changed our deployment to a Kubernetes Standalone
Application Cluster for reactive mode. According to [0], we use a
Kubernetes Job with --fromSavepoint to upgrade our application without
losing state. The Job config is identical to the one in the document.

However, we found that in this setup, if the jobmanager fails,
Kubernetes restarts it from the original savepoint specified in
`--fromSavepoint` instead of the latest checkpoint. This causes problems
for a long-running job.

Any idea how to make Flink restore from the latest checkpoint on
jobmanager failure in a Kubernetes Standalone Application Cluster?


[0] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Flink 1.13.0 reactive mode: Job stop and cannot restore from checkpoint

2021-05-13 Thread
  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: How to specify key serializer

2021-03-31 Thread
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote:
> You can try using TypeInfo annotations to specify a TypeInformationFactory
> for your key class [1].
> This allows you to "plug-in" the TypeInformation extracted by Flink for a
> given class. In that custom TypeInformation, you should let it return the
> correct serializer.

Hi Gordon,

Thanks for the tip. We have solved the problem by specifying the
TypeInformation explicitly in readKeyedState.
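
For reference, a minimal sketch of what we did (Flink 1.12 state
processor API; the key class MyKey, the state name "count", and the
operator uid are illustrative, not our real job):

  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.api.java.ExecutionEnvironment
  import org.apache.flink.api.scala.createTypeInformation
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.runtime.state.memory.MemoryStateBackend
  import org.apache.flink.state.api.Savepoint
  import org.apache.flink.state.api.functions.KeyedStateReaderFunction
  import org.apache.flink.util.Collector

  case class MyKey(id: String)

  class CountReader extends KeyedStateReaderFunction[MyKey, (MyKey, Long)] {
    @transient private var count: ValueState[Long] = _

    override def open(parameters: Configuration): Unit =
      count = getRuntimeContext.getState(
        new ValueStateDescriptor[Long]("count", classOf[Long]))

    override def readKey(key: MyKey,
                         ctx: KeyedStateReaderFunction.Context,
                         out: Collector[(MyKey, Long)]): Unit =
      out.collect((key, count.value()))
  }

  object ReadState {
    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.getExecutionEnvironment
      val savepoint =
        Savepoint.load(env, "gs://bucket/savepoint-xxxx", new MemoryStateBackend())

      // Passing the key TypeInformation explicitly forces the same
      // CaseClassSerializer that the streaming job uses for MyKey.
      savepoint
        .readKeyedState(
          "my-operator-uid",
          new CountReader,
          createTypeInformation[MyKey],
          createTypeInformation[(MyKey, Long)])
        .print()
    }
  }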


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


How to specify key serializer

2021-03-29 Thread
Hi,

Currently we use sbt-avrohugger [0] to generate the key class for keyed
state. The key class generated by sbt-avrohugger is both a case class
and an Avro specific record. However, in the following scenarios, Flink
uses different serializers:


* In the streaming application, Flink uses CaseClassSerializer for the
  key class.
* In the state processor API application, Flink uses AvroSerializer for
  the key class.


Since they use different serializers for the key, they are not
compatible. Is there any way to specify the key serializer so that both
applications use the same one?



[0] https://github.com/julianpeeters/sbt-avrohugger

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Checkpoint fail due to timeout

2021-03-17 Thread
On Wed, Mar 17, 2021 at 05:45:38AM +, Alexey Trenikhun wrote:
> In my opinion looks similar. Were you able to tune-up Flink to make it work? 
> I'm stuck with it, I wanted to scale up hoping to reduce backpressure, but to 
> rescale I need to take savepoint, which never completes (at least takes 
> longer than 3 hours).

You can use an aligned checkpoint to scale your job. Just restarting
from the checkpoint with the same jar file and a new parallelism shall
do the trick.
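
For example (paths illustrative; -s also accepts a retained checkpoint
directory, not only a savepoint):

  flink run \
    -s gs://flink-checkpoints/<job-id>/chk-1234 \
    -p 12 \
    my-application.jar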


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Checkpoint fail due to timeout

2021-03-16 Thread
On Tue, Mar 16, 2021 at 02:32:54AM +, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on 
> java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: 
> digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> -  blocked on java.lang.Object@5366a0e2
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 
> WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at sun.misc.Unsafe.park(Native Method)
> -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> 
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple 
> threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on 
> different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-11 Thread
On Thu, Mar 11, 2021 at 02:14:32PM +0100, Arvid Heise wrote:
> Hi ChangZhuo,
> 
> Did you upgrade to Flink 1.12.2 and change the settings at the time? If so,
> could you maybe reset the settings to the old values on Flink 1.12.2 and
> check if the job still gets stuck? Especially, turning off unaligned
> checkpoints (UC) should clarify if it's a general issue in Flink 1.12.2 or
> with UC.
> 
> If it's indeed an issue with UC, then it would help to get the debug logs
> in particular for the package
> org.apache.flink.streaming.runtime.io.checkpointing. You could add the
> following to your log4j.properties (set general log level to INFO).
> 
> logger.checkpointing.name = 
> org.apache.flink.streaming.runtime.io.checkpointing
> logger.checkpointing.level = DEBUG

* Thanks for this information. We are working on it and will reply when
  we get the log.

* Also, we got the stack trace when the checkpoint was stuck; please let
  us know if you need the full trace.

  * The stuck task in the UI is KafkaProducer -> ProcessFunction 128
  * The following is the BLOCKED thread for Source: KafkaProducer ->
    ProcessFunction (129/140)#2

"Source: KafkaProducer -> ProcessFunction (129/140)#2" #66336 prio=5 
os_prio=0 cpu=582.01ms elapsed=5079.15s tid=0x7feb32717000 nid=0x9696 
waiting for monitor entry  [0x7feb28b61000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
- waiting to lock <0x00058e8c5070> (a java.lang.Object)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

ps:
* The original UIDs are redacted to their underlying types.
* It looks like the subtask id in the UI is off by one relative to the
  stack trace.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


How to debug checkpoint/savepoint stuck in Flink 1.12.2

2021-03-10 Thread
Hi,

We have updated our Flink applications to 1.12.2, along with the
following modifications to improve their performance:

- Use unaligned checkpoint
- Change the following fs config
  - state.backend.fs.memory-threshold: 1048576
  - state.backend.fs.write-buffer-size: 4194304
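
A minimal sketch of how we turn on unaligned checkpoints programmatically
(Scala; the checkpoint interval below is illustrative, and the two fs
settings above live in flink-conf.yaml):

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint every 60s (interval is illustrative).
env.enableCheckpointing(60000)
// Switch from aligned to unaligned checkpoints.
env.getCheckpointConfig.enableUnalignedCheckpoints()
```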

However, our Flink applications now occasionally get stuck when taking an
unaligned checkpoint or a savepoint. The following are the operators that
got stuck in our cases.

- Kafka source connector.
- BroadcastProcessFunction with a data input and a broadcast
  configuration stream.

Also, when it is stuck, Flink stops consuming any data.

Since these operators do not store much data in the checkpoint/savepoint,
we wonder how we can debug this problem.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Re: Cannot connect to queryable state proxy

2021-02-07 Thread
On Thu, Feb 04, 2021 at 04:26:42PM +0800, ChangZhuo Chen (陳昌倬) wrote:
> Hi,
> 
> We have a problem connecting to the queryable state client proxy as
> described in [0]. Any help is appreciated.
> 
> * The port 6125 is open in the taskmanager pod.
> 
>   ```
>   root@-654b94754d-2vknh:/tmp# ss -tlp
>   State    Recv-Q   Send-Q   Local Address:Port    Peer Address:Port   Process
>   LISTEN   0        1024     0.0.0.0:46561         0.0.0.0:*
>   LISTEN   0        3        0.0.0.0:9249          0.0.0.0:*
>   LISTEN   0        1024     0.0.0.0:6122          0.0.0.0:*
>   LISTEN   0        1024     10.200.11.3:9067      0.0.0.0:*
>   LISTEN   0        1024     10.200.11.3:6125      0.0.0.0:*
>   LISTEN   0        1024     0.0.0.0:38607         0.0.0.0:*
>   ```

The problem is that Flink only listens on 10.200.11.3:6125 for the
queryable state client proxy, so we need to connect to it via the pod
network address. Is there any way to make Flink listen on 0.0.0.0 for the
queryable state client proxy?
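
In the meantime, a minimal workaround sketch is to point the client at the
pod address the proxy actually binds to instead of localhost (10.200.11.3
is taken from the ss output above; this only works from a network that can
reach the pod IP):

```
import org.apache.flink.queryablestate.client.QueryableStateClient

// Connect to the taskmanager pod IP, where the proxy is listening,
// instead of localhost.
val client = new QueryableStateClient("10.200.11.3", 6125)
// ... run queries with client.getKvState(...) as usual ...
client.shutdownAndWait()
```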


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Cannot connect to queryable state proxy

2021-02-04 Thread
Hi,

We have a problem connecting to the queryable state client proxy as
described in [0]. Any help is appreciated.


The following is our setup:

* Flink 1.12.1
* Standalone Kubernetes
* Related config in flink-conf.yaml

  ```
  queryable-state.enable: true
  queryable-state.proxy.ports: 6125
  ```

* taskmanager log

  ```
  2021-02-04 03:22:57,650 INFO  org.apache.flink.runtime.io.network.netty.NettyServer [] - Successful initialization (took 35 ms). Listening on SocketAddress /0.0.0.0:43665.
  2021-02-04 03:22:57,656 INFO  org.apache.flink.runtime.taskexecutor.KvStateService [] - Starting the kvState service and its components.
  2021-02-04 03:22:57,672 INFO  org.apache.flink.queryablestate.server.KvStateServerImpl [] - Started Queryable State Server @ /10.200.18.4:9067.
  2021-02-04 03:22:57,679 INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl [] - Started Queryable State Proxy Server @ /10.200.18.4:6125.
  2021-02-04 03:22:57,698 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
  ```

* The port 6125 is open in the taskmanager pod.

  ```
  root@-654b94754d-2vknh:/tmp# ss -tlp
  State    Recv-Q   Send-Q   Local Address:Port    Peer Address:Port   Process
  LISTEN   0        1024     0.0.0.0:46561         0.0.0.0:*
  LISTEN   0        3        0.0.0.0:9249          0.0.0.0:*
  LISTEN   0        1024     0.0.0.0:6122          0.0.0.0:*
  LISTEN   0        1024     10.200.11.3:9067      0.0.0.0:*
  LISTEN   0        1024     10.200.11.3:6125      0.0.0.0:*
  LISTEN   0        1024     0.0.0.0:38607         0.0.0.0:*
  ```

* However, we always get the following error when using the queryable
  state API:

  ```
  Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125
      at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
      at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
      at com.appier.rt.short_term_score.QueryCalculateUserST$.printMapState(QueryCalculateUserST.scala:44)
      at com.appier.rt.short_term_score.QueryCalculateUserST$.main(QueryCalculateUserST.scala:82)
      at com.appier.rt.short_term_score.QueryCalculateUserST.main(QueryCalculateUserST.scala)
  Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:6125
  Caused by: java.net.ConnectException: Connection refused
      at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
      at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
      at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
      at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
      at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
      at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
      at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
      at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
      at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
      at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
      at java.base/java.lang.Thread.run(Thread.java:834)
  ```

* `nc` inside the taskmanager itself also gets the same error:

  ```
  root@:/tmp# nc -vz localhost 6125
  nc: connect to localhost port 6125 (tcp) failed: Connection refused
  nc: connect to localhost port 6125 (tcp) failed: Cannot assign requested address
  ```


[0] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
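
For completeness, a minimal sketch of the kind of lookup our printMapState
call performs (the state name, key/value types, key, and job id below are
illustrative placeholders, not our actual job code):

```
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

val client = new QueryableStateClient("localhost", 6125)
val descriptor = new MapStateDescriptor[String, Double](
  "user-st",                        // queryable state name (illustrative)
  createTypeInformation[String],
  createTypeInformation[Double])
val future = client.getKvState(
  JobID.fromHexString("00000000000000000000000000000000"),  // illustrative
  "user-st",
  "some-user-key",                  // key (illustrative)
  createTypeInformation[String],
  descriptor)
// future.get() is where the "Connection refused" error above is thrown.
```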

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Cannot start from savepoint using Flink 1.12 in standalone Kubernetes + Kubernetes HA

2020-12-29 Thread
Hi,

We cannot start a job from a savepoint (created by Flink 1.12, Standalone
Kubernetes + ZooKeeper HA) in Flink 1.12, Standalone Kubernetes +
Kubernetes HA. The following is the exception that stops the job.

Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap name-51e5afd90227d537ff442403d1b279da-jobmanager-leader because it does not exist.


The cluster can start a new job from scratch, so we think the cluster
configuration is good.

The following is HA related config:

high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://some/path/recovery
kubernetes.cluster-id: cluster-name
kubernetes.context: kubernetes-context
kubernetes.namespace: kubernetes-namespace
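
For reference, this is roughly how we pass the savepoint to the standalone
job entrypoint (the job class name and savepoint path are illustrative
placeholders):

```
standalone-job.sh start-foreground \
  --job-classname com.example.StreamingJob \
  --fromSavepoint gs://some/path/savepoints/savepoint-xxxxxx \
  --allowNonRestoredState
```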


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debconf,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature