Re: [Question] How to scale application based on 'reactive' mode
Hello,

Thanks for your notice. Then what is the purpose of using 'reactive' mode, if it does nothing by itself? What is the difference if I use an autoscaler without 'reactive' mode?

Regards,
Jung

On Fri, Aug 18, 2023 at 7:51 PM, Gyula Fóra wrote:
> Hi!
>
> I think what you need is probably not the reactive mode but a proper autoscaler. The reactive mode, as you say, doesn't do anything in itself; you need to build a lot of logic around it.
>
> Check this instead:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>
> The Kubernetes Operator has a built-in autoscaler that can scale jobs based on Kafka data rate / processing throughput. It also doesn't rely on the reactive mode.
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung wrote:
>
>> Hello,
>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>
>> 1. As far as I understand, though I've set up `scheduler-mode: reactive`, it will not change parallelism automatically by itself, by CPU usage or Kafka consumer rate. It needs additional resource monitoring features (such as the Horizontal Pod Autoscaler, or else). Is this correct?
>> 2. Is it possible to create a custom resource monitor provider application? For example, if I want to increase/decrease parallelism by Kafka consumer rate, do I need to send a specific API call from outside, to order rescaling?
>> 3. If 2 is correct, what is the difference when using 'reactive' mode? Because as far as I think, calling a specific API will rescale either using 'reactive' mode or not... (or is the API just working based on this mode)?
>>
>> Thanks.
>>
>> Regards
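To make Gyula's pointer concrete, here is a hedged sketch of enabling the operator's built-in autoscaler on a FlinkDeployment. The key names follow the operator autoscaler docs linked above, but exact keys may differ between operator versions, and all names and values below (deployment name, jar URI, thresholds) are illustrative:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-autoscaled-job            # illustrative name
spec:
  flinkVersion: v1_17
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: "1m"   # wait before reacting to metrics
    job.autoscaler.metrics.window: "5m"           # metric aggregation window
    job.autoscaler.target.utilization: "0.7"      # target busy ratio per vertex
  job:
    jarURI: local:///opt/flink/usrlib/app.jar     # illustrative path
    parallelism: 2
    upgradeMode: savepoint
```

Unlike reactive mode, the operator autoscaler computes per-vertex parallelism from the observed processing rate and backlog and applies it through regular job upgrades, so no external monitor (e.g. an HPA) is needed.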
Re: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
Hi,

Is it a streaming job or a batch job that fails to recover? From the error, the job is already in a terminated state. Please check whether there are any other error logs to see why the job failed and exited.

Best,
Shammon FY

On Thu, Aug 31, 2023 at 7:47 PM denghaibin wrote:
> After the flink-1.16.0 jobs had been running for a while, a large number of jobs failed. The error log is below. Could someone take a look at what the problem is?
>
> java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
>     at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
>     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.0.jar:1.16.0]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
>     ... 3 more
> Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
>  at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-dist-1.16.0.jar:1.16.0]
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688) ~[flink-dist-1.16.0.jar:1.16.0]
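The root cause here ("No content to map due to end-of-input" at line 1, column 0, while reading from a HadoopDataInputStream) indicates the dispatcher found an empty JSON file under the JobResultStore storage path, for example one left behind by an interrupted write. A hedged sketch of how one might locate such files — demonstrated on a temporary directory; the real path is whatever `job-result-store.storage-path` (or your HA storage dir) points at, and the `_DIRTY.json` file name below is an assumption, not taken from this thread:

```shell
# Demo on a local temp dir; substitute your actual JobResultStore storage path.
JRS_DIR=$(mktemp -d)
touch "${JRS_DIR}/someJobId_DIRTY.json"   # zero-byte file, like the one Jackson rejects

# List zero-byte result files that the dispatcher would fail to parse.
EMPTY_FILES=$(find "${JRS_DIR}" -name '*.json' -size 0)
echo "Empty job-result files: ${EMPTY_FILES}"
```

Removing or repairing such a corrupt entry may let the dispatcher start again, at the cost of losing that job's terminal record — back up the directory first.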
Re: K8s operator - Stop Job with savepoint on session cluster via Java API
Hi Krzysztof,

For a Flink session cluster, you can stop the job with a savepoint through the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`. You can refer to [1] for more information about how to do it in the SQL client, and you can also create a table environment to perform the statement in your application.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job

Best,
Shammon FY

On Fri, Sep 1, 2023 at 6:35 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Hi community,
> I would like to ask what is the recommended way to stop a Flink job with a savepoint on a session cluster via the k8s operator Java API?
>
> Currently I'm doing this by setting savepointTriggerNonce on the JobSpec object.
> However, I've noticed that this works only if I do not include a Job state change in that spec.
>
> In other words, when I submit a JobSpec that has a state change from Running to Suspended and a savepointTriggerNonce, the checkpoint is not created. Is that intended?
> In order to mimic [1], do I have to submit two JobSpec updates? One with the savepointTriggerNonce and the second one with the Job state change?
>
> A follow-up question: what kind of savepoint is triggered when using savepointTriggerNonce, native or canonical? Also, is there a way to pass the --drain option or a savepoint path via the spec? (Not including the state.savepoints.dir cluster config option.)
>
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
>
> Thanks,
> Krzysztof Chmielewski
K8s operator - Stop Job with savepoint on session cluster via Java API
Hi community,

I would like to ask what is the recommended way to stop a Flink job with a savepoint on a session cluster via the k8s operator Java API?

Currently I'm doing this by setting savepointTriggerNonce on the JobSpec object. However, I've noticed that this works only if I do not include a Job state change in that spec.

In other words, when I submit a JobSpec that has a state change from Running to Suspended and a savepointTriggerNonce, the checkpoint is not created. Is that intended? In order to mimic [1], do I have to submit two JobSpec updates? One with the savepointTriggerNonce and the second one with the Job state change?

A follow-up question: what kind of savepoint is triggered when using savepointTriggerNonce, native or canonical? Also, is there a way to pass the --drain option or a savepoint path via the spec? (Not including the state.savepoints.dir cluster config option.)

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint

Thanks,
Krzysztof Chmielewski
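A hedged sketch of the two-step sequence described above, expressed as FlinkSessionJob specs. The field names follow the operator's FlinkSessionJob CRD, but whether two separate updates are actually required is exactly the open question in this thread, and the resource names and nonce value are illustrative:

```yaml
# Step 1: bump savepointTriggerNonce while the job stays running.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job                 # illustrative
spec:
  deploymentName: my-session-cluster   # illustrative
  job:
    jarURI: local:///opt/flink/usrlib/app.jar   # illustrative
    state: running
    savepointTriggerNonce: 1001   # changing this value triggers a savepoint
---
# Step 2, applied only after the savepoint above has completed:
# suspend the job; with upgradeMode: savepoint the suspend itself
# performs a stop-with-savepoint.
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-session-job
spec:
  deploymentName: my-session-cluster
  job:
    jarURI: local:///opt/flink/usrlib/app.jar
    state: suspended
    upgradeMode: savepoint
```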
Re: Rate Limit / Throttle Data to Send
Hi,

Thanks for the info. I updated my code implementation. I am producing a list in my custom processAllWindowFunction() before it goes to my custom sink. The problem I get is that the list doesn't get reset/cleared.

inputStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(maxNo))
    .process(new CustomProcessAllWindowFunc());

CustomProcessAllWindowFunc's code:

eventList = new ArrayList<>();
if (rateConfig == eventList.size()) {
    collector.collect(eventList);
    // clear the list since I need to send only 100 elements
    eventList.clear();
}

However, even though I cleared the eventList, the elements that went to the sink previously are still there, and new elements just keep getting added to the eventList on top of the previous ones.

How can I correctly clear the list in the ProcessAllWindowFunction?

On Wed, Aug 30, 2023, 3:54 PM Schwalbe Matthias wrote:
> Hi Patricia,
>
> What you try to implement can be achieved out-of-the-box by windowing.
>
> I assume these packets of 100 events are not by key but global.
> In that case use non-keyed windowing [1] with a count trigger (100) [3], and maybe add a processing-time trigger if it takes too long to collect all 100 events; then create the output with a process window function [2].
>
> I hope this helps
>
> Thias
>
> [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
> [3] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers
>
> From: patricia lee
> Sent: Wednesday, August 30, 2023 6:54 AM
> To: user@flink.apache.org
> Subject: Rate Limit / Throttle Data to Send
>
> Hi,
>
> I have a requirement that I need to send data to a third party with a limited number of elements, with the flow below.
>
> kafkasource
> mapToVendorPojo
> processfunction
> sinkToVendor
>
> My implementation is: I continuously add the elements to my list state ListState in a ProcessFunction, and once it reaches 100 in size I emit the data and start collecting data again for another set of 100.
>
> if (rateConfig == Iterables.size(appEventState.get())) {
>     List holder = new ArrayList();
>     appEventState.get().forEach(e -> holder.add(e));
>     collector.collect(holder);
>     appEventState.clear();
> }
>
> The problem I am getting is: the "if" condition above never gets matched, because the appEventState size is always 0 or 1 only. The rateConfig is set to 20.
>
> What am I missing?
>
> Thanks,
> Patricia
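One likely culprit in the snippet above: `collector.collect(eventList)` hands the *same* list object downstream, and the subsequent `eventList.clear()` empties that very object (and if `eventList` is a field of the function rather than a local variable, it also survives across window firings). A minimal plain-Java sketch of the batching logic — class and field names are made up for illustration, no Flink involved — showing that emitting a copy before clearing keeps the emitted batches intact:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for "emit every N elements, then start over"; not Flink code.
class Batcher {
    final int batchSize;
    final List<String> buffer = new ArrayList<>();
    final List<List<String>> emitted = new ArrayList<>();  // stands in for collector.collect()

    Batcher(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(String event) {
        buffer.add(event);
        if (buffer.size() == batchSize) {
            // Emit a *copy*: clearing the buffer afterwards must not
            // mutate the batch already handed downstream.
            emitted.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

In a ProcessAllWindowFunction, the simplest fix along these lines is to make the list a local variable inside process(), so each window firing starts from an empty list and the collected copy is never mutated afterwards.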
Could not retrieve JobResults of globally-terminated jobs from JobResultStore
After the flink-1.16.0 jobs had been running for a while, a large number of jobs failed. The error log is below. Could someone take a look at what the problem is?

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_382]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_382]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_382]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_382]
    ... 3 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, column: 0]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688) ~[flink-dist-1.16.0.jar:1.16.0]
Re: flink k8s operator - problem with patching session cluster
There is no effect of the replicas setting in native mode. Native session clusters are "elastic": the number of task managers is determined on the fly based on the job requirements.

Gyula

On Thu, Aug 31, 2023 at 11:19 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Thank you for the response.
>
> Yes, currently in my PoC I'm using the standalone integration.
> Does `spec.taskManager.replicas` have any effect when using native mode?
>
> The reason I'm asking is that I need to know what the "capacity" of a particular session cluster is before I submit a job into it. And the way I was doing this was:
>
> try (KubernetesClient kubernetesClient = new KubernetesClientBuilder().build()) {
>     MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
>             io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>> resources =
>         kubernetesClient.resources(FlinkDeployment.class);
>
>     List<FlinkDeployment> items = resources.inNamespace("default").list().getItems();
>     for (FlinkDeployment item : items) {
>         System.out.println("Flink Deployments: " + item);
>         System.out.println("Number of TM replicas: " + item.getSpec().getTaskManager().getReplicas());
>     }
> }
>
> Thanks,
> Krzysztof
>
> czw., 31 sie 2023 o 10:44 Gyula Fóra napisał(a):
>
>> I guess your question is in the context of the standalone integration, because native session deployments automatically add TMs on the fly as more are necessary.
>>
>> For standalone mode you should be able to configure `spec.taskManager.replicas`, and if I understand correctly that will not shut down the running jobs.
>> If you have problems, please share your FlinkDeployment yaml and the operator logs in a JIRA ticket.
>>
>> In any case, the native mode is probably a better fit for your use case.
>>
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Just want to bring this up in case it was missed in the other messages/queries :)
>>>
>>> TL;DR
>>> How to add a TM to a Flink session cluster via the Java K8s client if the session cluster has running jobs?
>>>
>>> Thanks,
>>> Krzysztof
>>>
>>> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> napisał(a):
>>>
>>>> Hi community,
>>>> I have a use case where I would like to add an extra TM to a running Flink session cluster that has Flink jobs deployed. Session cluster creation, job submission and cluster patching are done using the flink k8s operator Java API. The details of this are presented here [1].
>>>>
>>>> I would like to ask what is the recommended path to add a TM to an existing session cluster that currently runs a number of Flink jobs, using the Java API. For simplicity, let's assume that I don't want to resume jobs from a savepoint, just redeploy them.
>>>>
>>>> When executing the steps from [1], I'm facing an issue where session jobs are not redeployed on the patched session cluster; however, kubectl shows that there is a FlinkSessionJob submitted to the k8s.
>>>>
>>>> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl, the Flink k8s operator throws an exception described in [1]. In fact, the state of that Flink deployment requires a few steps to clean up after that patch.
>>>>
>>>> [1] https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
Re: flink k8s operator - problem with patching session cluster
Thank you for the response.

Yes, currently in my PoC I'm using the standalone integration. Does `spec.taskManager.replicas` have any effect when using native mode?

The reason I'm asking is that I need to know what the "capacity" of a particular session cluster is before I submit a job into it. And the way I was doing this was:

try (KubernetesClient kubernetesClient = new KubernetesClientBuilder().build()) {
    MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
            io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>> resources =
        kubernetesClient.resources(FlinkDeployment.class);

    List<FlinkDeployment> items = resources.inNamespace("default").list().getItems();
    for (FlinkDeployment item : items) {
        System.out.println("Flink Deployments: " + item);
        System.out.println("Number of TM replicas: " + item.getSpec().getTaskManager().getReplicas());
    }
}

Thanks,
Krzysztof

czw., 31 sie 2023 o 10:44 Gyula Fóra napisał(a):
> I guess your question is in the context of the standalone integration, because native session deployments automatically add TMs on the fly as more are necessary.
>
> For standalone mode you should be able to configure `spec.taskManager.replicas`, and if I understand correctly that will not shut down the running jobs.
> If you have problems, please share your FlinkDeployment yaml and the operator logs in a JIRA ticket.
>
> In any case, the native mode is probably a better fit for your use case.
>
> Gyula
>
> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
>
>> Just want to bring this up in case it was missed in the other messages/queries :)
>>
>> TL;DR
>> How to add a TM to a Flink session cluster via the Java K8s client if the session cluster has running jobs?
>>
>> Thanks,
>> Krzysztof
>>
>> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> napisał(a):
>>
>>> Hi community,
>>> I have a use case where I would like to add an extra TM to a running Flink session cluster that has Flink jobs deployed. Session cluster creation, job submission and cluster patching are done using the flink k8s operator Java API. The details of this are presented here [1].
>>>
>>> I would like to ask what is the recommended path to add a TM to an existing session cluster that currently runs a number of Flink jobs, using the Java API. For simplicity, let's assume that I don't want to resume jobs from a savepoint, just redeploy them.
>>>
>>> When executing the steps from [1], I'm facing an issue where session jobs are not redeployed on the patched session cluster; however, kubectl shows that there is a FlinkSessionJob submitted to the k8s.
>>>
>>> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl, the Flink k8s operator throws an exception described in [1]. In fact, the state of that Flink deployment requires a few steps to clean up after that patch.
>>>
>>> [1] https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
Re: flink k8s operator - problem with patching session cluster
I guess your question is in the context of the standalone integration, because native session deployments automatically add TMs on the fly as more are necessary.

For standalone mode you should be able to configure `spec.taskManager.replicas`, and if I understand correctly that will not shut down the running jobs. If you have problems, please share your FlinkDeployment yaml and the operator logs in a JIRA ticket.

In any case, the native mode is probably a better fit for your use case.

Gyula

On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
> Just want to bring this up in case it was missed in the other messages/queries :)
>
> TL;DR
> How to add a TM to a Flink session cluster via the Java K8s client if the session cluster has running jobs?
>
> Thanks,
> Krzysztof
>
> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> napisał(a):
>
>> Hi community,
>> I have a use case where I would like to add an extra TM to a running Flink session cluster that has Flink jobs deployed. Session cluster creation, job submission and cluster patching are done using the flink k8s operator Java API. The details of this are presented here [1].
>>
>> I would like to ask what is the recommended path to add a TM to an existing session cluster that currently runs a number of Flink jobs, using the Java API. For simplicity, let's assume that I don't want to resume jobs from a savepoint, just redeploy them.
>>
>> When executing the steps from [1], I'm facing an issue where session jobs are not redeployed on the patched session cluster; however, kubectl shows that there is a FlinkSessionJob submitted to the k8s.
>>
>> Additionally, when I'm trying to delete the FlinkSessionJob from kubectl, the Flink k8s operator throws an exception described in [1]. In fact, the state of that Flink deployment requires a few steps to clean up after that patch.
>>
>> [1] https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
Re: Blue green deployment with Flink Apache Operator
Definitely, our intent is to start with an in-house, specific Blue/Green operator, and once we reach some level of confidence we will open a FLIP to discuss it.

Nicolas

On Thu, Aug 31, 2023 at 10:12 AM Gyula Fóra wrote:
> The main concern, as we discussed in previous mailing list threads, is the general applicability of such a solution:
>
> - Many production jobs cannot really afford running in parallel (starting the second job while the first one is running), due to data consistency/duplication reasons
> - Exactly-once sinks do not really support this
>
> So I think we should start with this maybe as an independent effort / external library, and if we see that it works we could discuss it in a FLIP.
>
> What do you think?
> Gyula
>
> On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison <nicolas.frai...@datadoghq.com> wrote:
>
>> Thanks Gyula for your feedback.
>>
>> We were also thinking of relying on such a solution, creating a dedicated CRD/operator to manage this BlueGreenFlinkDeployment.
>> Good to hear that it could be incorporated later in the operator.
>>
>> Will let you know once we have something to share with you.
>>
>> Nicolas
>>
>> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra wrote:
>>
>>> Hey!
>>>
>>> I don't know if anyone has implemented this or not, but one way to approach this problem (and this may not be the right way, just an idea :)) is to add a new Custom Resource type that sits on top of the FlinkDeployment / FlinkSessionJob resources and add a small controller for this.
>>>
>>> This new custom resource, BlueGreenDeployment, would be somewhat similar to how a ReplicaSet vs Pod works in Kubernetes. It would create a new FlinkDeployment and would delete the old one once the new one reached a healthy running state.
>>>
>>> Adding a new CR allows us to not overcomplicate the existing resource/controller loop but simply leverage it. If you prototype something along these lines, please feel free to share, and then we can discuss if we want to incorporate something like this in the operator repo in the future :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <user@flink.apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that support for blue/green deployment will not be supported or will not happen soon.
>>>>
>>>> I'd like to know if some of you have built a custom mechanism on top of this operator to support blue/green deployment, and if you would have any advice on implementing this?
>>>>
>>>> --
>>>> Nicolas Fraison (he/him)
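The ReplicaSet-vs-Pod analogy in the thread can be made concrete with a purely hypothetical sketch of what such a custom resource might look like. No such CRD exists in the operator today; the group/version, resource names, and jar path below are all invented for illustration:

```yaml
# Hypothetical -- illustrating the BlueGreenDeployment idea, not a real CRD.
apiVersion: example.com/v1alpha1
kind: BlueGreenDeployment
metadata:
  name: my-job                      # illustrative
spec:
  # Template stamped out as a FlinkDeployment. A small controller would
  # create the "green" copy from this template, wait for it to reach a
  # healthy RUNNING state, then delete the "blue" one -- analogous to how
  # a ReplicaSet manages Pods.
  template:
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      job:
        jarURI: local:///opt/flink/usrlib/app.jar   # illustrative
        upgradeMode: savepoint
```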
Re: Blue green deployment with Flink Apache Operator
The main concern, as we discussed in previous mailing list threads, is the general applicability of such a solution:

- Many production jobs cannot really afford running in parallel (starting the second job while the first one is running), due to data consistency/duplication reasons
- Exactly-once sinks do not really support this

So I think we should start with this maybe as an independent effort / external library, and if we see that it works we could discuss it in a FLIP.

What do you think?
Gyula

On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison <nicolas.frai...@datadoghq.com> wrote:
> Thanks Gyula for your feedback.
>
> We were also thinking of relying on such a solution, creating a dedicated CRD/operator to manage this BlueGreenFlinkDeployment.
> Good to hear that it could be incorporated later in the operator.
>
> Will let you know once we have something to share with you.
>
> Nicolas
>
> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra wrote:
>
>> Hey!
>>
>> I don't know if anyone has implemented this or not, but one way to approach this problem (and this may not be the right way, just an idea :)) is to add a new Custom Resource type that sits on top of the FlinkDeployment / FlinkSessionJob resources and add a small controller for this.
>>
>> This new custom resource, BlueGreenDeployment, would be somewhat similar to how a ReplicaSet vs Pod works in Kubernetes. It would create a new FlinkDeployment and would delete the old one once the new one reached a healthy running state.
>>
>> Adding a new CR allows us to not overcomplicate the existing resource/controller loop but simply leverage it. If you prototype something along these lines, please feel free to share, and then we can discuss if we want to incorporate something like this in the operator repo in the future :)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <user@flink.apache.org> wrote:
>>
>>> Hi,
>>>
>>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that support for blue/green deployment will not be supported or will not happen soon.
>>>
>>> I'd like to know if some of you have built a custom mechanism on top of this operator to support blue/green deployment, and if you would have any advice on implementing this?
>>>
>>> --
>>> Nicolas Fraison (he/him)
Re: Blue green deployment with Flink Apache Operator
Thanks Gyula for your feedback.

We were also thinking of relying on such a solution, creating a dedicated CRD/operator to manage this BlueGreenFlinkDeployment. Good to hear that it could be incorporated later in the operator.

Will let you know once we have something to share with you.

Nicolas

On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra wrote:
> Hey!
>
> I don't know if anyone has implemented this or not, but one way to approach this problem (and this may not be the right way, just an idea :)) is to add a new Custom Resource type that sits on top of the FlinkDeployment / FlinkSessionJob resources and add a small controller for this.
>
> This new custom resource, BlueGreenDeployment, would be somewhat similar to how a ReplicaSet vs Pod works in Kubernetes. It would create a new FlinkDeployment and would delete the old one once the new one reached a healthy running state.
>
> Adding a new CR allows us to not overcomplicate the existing resource/controller loop but simply leverage it. If you prototype something along these lines, please feel free to share, and then we can discuss if we want to incorporate something like this in the operator repo in the future :)
>
> Cheers,
> Gyula
>
> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <user@flink.apache.org> wrote:
>
>> Hi,
>>
>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that support for blue/green deployment will not be supported or will not happen soon.
>>
>> I'd like to know if some of you have built a custom mechanism on top of this operator to support blue/green deployment, and if you would have any advice on implementing this?
>>
>> --
>> Nicolas Fraison (he/him)