Re: [Question] How to scale application based on 'reactive' mode

2023-08-31 Thread Dennis Jung
Hello,
Thanks for your reply.

Then what is the purpose of using 'reactive' mode, if it doesn't do anything
by itself?
What is the difference if I use an autoscaler without 'reactive' mode?

Regards,
Jung



On Fri, Aug 18, 2023 at 7:51 PM, Gyula Fóra wrote:

> Hi!
>
> I think what you need is probably not the reactive mode but a proper
> autoscaler. The reactive mode, as you say, doesn't do anything by itself;
> you need to build a lot of logic around it.
>
> Check this instead:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>
> The Kubernetes Operator has a built-in autoscaler that can scale jobs
> based on Kafka data rate / processing throughput. It also doesn't rely on
> the reactive mode.
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:
>
>> Hello,
>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>
>> 1. As far as I understand, even though I've set up `scheduler-mode:
>> reactive`, it will not change parallelism automatically by itself based on
>> CPU usage or Kafka consumer rate. It needs additional resource-monitoring
>> features (such as the Horizontal Pod Autoscaler). Is this correct?
>> 2. Is it possible to create a custom resource-monitor provider
>> application? For example, if I want to increase/decrease parallelism based
>> on the Kafka consumer rate, do I need to call a specific API from outside
>> to trigger rescaling?
>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>> Because as far as I can tell, calling such an API would rescale the job
>> whether 'reactive' mode is used or not... (or does the API only work based
>> on this mode)?
>>
>> Thanks.
>>
>> Regards
>>
>>
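The built-in autoscaler Gyula points to is enabled through job configuration rather than reactive mode. A minimal sketch of the relevant entries, expressed as a plain Java map — the key names are assumptions taken from the operator 1.6 autoscaler docs and should be checked against your operator version:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical flinkConfiguration entries enabling the operator's built-in
// autoscaler; key names are assumptions based on the operator 1.6 docs.
class AutoscalerConfig {
    static Map<String, String> autoscalerEntries() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("job.autoscaler.enabled", "true");              // turn the autoscaler on
        conf.put("job.autoscaler.stabilization.interval", "1m"); // wait between rescalings
        conf.put("job.autoscaler.metrics.window", "5m");         // metric aggregation window
        conf.put("job.autoscaler.target.utilization", "0.6");    // target busy-time ratio
        return conf;
    }
}
```

These entries would go into the `flinkConfiguration` map of the FlinkDeployment spec; no Horizontal Pod Autoscaler or reactive scheduler is involved.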


Re: Could not retrieve JobResults of globally-terminated jobs from JobResultStore

2023-08-31 Thread Shammon FY
Hi,

Is it a streaming job or a batch job that cannot be recovered? From the error, the job is already in a terminated state. You can check whether there are any other error logs to see why the job failed and exited.

Best,
Shammon FY

On Thu, Aug 31, 2023 at 7:47 PM denghaibin  wrote:

> After flink-1.16.0 jobs had been running for a while, a large batch of jobs
> failed with the error log below. Could someone take a look at what the problem is?
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
>  at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_382]
>  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve
> JobResults of globally-terminated jobs from JobResultStore
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_382]
>  ... 3 more
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>  at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream);
> line: 1, column: 0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
> ~[flink-dist-1.16.0.jar:1.16.0]


Re: K8s operator - Stop Job with savepoint on session cluster via Java API

2023-08-31 Thread Shammon FY
Hi Krzysztof,

For a Flink session cluster, you can stop the job with a savepoint through
the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`. You can refer to
[1] for more information about how to do this in the SQL client, and you can
also create a table environment to execute the statement in your application.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job

Best,
Shammon FY

On Fri, Sep 1, 2023 at 6:35 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi community,
> I would like to ask what is the recommended way to stop a Flink job with
> a savepoint on a session cluster via the k8s operator Java API?
>
> Currently I'm doing this by setting savepointTriggerNonce on the JobSpec
> object.
> However, I've noticed that this works only if I do not include a Job state
> change in that spec.
>
> In other words, when I submit a JobSpec that has a state change from
> Running to Suspended and a savepointTriggerNonce, the savepoint is not
> created. Is that intended?
> In order to mimic [1], do I have to submit two JobSpec updates? One with
> the savepointTriggerNonce and the second one with the Job state change?
>
> A follow-up question: what kind of savepoint is triggered when using
> savepointTriggerNonce, native or canonical? Also, is there a way to pass
> the --drain option or a savepoint path via the spec (not including the
> state.savepoints.dir cluster config option)?
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
>
> Thanks,
> Krzysztof Chmielewski
>


K8s operator - Stop Job with savepoint on session cluster via Java API

2023-08-31 Thread Krzysztof Chmielewski
Hi community,
I would like to ask what is the recommended way to stop a Flink job with a
savepoint on a session cluster via the k8s operator Java API?

Currently I'm doing this by setting savepointTriggerNonce on the JobSpec object.
However, I've noticed that this works only if I do not include a Job state
change in that spec.

In other words, when I submit a JobSpec that has a state change from Running
to Suspended and a savepointTriggerNonce, the savepoint is not created. Is
that intended?
In order to mimic [1], do I have to submit two JobSpec updates? One with the
savepointTriggerNonce and the second one with the Job state change?

A follow-up question: what kind of savepoint is triggered when using
savepointTriggerNonce, native or canonical? Also, is there a way to pass the
--drain option or a savepoint path via the spec (not including the
state.savepoints.dir cluster config option)?

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint

Thanks,
Krzysztof Chmielewski
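For context, savepointTriggerNonce follows the operator's general nonce pattern: the action fires when the observed value changes, and re-submitting the same value is a no-op. A minimal illustration of that semantics (hypothetical code, not the operator's actual implementation):

```java
// Hypothetical illustration of trigger-nonce semantics: the action fires
// only when the nonce value changes between two observed specs.
class NonceTrigger {
    private Long lastSeen;

    boolean shouldTrigger(Long nonce) {
        boolean fire = nonce != null && !nonce.equals(lastSeen);
        lastSeen = nonce;
        return fire;
    }
}
```

Under this model a spec update has to carry a *new* nonce value for a savepoint to be triggered, which is one reason a combined state-change-plus-nonce update can behave differently than two separate updates.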


Re: Rate Limit / Throttle Data to Send

2023-08-31 Thread patricia lee
Hi,


Thanks for the info.

I updated my code implementation. I am producing a list in my custom
processAllWindowFunction() before it goes to my custom sink. The problem I
get is that the list doesn't get reset/cleared.

inputStream
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .trigger(CountTrigger.of(maxNo))
    .process(new CustomProcessAllWindowFunc());




CustomProcessAllWindowFunc's code:

eventList = new ArrayList<>();
if (rateConfig == eventList.size()) {
    collector.collect(eventList);

    // clear the list since I need to send only 100 elements
    eventList.clear();
}

However, even though I clear the eventList, the elements that previously
went to the sink are still there, and new elements just keep being added on
top of the previous ones.

How can I correctly clear the list in the ProcessWindowFunction?
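One way to avoid the leak is to keep no list in instance fields at all and build the batch inside the per-window call, so nothing survives across window firings. A minimal plain-Java sketch of that shape — a stand-in for a ProcessAllWindowFunction body, not actual Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a ProcessAllWindowFunction: the batch list is
// created inside the per-window call, so state never leaks across windows.
class WindowBatcher {
    static void processWindow(Iterable<String> elements, int rateConfig,
                              Consumer<List<String>> out) {
        List<String> batch = new ArrayList<>(); // fresh list per window firing
        for (String e : elements) {
            batch.add(e);
            if (batch.size() == rateConfig) {
                out.accept(new ArrayList<>(batch)); // emit a copy, then reset
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            out.accept(new ArrayList<>(batch)); // flush the remainder
        }
    }
}
```

In an actual ProcessAllWindowFunction the same idea applies: declare the list as a local variable inside process() rather than as a field, because one function instance is reused across many window firings.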




On Wed, Aug 30, 2023, 3:54 PM Schwalbe Matthias 
wrote:

> Hi Patricia,
>
>
>
> What you try to implement can be achieved out-of-the-box by windowing.
>
>
>
> I assume these packets of 100 events are not per key but global.
>
> In that case, use non-keyed windowing [1] with a count trigger (100) [3],
> maybe combined with a processing-time trigger if it takes too long to
> collect all 100 events, and then create the output with a process window
> function [2].
>
>
>
> I hope this helps
>
>
>
> Thias
>
>
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#processwindowfunction
>
> [3]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/windows/#built-in-and-custom-triggers
>
>
>
>
>
> *From:* patricia lee 
> *Sent:* Wednesday, August 30, 2023 6:54 AM
> *To:* user@flink.apache.org
> *Subject:* Rate Limit / Throttle Data to Send
>
>
>
> Hi,
>
>
>
> I have a requirement that I need to send data to a third party with a
> limited number of elements per batch, using the flow below.
>
>
>
> kafkasource
>
> mapToVendorPojo
>
> processfunction
>
> sinkToVendor
>
>
>
> My implementation is that I continuously add the elements to my list
> state, ListState, in a ProcessFunction, and once it reaches 100 in size I
> emit the data and start collecting data again for another set of 100.
>
>
>
> if (rateConfig == Iterables.size(appEventState.get())) {
>     List holder = new ArrayList<>();
>     appEventState.get().forEach(e -> holder.add(e));
>     collector.collect(holder);
>     appEventState.clear();
> }
>
>
>
> The problem I am getting is that the *"if" condition above never gets
> matched*, because the appEventState size is always *0 or 1 only*. The
> rateConfig is set to *20*.
>
>
>
> What am I missing?
>
>
>
> Thanks,
>
> Patricia
>
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Could not retrieve JobResults of globally-terminated jobs from JobResultStore

2023-08-31 Thread denghaibin
After flink-1.16.0 jobs had been running for a while, a large batch of jobs failed with the error log below. Could someone take a look at what the problem is?
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of 
globally-terminated jobs from JobResultStore
 at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 ~[?:1.8.0_382]
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 ~[?:1.8.0_382]
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
 ~[?:1.8.0_382]
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_382]
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_382]
 at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve 
JobResults of globally-terminated jobs from JobResultStore
 at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_382]
 ... 3 more
Caused by: 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
 No content to map due to end-of-input
 at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream); line: 1, 
column: 0]
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
 ~[flink-dist-1.16.0.jar:1.16.0]
 at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
 ~[flink-dist-1.16.0.jar:1.16.0]

Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Gyula Fóra
The replicas setting has no effect in native mode. Native session clusters
are "elastic": the number of task managers is determined on the fly based on
the job requirements.

Gyula

On Thu, Aug 31, 2023 at 11:19 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thank you for the response.
>
> Yes, currently in my PoC I'm using the standalone integration.
> Does `spec.taskManager.replicas` have any effect when using native mode?
>
> The reason I'm asking is that I need to know the "capacity" of a
> particular session cluster before I submit a job into it.
> This is how I was doing it:
>
> try (KubernetesClient kubernetesClient = new
> KubernetesClientBuilder().build()) {
> MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
> io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>>
> resources =
> kubernetesClient.resources(FlinkDeployment.class);
>
> List<FlinkDeployment> items =
> resources.inNamespace("default").list().getItems();
> for (FlinkDeployment item : items) {
> System.out.println("Flink Deployments: " + item);
> System.out.println("Number of TM replicas: " +
> item.getSpec().getTaskManager().getReplicas());
> }
> }
>
>
> Thanks,
> Krzysztof
>
> On Thu, Aug 31, 2023 at 10:44 AM, Gyula Fóra wrote:
>
>> I guess your question is in the context of the standalone integration
>> because native session deployments automatically add TMs on the fly as more
>> are necessary.
>>
>> For standalone mode you should be able to configure
>> `spec.taskManager.replicas` and if I understand correctly that will not
>> shut down the running jobs.
>> If you have problems please share your FlinkDeployment yaml and the
>> operator logs in a JIRA ticket.
>>
>> In any case the native mode is probably better fit for your use-case.
>>
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Just want to bring this up in case it was missed among the other
>>> messages/queries :)
>>>
>>> TL:DR
>>> How to add TM to Flink Session cluster via Java K8s client if Session
>>> Cluster has running jobs?
>>>
>>> Thanks,
>>> Krzysztof
>>>
>>> On Fri, Aug 25, 2023 at 11:48 PM Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>>
 Hi community,
 I have a use case where I would like to add an extra TaskManager (TM) to a
 running Flink session cluster that has Flink jobs deployed. Session cluster
 creation, job submission and cluster patching are done using the Flink k8s
 operator Java API. The details of this are presented here [1]

 I would like to ask what is the recommended path to add a TM to an existing
 Session Cluster that currently runs a number of Flink jobs, using the Java
 API. For simplicity let's assume that I don't want to resume jobs from a
 savepoint, just redeploy them.

 When executing the steps from [1] I'm facing an issue where Session jobs
 are not redeployed on the patched Session cluster, even though kubectl
 shows that there is a FlinkSessionJob submitted to the k8s cluster.

 Additionally, when I'm trying to delete the FlinkSessionJob via kubectl,
 the Flink k8s operator throws an exception described in [1]. In fact, the
 state of that Flink deployment requires a few steps to clean it up after
 that patch.


 [1]
 https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD

>>>


Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Krzysztof Chmielewski
Thank you for the response.

Yes, currently in my PoC I'm using the standalone integration.
Does `spec.taskManager.replicas` have any effect when using native mode?

The reason I'm asking is that I need to know the "capacity" of a particular
session cluster before I submit a job into it.
This is how I was doing it:

try (KubernetesClient kubernetesClient = new
KubernetesClientBuilder().build()) {
MixedOperation<FlinkDeployment, KubernetesResourceList<FlinkDeployment>,
io.fabric8.kubernetes.client.dsl.Resource<FlinkDeployment>>
resources =
kubernetesClient.resources(FlinkDeployment.class);

List<FlinkDeployment> items =
resources.inNamespace("default").list().getItems();
for (FlinkDeployment item : items) {
System.out.println("Flink Deployments: " + item);
System.out.println("Number of TM replicas: " +
item.getSpec().getTaskManager().getReplicas());
}
}


Thanks,
Krzysztof
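For the standalone mode, the "capacity" in question can be estimated from the spec: total task slots are the TM replica count times `taskmanager.numberOfTaskSlots`. A back-of-the-envelope sketch, where both inputs are assumed to be read from the FlinkDeployment spec and Flink config rather than queried from the cluster:

```java
// Hypothetical capacity estimate for a standalone session cluster:
// total task slots = TM replicas * slots per TM, and free slots are what
// remains after the parallelism already claimed by running jobs.
class SessionClusterCapacity {
    static int totalSlots(int tmReplicas, int slotsPerTm) {
        return tmReplicas * slotsPerTm;
    }

    static int freeSlots(int tmReplicas, int slotsPerTm, int slotsInUse) {
        return Math.max(0, totalSlots(tmReplicas, slotsPerTm) - slotsInUse);
    }
}
```

In native mode this estimate does not apply, since (as Gyula notes) task managers are added on demand.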

On Thu, Aug 31, 2023 at 10:44 AM Gyula Fóra wrote:

> I guess your question is in the context of the standalone integration
> because native session deployments automatically add TMs on the fly as more
> are necessary.
>
> For standalone mode you should be able to configure
> `spec.taskManager.replicas` and if I understand correctly that will not
> shut down the running jobs.
> If you have problems please share your FlinkDeployment yaml and the
> operator logs in a JIRA ticket.
>
> In any case the native mode is probably better fit for your use-case.
>
> Gyula
>
> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Just want to bring this up in case it was missed among the other
>> messages/queries :)
>>
>> TL:DR
>> How to add TM to Flink Session cluster via Java K8s client if Session
>> Cluster has running jobs?
>>
>> Thanks,
>> Krzysztof
>>
>> On Fri, Aug 25, 2023 at 11:48 PM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Hi community,
>>> I have a use case where I would like to add an extra TaskManager (TM) to
>>> a running Flink session cluster that has Flink jobs deployed. Session
>>> cluster creation, job submission and cluster patching are done using the
>>> Flink k8s operator Java API. The details of this are presented here [1]
>>>
>>> I would like to ask what is the recommended path to add a TM to an
>>> existing Session Cluster that currently runs a number of Flink jobs,
>>> using the Java API. For simplicity let's assume that I don't want to
>>> resume jobs from a savepoint, just redeploy them.
>>>
>>> When executing the steps from [1] I'm facing an issue where Session jobs
>>> are not redeployed on the patched Session cluster, even though kubectl
>>> shows that there is a FlinkSessionJob submitted to the k8s cluster.
>>>
>>> Additionally, when I'm trying to delete the FlinkSessionJob via kubectl,
>>> the Flink k8s operator throws an exception described in [1]. In fact, the
>>> state of that Flink deployment requires a few steps to clean it up after
>>> that patch.
>>>
>>>
>>> [1]
>>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>>
>>


Re: flink k8s operator - problem with patching session cluster

2023-08-31 Thread Gyula Fóra
I guess your question is in the context of the standalone integration
because native session deployments automatically add TMs on the fly as more
are necessary.

For standalone mode you should be able to configure
`spec.taskManager.replicas` and if I understand correctly that will not
shut down the running jobs.
If you have problems please share your FlinkDeployment yaml and the
operator logs in a JIRA ticket.

In any case the native mode is probably better fit for your use-case.

Gyula

On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Just want to bring this up in case it was missed among the other
> messages/queries :)
>
> TL:DR
> How to add TM to Flink Session cluster via Java K8s client if Session
> Cluster has running jobs?
>
> Thanks,
> Krzysztof
>
> On Fri, Aug 25, 2023 at 11:48 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi community,
>> I have a use case where I would like to add an extra TaskManager (TM) to
>> a running Flink session cluster that has Flink jobs deployed. Session
>> cluster creation, job submission and cluster patching are done using the
>> Flink k8s operator Java API. The details of this are presented here [1]
>>
>> I would like to ask what is the recommended path to add a TM to an
>> existing Session Cluster that currently runs a number of Flink jobs,
>> using the Java API. For simplicity let's assume that I don't want to
>> resume jobs from a savepoint, just redeploy them.
>>
>> When executing the steps from [1] I'm facing an issue where Session jobs
>> are not redeployed on the patched Session cluster, even though kubectl
>> shows that there is a FlinkSessionJob submitted to the k8s cluster.
>>
>> Additionally, when I'm trying to delete the FlinkSessionJob via kubectl,
>> the Flink k8s operator throws an exception described in [1]. In fact, the
>> state of that Flink deployment requires a few steps to clean it up after
>> that patch.
>>
>>
>> [1]
>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>
>


Re: Blue green deployment with Flink Apache Operator

2023-08-31 Thread Nicolas Fraison via user
Our intent is definitely to start with an in-house Blue/Green operator, and
once we reach some level of confidence we will open a FLIP to discuss it.
Nicolas

On Thu, Aug 31, 2023 at 10:12 AM Gyula Fóra  wrote:

> The main concern as we discussed in previous mailing list threads before
> is the general applicability of such solution:
>
>  - Many production jobs cannot really afford running in parallel (starting
> the second job while the first one is running), due to data
> consistency/duplication reasons
>  - Exactly-once sinks do not really support this
>
> So I think we should start with this maybe as an independent effort /
> external library and if we see that it works we could discuss it in a FLIP.
>
> What do you think?
> Gyula
>
> On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison <
> nicolas.frai...@datadoghq.com> wrote:
>
>> Thanks Gyula for your feedback.
>>
>> We were also thinking of relying on such a solution, creating a dedicated
>> crd/operator to manage this BlueGreenFlinkDeployment.
>> Good to hear that it could be incorporated later in the operator.
>>
>> Will let you know once we will have something to share with you.
>>
>> Nicolas
>>
>> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:
>>
>>> Hey!
>>>
>>> I don't know if anyone has implemented this or not but one way to
>>> approach this problem (and this may not be the right way, just an idea :) )
>>> is to add a new Custom Resource type that sits on top of the
>>> FlinkDeployment / FlinkSessionJob resources and add a small controller for
>>> this.
>>>
>>> This new custom resource, BlueGreenDeployment, would be somewhat similar
>>> to how a ReplicaSet vs a Pod works in Kubernetes. It would create a new
>>> FlinkDeployment and would delete the old one once the new one reached a
>>> healthy running state.
>>>
>>> Adding a new CR allows us to not overcomplicate the existing
>>> resource/controller loop but simply leverage it. If you prototype something
>>> along these lines, please feel free to share and then we can discuss if we
>>> want to incorporate something like this in the operator repo in the future
>>> :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <
>>> user@flink.apache.org> wrote:
>>>
 Hi,

 From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
 blue/green deployment will not be supported, or at least not anytime
 soon.

 I'd like to know if some of you have built a custom mechanism on top of
 this operator to support the blue green deployment and if you would have
 any advice on implementing this?

 --

 Nicolas Fraison (he/him)

>>>
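Gyula's BlueGreenDeployment idea above boils down to a small reconcile rule: keep both deployments until the new one is healthy, then retire the old one. A rough sketch of that decision step in plain Java, with all names hypothetical and no operator API involved:

```java
import java.util.Optional;

// Hypothetical reconcile step for a BlueGreenDeployment controller: the old
// ("blue") deployment is deleted only once the new ("green") one is healthy.
class BlueGreenReconciler {
    enum Health { DEPLOYING, RUNNING, FAILED }

    /** Returns the name of the deployment to delete this cycle, if any. */
    static Optional<String> deploymentToDelete(String blueName, Health greenHealth) {
        if (greenHealth == Health.RUNNING) {
            return Optional.of(blueName); // green is healthy: retire blue
        }
        return Optional.empty();          // keep both until green is healthy
    }
}
```

A FAILED green deployment likewise leaves blue untouched, which is the rollback property that motivates the pattern; the consistency and exactly-once caveats Gyula raises apply while both run in parallel.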


Re: Blue green deployment with Flink Apache Operator

2023-08-31 Thread Gyula Fóra
The main concern as we discussed in previous mailing list threads before is
the general applicability of such solution:

 - Many production jobs cannot really afford running in parallel (starting
the second job while the first one is running), due to data
consistency/duplication reasons
 - Exactly-once sinks do not really support this

So I think we should start with this maybe as an independent effort /
external library and if we see that it works we could discuss it in a FLIP.

What do you think?
Gyula

On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison <
nicolas.frai...@datadoghq.com> wrote:

> Thanks Gyula for your feedback.
>
> We were also thinking of relying on such a solution, creating a dedicated
> crd/operator to manage this BlueGreenFlinkDeployment.
> Good to hear that it could be incorporated later in the operator.
>
> Will let you know once we will have something to share with you.
>
> Nicolas
>
> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:
>
>> Hey!
>>
>> I don't know if anyone has implemented this or not but one way to
>> approach this problem (and this may not be the right way, just an idea :) )
>> is to add a new Custom Resource type that sits on top of the
>> FlinkDeployment / FlinkSessionJob resources and add a small controller for
>> this.
>>
>> This new custom resource, BlueGreenDeployment, would be somewhat similar
>> to how a ReplicaSet vs a Pod works in Kubernetes. It would create a new
>> FlinkDeployment and would delete the old one once the new one reached a
>> healthy running state.
>>
>> Adding a new CR allows us to not overcomplicate the existing
>> resource/controller loop but simply leverage it. If you prototype something
>> along these lines, please feel free to share and then we can discuss if we
>> want to incorporate something like this in the operator repo in the future
>> :)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi,
>>>
>>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
>>> blue/green deployment will not be supported, or at least not anytime
>>> soon.
>>>
>>> I'd like to know if some of you have built a custom mechanism on top of
>>> this operator to support the blue green deployment and if you would have
>>> any advice on implementing this?
>>>
>>> --
>>>
>>> Nicolas Fraison (he/him)
>>>
>>


Re: Blue green deployment with Flink Apache Operator

2023-08-31 Thread Nicolas Fraison via user
Thanks Gyula for your feedback.

We were also thinking of relying on such a solution, creating a dedicated
crd/operator to manage this BlueGreenFlinkDeployment.
Good to hear that it could be incorporated later in the operator.

Will let you know once we will have something to share with you.

Nicolas

On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:

> Hey!
>
> I don't know if anyone has implemented this or not but one way to approach
> this problem (and this may not be the right way, just an idea :) ) is to
> add a new Custom Resource type that sits on top of the FlinkDeployment /
> FlinkSessionJob resources and add a small controller for this.
>
> This new custom resource, BlueGreenDeployment, would be somewhat similar
> to how a ReplicaSet vs a Pod works in Kubernetes. It would create a new
> FlinkDeployment and would delete the old one once the new one reached a
> healthy running state.
>
> Adding a new CR allows us to not overcomplicate the existing
> resource/controller loop but simply leverage it. If you prototype something
> along these lines, please feel free to share and then we can discuss if we
> want to incorporate something like this in the operator repo in the future
> :)
>
> Cheers,
> Gyula
>
> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
>> blue/green deployment will not be supported, or at least not anytime
>> soon.
>>
>> I'd like to know if some of you have built a custom mechanism on top of
>> this operator to support the blue green deployment and if you would have
>> any advice on implementing this?
>>
>> --
>>
>> Nicolas Fraison (he/him)
>>
>