Re: [Kubernetes Operator] NullPointerException from KubernetesApplicationClusterEntrypoint
Hi Gyula, and thanks for your answer. We tried without any cluster-id reference and still got the same error message. It seems to be related to Flink 1.16, as we have other jobs running with the same flinkConfig and Flink 1.15.

PB

From: Gyula Fóra
Date: Friday, 31 March 2023 at 14:41
To: Pierre Bedoucha
Cc: user@flink.apache.org
Subject: Re: [Kubernetes Operator] NullPointerException from KubernetesApplicationClusterEntrypoint

Never seen this before, but also you should not set the cluster-id in your config, as that should be controlled by the operator itself.

Gyula

On Fri, Mar 31, 2023 at 2:39 PM Pierre Bedoucha <pierre.bedou...@tv2.no> wrote:

Hi,

We are trying to use Flink Kubernetes Operator 1.4.0 with Flink 1.16. However, at the job-manager deployment step we get the following error:

```
Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.shutDownAsync(ClusterEntrypoint.java:585)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:242)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
```

It seems it is related to the following line:

```
this.clusterId =
    checkNotNull(
        flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID),
        "ClusterId must be specified!");
```

We specified the CLUSTER_ID, but it seems that the flinkConfig object is not handled correctly.
We have the following flinkConfiguration defined in deployment.yaml:

```
spec:
  flinkConfiguration:
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    execution.checkpointing.interval: 120s
    execution.checkpointing.min-pause: 120s
    execution.checkpointing.mode: AT_LEAST_ONCE
    execution.checkpointing.snapshot-compression: "false"
    execution.checkpointing.timeout: 3000s
    execution.checkpointing.tolerable-failed-checkpoints: "5"
    execution.checkpointing.unaligned: "false"
    fs.hdfs.hadoopconf: /opt/hadoop-conf/
    high-availability.storageDir: gs:///ha
    high-availability: kubernetes
    high-availability.cluster-id:
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 72h
    kubernetes.operator.savepoint.history.max.count: "15"
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.port: "2112"
    metrics.reporters: prom
    rest.flamegraph.enabled: "false"
    state.backend: rocksdb
    state.backend.incremental: "false"
    state.backend.rocksdb.localdir: /rocksdb
    state.checkpoint-storage: filesystem
    state.checkpoints.dir: gs:///checkpoints
    state.savepoints.dir: gs:///savepoints
    taskmanager.memory.managed.fraction: "0"
    taskmanager.network.memory.buffer-debloat.enabled: "false"
    taskmanager.network.memory.buffer-debloat.period: "200"
    taskmanager.network.memory.buffers-per-channel: "2"
    taskmanager.network.memory.floating-buffers-per-gate: "8"
    taskmanager.network.memory.max-buffers-per-channel: "10"
    taskmanager.network.sort-shuffle.min-buffers: "512"
    taskmanager.numberOfTaskSlots: "1"
    kubernetes.taskmanager.cpu.limit-factor: "4"
    kubernetes.taskmanager.cpu: "0.5"
    kubernetes.cluster-id:
```

Has anyone encountered the issue before?

Thanks,
PB
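[Editorial note: the following is an assumption, not confirmed in the thread.] The empty values in the configuration above (`high-availability.cluster-id:` and `kubernetes.cluster-id:`) are a plausible culprit: a YAML key with no value parses to null, and a null merged into the Flink configuration could overwrite the operator-generated cluster id and trip the `checkNotNull`. A minimal, standalone sketch of that parsing behavior, using PyYAML rather than any Flink code:

```python
import yaml

# A flinkConfiguration fragment with an empty value, as in the deployment.yaml above.
snippet = """
high-availability: kubernetes
kubernetes.cluster-id:
"""

config = yaml.safe_load(snippet)

# The key is present, but its value is None. If such a None value ends up
# overriding the operator-provided cluster id, a checkNotNull(...) on it
# would throw exactly the kind of NPE seen in the stack trace.
print(config["kubernetes.cluster-id"])  # -> None
print(config["high-availability"])      # -> kubernetes
```

Removing the empty keys entirely (as Gyula suggests, letting the operator manage the cluster id) avoids the ambiguity.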
Re: KafkaSource consumer group
Hi,

FWIW, I asked a similar question here: https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m :)

On Fri, Mar 31, 2023 at 3:57 AM Roberts, Ben (Senior Developer) via user <user@flink.apache.org> wrote:

> Hi Gordon,
>
> Thanks for the reply! I think that makes sense.
>
> The reason for investigating is that generally we run our production workloads across 2 kubernetes clusters (each in a different cloud region) for availability reasons. So for instance requests to web apps are load balanced between servers in both clusters, and pub/sub apps will have consumers running in both clusters in the same consumer group (or non-kafka equivalent).
>
> We've just recently deployed our first production Flink workload, using the flink-kubernetes-operator and running the job(s) in HA mode, but we discovered that the same job running in each k8s cluster was processing the same messages, which was different from what we'd expected. It sounds like this is intentional from Flink's POV though.
>
> I don't suppose you're aware of a feature that would allow us to run a Flink job across 2 clusters? Otherwise I guess we'll need to just run it in a single cluster and be aware of the risks if we lost that cluster.
>
> Thanks,
> Ben
>
> On 2023/03/30 16:52:31 "Tzu-Li (Gordon) Tai" wrote:
> > Hi Robert,
> >
> > This is a design choice. Flink's KafkaSource doesn't rely on consumer groups for assigning partitions / rebalancing / offset tracking. It manually assigns whatever partitions are in the specified topic across its consumer instances, and rebalances only when the Flink job / KafkaSink is rescaled.
> >
> > Is there a specific reason that you need two Flink jobs for this? I believe the Flink way of doing this would be to have one job read the topic, and then you'd do a stream split if you want to have two different branches of processing business logic.
> >
> > Thanks,
> > Gordon
> >
> > On Thu, Mar 30, 2023 at 9:34 AM Roberts, Ben (Senior Developer) via user <user@flink.apache.org> wrote:
> > >
> > > Hi,
> > >
> > > Is there a way to run multiple flink jobs with the same Kafka group.id and have them join the same consumer group?
> > >
> > > It seems that setting the group.id using KafkaSource.builder().set_group_id() does not have the effect of creating an actual consumer group in Kafka.
> > >
> > > Running the same flink job with the same group.id, consuming from the same topic, will result in both flink jobs receiving the same messages from the topic, rather than only one of the jobs receiving the messages (as would be expected for consumers in a consumer group normally with Kafka).
> > >
> > > Is this a design choice, and is there a way to configure it so messages can be split across two jobs using the same "group.id"?
> > >
> > > Thanks in advance,
> > > Ben
> > >
> > > Information in this email including any attachments may be privileged, confidential and is intended exclusively for the addressee. The views expressed may not be official policy, but the personal views of the originator. If you have received it in error, please notify the sender by return e-mail and delete it from your system. You should not reproduce, distribute, store, retransmit, use or disclose its contents to anyone. Please note we reserve the right to monitor all e-mail communication through our internal and external networks. SKY and the SKY marks are trademarks of Sky Limited and Sky International AG and are used under licence.
> > >
> > > Sky UK Limited (Registration No. 2906991), Sky-In-Home Service Limited (Registration No. 2067075), Sky Subscribers Services Limited (Registration No. 2340150) and Sky CP Limited (Registration No. 9513259) are direct or indirect subsidiaries of Sky Limited (Registration No. 2247735). All of the companies mentioned in this paragraph are incorporated in England and Wales and share the same registered office at Grant Way, Isleworth, Middlesex TW7 5QD
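Gordon's point above — that Flink's KafkaSource statically assigns every partition of the topic to each job instead of joining a Kafka consumer group — can be illustrated with a small toy model. This is plain Python, not Flink or Kafka code; the data and function names are invented purely for illustration:

```python
# Toy model of a topic with three partitions holding some records.
partitions = {0: ["a", "b"], 1: ["c"], 2: ["d", "e"]}

def read(assigned):
    """Collect every record from the given partition ids, in sorted order."""
    return sorted(r for p in assigned for r in partitions[p])

# Flink-style static assignment: each job independently takes ALL partitions,
# so two identical jobs both see every record (what Ben observed).
flink_job_1 = read(partitions.keys())
flink_job_2 = read(partitions.keys())
assert flink_job_1 == flink_job_2 == ["a", "b", "c", "d", "e"]

# Kafka consumer-group style: the broker splits the partitions across the
# group's members, so each record is delivered to exactly one consumer.
group_member_1 = read([0, 1])
group_member_2 = read([2])
assert sorted(group_member_1 + group_member_2) == ["a", "b", "c", "d", "e"]
assert not set(group_member_1) & set(group_member_2)  # no overlap
```

The second half is the behavior Ben expected from `group.id`; the first half is what KafkaSource actually does, since it tracks offsets in Flink's own checkpoints rather than in the consumer group.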
Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)
https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5#migration-guide

Kryo themselves state that v5 likely can't read v2 data. However, both versions can be on the classpath without conflicts, as v5 offers a versioned artifact that includes the version in the package name. It probably wouldn't be difficult to migrate a savepoint to Kryo v5, purely from a read/write perspective. The bigger question is how we expose this new Kryo version in the API. If we stick to the versioned jar we need to either duplicate all current Kryo-related APIs or find a better way to integrate other serialization stacks.

On 30/03/2023 17:50, Piotr Nowojski wrote:

Hey,

> 1. The Flink community agrees that we upgrade Kryo to a later version, which means breaking all checkpoint/savepoint compatibility and releasing a Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support dropped. This is probably the quickest way, but would still mean that we expose Kryo in the Flink APIs, which is the main reason why we haven't been able to upgrade Kryo at all.

This sounds pretty bad to me. Has anyone looked into what it would take to provide a smooth migration from Kryo2 -> Kryo5?

Best,
Piotrek

On Thu, Mar 30, 2023 at 16:54 Alexis Sarda-Espinosa wrote:

Hi Martijn,

Just to be sure, if all state-related classes use a POJO serializer, Kryo will never come into play, right? Given FLINK-16686 [1], I wonder how many users actually have jobs with Kryo and RocksDB, but even if there aren't many, that still leaves those who don't use RocksDB for checkpoints/savepoints.

If Kryo were to stay in the Flink APIs in v1.X, is it impossible to let users choose between v2/v5 jars by separating them like log4j2 jars?

[1] https://issues.apache.org/jira/browse/FLINK-16686

Regards,
Alexis.

On Thu, Mar 30, 2023 at 14:26, Martijn Visser wrote:

Hi all,

I also saw a thread on this topic from Clayton Wohl [1], which I'm including in this discussion thread to avoid that it gets lost.
From my perspective, there are two main ways to get to Java 17:

1. The Flink community agrees that we upgrade Kryo to a later version, which means breaking all checkpoint/savepoint compatibility and releasing a Flink 2.0 with Java 17 support added and Java 8 and Flink Scala API support dropped. This is probably the quickest way, but would still mean that we expose Kryo in the Flink APIs, which is the main reason why we haven't been able to upgrade Kryo at all.

2. There's a contributor who makes a contribution that bumps Kryo, but either a) automagically reads in all old checkpoints/savepoints using Kryo v2 and writes them to new snapshots using Kryo v5 (as mentioned in the Kryo migration guide [2][3]), or b) provides an offline tool that allows users that are interested in migrating their snapshots to do so manually before starting from a newer version. That could potentially avoid the need to introduce a new Flink major version.

In both scenarios, ideally the contributor would also help with avoiding the exposure of Kryo, so that we will be in better shape in the future.

It would be good to get the opinion of the community on either of these two options, or potentially on another one that I haven't mentioned. If it appears that there's an overall agreement on the direction, I would propose that a FLIP gets created which describes the entire process.

Looking forward to the thoughts of others, including the Users (therefore including the User ML).

Best regards,

Martijn

[1] https://lists.apache.org/thread/qcw8wy9dv8szxx9bh49nz7jnth22p1v2
[2] https://lists.apache.org/thread/gv49jfkhmbshxdvzzozh017ntkst3sgq
[3] https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

On Sun, Mar 19, 2023 at 8:16 AM Tamir Sagi wrote:

I agree, there are several options to mitigate the migration from v2 to v5. Yet, Oracle's roadmap is to end JDK 11 support in September this year.
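Option 2a above (read old snapshots with the old serializer, write them back with the new one) is essentially an offline re-serialization pass. A rough analogy in plain Python, using pickle protocol versions as a stand-in for Kryo v2/v5 — this is an illustration of the migration idea only, not Flink or Kryo code:

```python
import pickle

# Some "state" originally written with an old serialization format
# (pickle protocol 2 stands in for Kryo v2 here).
state = {"counts": [1, 2, 3], "name": "job-state"}
old_snapshot = pickle.dumps(state, protocol=2)

# Offline migration: read with the old reader, write with the new format
# (protocol 5 stands in for Kryo v5). loads() detects the protocol itself;
# a real Kryo migration would need both versioned jars on the classpath.
recovered = pickle.loads(old_snapshot)
new_snapshot = pickle.dumps(recovered, protocol=5)

# The migrated snapshot holds the same state, just in the new encoding.
assert pickle.loads(new_snapshot) == state
assert new_snapshot != old_snapshot
```

The hard part the thread identifies is not this read/write step but that Kryo types leak into Flink's public APIs, so any custom Kryo serializers registered by users would also need migrating.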
From: ConradJam
Sent: Thursday, March 16, 2023 4:36 AM
To: d...@flink.apache.org
Subject: Re: [Discussion] - Release major Flink version to support JDK 17 (LTS)

EXTERNAL EMAIL

Thanks for starting this discussion. I have been tracking this problem for a long time, until I saw a conversation in an issue a few days ago and learned that the Kryo version problem will affect the JDK 17 compilation of snapshots [1] FLINK-24998,
Re: flink 1.17 connector adapt
Thanks for the update.

On Fri, Mar 31, 2023 at 14:35, Martijn Visser wrote:

> Hi Tian,
>
> Thanks for flagging this. This is the first time that we've released a Flink version with connectors externalized, and we're still discussing what's the best way to release connectors for new versions in a simple way. This is something that we're trying to get done asap.
>
> Best regards,
>
> Martijn
>
> On Fri, Mar 31, 2023 at 7:22 AM tian tian wrote:
>
>> Hi, Flink 1.17 has been released, but elasticsearch and rabbitmq have not been adapted to 1.17. Is there any plan to adapt it?