Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
Hello,
Thanks for your reply.

1. With "Flink 1.18 + non-reactive", is the parallelism changed according to
the number of TMs?
2. The documentation (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/)
says "we are not using any container memory / CPU utilization metrics
directly here". Which metrics does the autoscaler use internally?
3. I'm deploying with standalone Kubernetes (
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/).
Are the autoscaler features only available through the Flink Kubernetes
operator? (Sorry, I don't understand this part clearly yet.)

Regards


On Fri, Sep 1, 2023 at 10:20 PM Gyula Fóra wrote:

> Pretty much, except that with Flink 1.18 autoscaler can scale the job in
> place without restarting the JM (even without reactive mode )
>
> So actually best option is autoscaler with Flink 1.18 native mode (no
> reactive)
>
> Gyula
>
> On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:
>
>> Thanks for feedback.
>> Could you check whether I understand correctly?
>>
>> *Only using 'reactive' mode:*
>> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
>> start'), parallelism will be increased. For example, when job parallelism
>> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
>> parallelism will be 2.
>> But the number of TM is not being controlled automatically.
>>
>> *Autoscaler + non-reactive:*
>> It can flexibly control the number of TMs by several metrics (CPU usage,
>> throughput, ...), and JobManager will be restarted when scaling. But job
>> parallelism is the same after the number of TM has been changed.
>>
>> *Autoscaler + 'reactive' mode*:
>> It can control numbers of TM by metric, and increase/decrease job
>> parallelism by changing TM.
>>
>> Regards,
>> Jung
>>
>> On Fri, Sep 1, 2023 at 8:16 PM Gyula Fóra wrote:
>>
>>> I would look at reactive scaling as a way to increase / decrease
>>> parallelism.
>>>
>>> It’s not a way to automatically decide when to actually do it as you
>>> need to create new TMs .
>>>
>>> The autoscaler could use reactive mode to change the parallelism but you
>>> need the autoscaler itself to decide when new resources should be added
>>>
>>> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>>>
 For now, the thing I've found about 'reactive' mode is that it
 automatically adjusts 'job parallelism' when TaskManager is
 increased/decreased.


 https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode

 Is there some other feature that only 'reactive' mode offers for
 scaling?

 Thanks.
 Regards.



 On Fri, Sep 1, 2023 at 4:56 PM Dennis Jung wrote:

> Hello,
> Thank you for your response. I have few more questions in following:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>
> *Reactive Mode configures a job so that it always uses all resources
> available in the cluster. Adding a TaskManager will scale up your job,
> removing resources will scale it down. Flink will manage the parallelism 
> of
> the job, always setting it to the highest possible values.*
> => Does this mean when I add/remove TaskManager in 'non-reactive'
> mode, resource(CPU/Memory/Etc.) of the cluster is not being changed?
>
> *Reactive Mode restarts a job on a rescaling event, restoring it from
> the latest completed checkpoint. This means that there is no overhead of
> creating a savepoint (which is needed for manually rescaling a job). Also,
> the amount of data that is reprocessed after rescaling depends on the
> checkpointing interval, and the restore time depends on the state size.*
> => As I know 'rescaling' also works in non-reactive mode, with
> restoring checkpoint. What is the difference of using 'reactive' here?
>
> *The Reactive Mode allows Flink users to implement a powerful
> autoscaling mechanism, by having an external service monitor certain
> metrics, such as consumer lag, aggregate CPU utilization, throughput or
> latency. As soon as these metrics are above or below a certain threshold,
> additional TaskManagers can be added or removed from the Flink cluster.*
> => Why is this only possible in 'reactive' mode? Seems this is more
> related to 'autoscaler'. Are there some specific features/API which can
> control TaskManager/Parallelism only in 'reactive' mode?
>
> Thank you.
>
> On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:
>
>> The reactive mode reacts to available resources. The autoscaler
>> reacts to changing load and processing capacity and adjusts resources.
>>
>> Completely different concepts and applicability.
>> Most people want the autoscaler , but this is a recent feature and is
>> specific to the k8s operator at the moment.
>>
>> Gyula
>>

Re: Job graph

2023-09-01 Thread David Anderson
This may or may not help, but you can get the execution plan from
inside the client, by doing something like this (I printed the plan to
stderr):

...
System.err.println(env.getExecutionPlan());
env.execute("my job");

The result is a JSON-encoded representation of the job graph, which, for
the simple example I just tried it with, produced this output:

{
  "nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 10
  }, {
"id" : 3,
"type" : "Sink: Writer",
"pact" : "Operator",
"contents" : "Sink: Writer",
"parallelism" : 10,
"predecessors" : [ {
  "id" : 1,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  }, {
"id" : 5,
"type" : "Sink: Committer",
"pact" : "Operator",
"contents" : "Sink: Committer",
"parallelism" : 10,
"predecessors" : [ {
  "id" : 3,
  "ship_strategy" : "FORWARD",
  "side" : "second"
} ]
  } ]
}

On Wed, Aug 30, 2023 at 7:01 AM Nikolaos Paraskakis
 wrote:
>
> Hello folks,
>
> I am trying to get the job graph of a running Flink job. I want to use Flink 
> libraries. For now, I have the RestClusterClient and the job IDs. Please tell 
> me how to get the job graph.
>
> Thank you.
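
For a job that is already running, the JobManager also exposes the same plan
through its REST API under /jobs/<jobID>/plan, which may be closer to what the
original question is after. A minimal sketch using only the JDK HTTP client;
the REST address and the job id are placeholders to replace with your own
values:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobPlanFetcher {
    public static void main(String[] args) throws Exception {
        // Placeholders: point these at your JobManager REST endpoint and job id.
        String restAddress = "http://localhost:8081";
        String jobId = "<your-job-id>";

        HttpRequest request = HttpRequest.newBuilder(
                URI.create(restAddress + "/jobs/" + jobId + "/plan"))
            .GET()
            .build();

        // The response body is a JSON document containing the job's dataflow plan,
        // similar in shape to the output shown above.
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}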


Re: Blue green deployment with Flink Apache Operator

2023-09-01 Thread David Anderson
Back in 2020, there was a Flink Forward talk [1] about how Lyft was
doing blue green deployments. Earlier (all the way back in 2017)
Drivetribe described [2] how they were doing so as well.

David

[1] https://www.youtube.com/watch?v=Hyt3YrtKQAM
[2] https://www.ververica.com/blog/drivetribe-cqrs-apache-flink

On Thu, Aug 31, 2023 at 1:21 AM Nicolas Fraison via user
 wrote:
>
> Our intent is definitely to start with an in-house Blue/Green operator, and 
> once we reach some level of confidence we will open a FLIP to discuss it.
>
> Nicolas
>
> On Thu, Aug 31, 2023 at 10:12 AM Gyula Fóra  wrote:
>>
>> The main concern, as we discussed in previous mailing list threads, is the 
>> general applicability of such a solution:
>>
>>  - Many production jobs cannot really afford running in parallel (starting 
>> the second job while the first one is still running), due to data 
>> consistency/duplication reasons
>>  - Exactly-once sinks do not really support this
>>
>> So I think we should start with this maybe as an independent effort / 
>> external library and if we see that it works we could discuss it in a FLIP.
>>
>> What do you think?
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison 
>>  wrote:
>>>
>>> Thanks Gyula for your feedback.
>>>
>>> We were also thinking of relying on such a solution, creating a dedicated 
>>> crd/operator to manage this BlueGreenFlinkDeployment.
>>> Good to hear that it could be incorporated later in the operator.
>>>
>>> We will let you know once we have something to share with you.
>>>
>>> Nicolas
>>>
>>> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:

 Hey!

 I don't know if anyone has implemented this or not but one way to approach 
 this problem (and this may not be the right way, just an idea :) ) is to 
 add a new Custom Resource type that sits on top of the FlinkDeployment / 
 FlinkSessionJob resources and add a small controller for this.

 This new custom resource, BlueGreenDeployment, would be somewhat similar 
 to how a ReplicaSet relates to a Pod in Kubernetes. It would create a new 
 FlinkDeployment and would delete the old one once the new one reached a 
 healthy running state.

 Adding a new CR allows us to not overcomplicate the existing 
 resource/controller loop but simply leverage it. If you prototype 
 something along these lines, please feel free to share and then we can 
 discuss if we want to incorporate something like this in the operator repo 
 in the future :)

 Cheers,
 Gyula
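
To make the ReplicaSet-vs-Pod analogy above concrete, here is a rough sketch of
what such a custom resource could look like as a Java model using fabric8's CRD
annotations. The group, version, and spec/status fields are made-up placeholders
for illustration, not anything the operator ships today:

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.Version;

// Hypothetical top-level resource: its controller would create a new
// FlinkDeployment from the template and delete the old one once the new
// deployment reports a healthy running state.
@Group("example.com")
@Version("v1alpha1")
public class BlueGreenDeployment
        extends CustomResource<BlueGreenDeploymentSpec, BlueGreenDeploymentStatus>
        implements Namespaced {
}

class BlueGreenDeploymentSpec {
    // Placeholder: the FlinkDeployment spec to stamp out for each color,
    // kept as a generic map here to avoid depending on operator classes.
    public java.util.Map<String, Object> deploymentTemplate;
}

class BlueGreenDeploymentStatus {
    public String activeDeploymentName;   // e.g. "my-job-blue" / "my-job-green"
    public String state;                  // e.g. TRANSITIONING, STABLE
}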

 On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user 
  wrote:
>
> Hi,
>
> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that 
> support for blue/green deployment is not planned, or at least will not 
> happen soon.
>
> I'd like to know if some of you have built a custom mechanism on top of 
> this operator to support blue/green deployments, and whether you have 
> any advice on implementing it.
>
> --
>
> Nicolas Fraison (he/him)


Re: Flink local mini cluster is causing memory leak when triggered multiple times

2023-09-01 Thread Bashir Sadjad via user
An update: the extra memory that is acquired/kept after each pipeline run
seems to be coming from off-heap space, in particular from
`Unsafe.allocateMemory`. I have added some notes here, but the TL;DR is that
adding a GC hint at the end of each pipeline run mitigates the issue
significantly. The idea of adding the GC hint came from FLINK-15758, so I am
not sure whether that is still a problem. Our code is open source, and the
band-aid fix we have added is this PR (the `System.gc()` call is the main
relevant part).

Any insights from someone who knows the internals of Flink would be appreciated.

-B
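
For context, a minimal sketch of the band-aid described above, assuming the
application builds a Beam Pipeline elsewhere and runs it on the embedded Flink
runner; the class and method names are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;

public final class BatchRunner {
    // Runs one batch pipeline on the embedded Flink mini-cluster and then
    // hints the JVM to collect, so off-heap memory freed by the torn-down
    // mini-cluster is more likely to be returned between runs.
    public static void runOnce(Pipeline pipeline) {
        PipelineResult result = pipeline.run();
        result.waitUntilFinish();
        // Band-aid from the discussion above; a hint only, not a guaranteed fix.
        System.gc();
    }
}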

On Wed, Aug 30, 2023 at 3:47 AM Chandrashekar Sankarapu <
sankar...@google.com> wrote:

> Hi Team,
>
> We have a data pipeline which is built using the Apache Beam SDK, and we use
> the Apache Flink Runner to execute Beam pipelines. We use the local embedded
> execution mode of Flink for running the pipelines.
>
> Currently, we are running into an issue where the batch pipeline can be
> triggered multiple times; each time the pipeline is triggered, Flink
> creates a miniCluster in local execution mode, runs the job and destroys
> the miniCluster once the job is completed. When the batch job is triggered
> multiple times, we observe that the application process Resident Set Size (RSS)
> memory keeps increasing with each run (approximately by the value set for
> the parameter 'taskmanager.memory.network.max') and is not
> released, eventually leading to a crash of the Docker container (container
> memory is limited) in which this is deployed. However, when we check the
> JVM memory using tools like JConsole, jcmd, etc., it does not show any
> increase and always stays within the Xmx value.
>
> We analysed the heap dump of the application but it didn't show any memory
> leaks.
>
> Has anyone faced this issue? Any pointers are appreciated.
>
> Thanks,
> Chandra
>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
Pretty much, except that with Flink 1.18 the autoscaler can scale the job in
place without restarting the JM (even without reactive mode).

So actually the best option is the autoscaler with Flink 1.18 in native mode
(no reactive).

Gyula

On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:

> Thanks for feedback.
> Could you check whether I understand correctly?
>
> *Only using 'reactive' mode:*
> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
> start'), parallelism will be increased. For example, when job parallelism
> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
> parallelism will be 2.
> But the number of TM is not being controlled automatically.
>
> *Autoscaler + non-reactive:*
> It can flexibly control the number of TMs by several metrics (CPU usage,
> throughput, ...), and JobManager will be restarted when scaling. But job
> parallelism is the same after the number of TM has been changed.
>
> *Autoscaler + 'reactive' mode*:
> It can control numbers of TM by metric, and increase/decrease job
> parallelism by changing TM.
>
> Regards,
> Jung
>
> On Fri, Sep 1, 2023 at 8:16 PM Gyula Fóra wrote:
>
>> I would look at reactive scaling as a way to increase / decrease
>> parallelism.
>>
>> It’s not a way to automatically decide when to actually do it as you need
>> to create new TMs .
>>
>> The autoscaler could use reactive mode to change the parallelism but you
>> need the autoscaler itself to decide when new resources should be added
>>
>> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>>
>>> For now, the thing I've found about 'reactive' mode is that it
>>> automatically adjusts 'job parallelism' when TaskManager is
>>> increased/decreased.
>>>
>>>
>>> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>>>
>>> Is there some other feature that only 'reactive' mode offers for scaling?
>>>
>>> Thanks.
>>> Regards.
>>>
>>>
>>>
>>> On Fri, Sep 1, 2023 at 4:56 PM Dennis Jung wrote:
>>>
 Hello,
 Thank you for your response. I have few more questions in following:
 https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/

 *Reactive Mode configures a job so that it always uses all resources
 available in the cluster. Adding a TaskManager will scale up your job,
 removing resources will scale it down. Flink will manage the parallelism of
 the job, always setting it to the highest possible values.*
 => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
 resource(CPU/Memory/Etc.) of the cluster is not being changed?

 *Reactive Mode restarts a job on a rescaling event, restoring it from
 the latest completed checkpoint. This means that there is no overhead of
 creating a savepoint (which is needed for manually rescaling a job). Also,
 the amount of data that is reprocessed after rescaling depends on the
 checkpointing interval, and the restore time depends on the state size.*
 => As I know 'rescaling' also works in non-reactive mode, with
 restoring checkpoint. What is the difference of using 'reactive' here?

 *The Reactive Mode allows Flink users to implement a powerful
 autoscaling mechanism, by having an external service monitor certain
 metrics, such as consumer lag, aggregate CPU utilization, throughput or
 latency. As soon as these metrics are above or below a certain threshold,
 additional TaskManagers can be added or removed from the Flink cluster.*
 => Why is this only possible in 'reactive' mode? Seems this is more
 related to 'autoscaler'. Are there some specific features/API which can
 control TaskManager/Parallelism only in 'reactive' mode?

 Thank you.

 On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:

> The reactive mode reacts to available resources. The autoscaler reacts
> to changing load and processing capacity and adjusts resources.
>
> Completely different concepts and applicability.
> Most people want the autoscaler , but this is a recent feature and is
> specific to the k8s operator at the moment.
>
> Gyula
>
> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>
>> Hello,
>> Thanks for your notice.
>>
>> Then what is the purpose of using 'reactive', if this doesn't do
>> anything itself?
>> What is the difference if I use auto-scaler without 'reactive' mode?
>>
>> Regards,
>> Jung
>>
>>
>>
>> On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:
>>
>>> Hi!
>>>
>>> I think what you need is probably not the reactive mode but a proper
>>> autoscaler. The reactive mode as you say doesn't do anything in itself, 
>>> you
>>> need to build a lot of logic around it.
>>>
>>> Check this instead:
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>
>>> The Kubernetes Operator has a built 

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
Thanks for the feedback.
Could you check whether I understand this correctly?

*Only using 'reactive' mode:*
By manually adding a TaskManager (TM) (for example with './bin/taskmanager.sh
start'), the parallelism is increased. For example, if job parallelism is 1
and there is 1 TM, adding 1 new TM restarts the JobManager and the
parallelism becomes 2.
But the number of TMs is not controlled automatically.

*Autoscaler + non-reactive:*
It can flexibly control the number of TMs based on several metrics (CPU usage,
throughput, ...), and the JobManager is restarted when scaling. But the job
parallelism stays the same after the number of TMs has changed.

*Autoscaler + 'reactive' mode:*
It can control the number of TMs based on metrics, and increase/decrease the
job parallelism by changing the number of TMs.

Regards,
Jung

On Fri, Sep 1, 2023 at 8:16 PM Gyula Fóra wrote:

> I would look at reactive scaling as a way to increase / decrease
> parallelism.
>
> It’s not a way to automatically decide when to actually do it as you need
> to create new TMs .
>
> The autoscaler could use reactive mode to change the parallelism but you
> need the autoscaler itself to decide when new resources should be added
>
> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>
>> For now, the thing I've found about 'reactive' mode is that it
>> automatically adjusts 'job parallelism' when TaskManager is
>> increased/decreased.
>>
>>
>> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>>
>> Is there some other feature that only 'reactive' mode offers for scaling?
>>
>> Thanks.
>> Regards.
>>
>>
>>
>> On Fri, Sep 1, 2023 at 4:56 PM Dennis Jung wrote:
>>
>>> Hello,
>>> Thank you for your response. I have few more questions in following:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>>
>>> *Reactive Mode configures a job so that it always uses all resources
>>> available in the cluster. Adding a TaskManager will scale up your job,
>>> removing resources will scale it down. Flink will manage the parallelism of
>>> the job, always setting it to the highest possible values.*
>>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>>
>>> *Reactive Mode restarts a job on a rescaling event, restoring it from
>>> the latest completed checkpoint. This means that there is no overhead of
>>> creating a savepoint (which is needed for manually rescaling a job). Also,
>>> the amount of data that is reprocessed after rescaling depends on the
>>> checkpointing interval, and the restore time depends on the state size.*
>>> => As I know 'rescaling' also works in non-reactive mode, with restoring
>>> checkpoint. What is the difference of using 'reactive' here?
>>>
>>> *The Reactive Mode allows Flink users to implement a powerful
>>> autoscaling mechanism, by having an external service monitor certain
>>> metrics, such as consumer lag, aggregate CPU utilization, throughput or
>>> latency. As soon as these metrics are above or below a certain threshold,
>>> additional TaskManagers can be added or removed from the Flink cluster.*
>>> => Why is this only possible in 'reactive' mode? Seems this is more
>>> related to 'autoscaler'. Are there some specific features/API which can
>>> control TaskManager/Parallelism only in 'reactive' mode?
>>>
>>> Thank you.
>>>
>>> On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:
>>>
 The reactive mode reacts to available resources. The autoscaler reacts
 to changing load and processing capacity and adjusts resources.

 Completely different concepts and applicability.
 Most people want the autoscaler , but this is a recent feature and is
 specific to the k8s operator at the moment.

 Gyula

 On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:

> Hello,
> Thanks for your notice.
>
> Then what is the purpose of using 'reactive', if this doesn't do
> anything itself?
> What is the difference if I use auto-scaler without 'reactive' mode?
>
> Regards,
> Jung
>
>
>
 On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:
>
>> Hi!
>>
>> I think what you need is probably not the reactive mode but a proper
>> autoscaler. The reactive mode as you say doesn't do anything in itself, 
>> you
>> need to build a lot of logic around it.
>>
>> Check this instead:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>
>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>> based on kafka data rate / processing throughput. It also doesn't rely on
>> the reactive mode.
>>
>> Cheers,
>> Gyula
>>
>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>> wrote:
>>
>>> Hello,
>>> Sorry for frequent questions. This is a question about 'reactive'
>>> mode.
>>>
>>> 1. As far as I understand, though 

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
I would look at reactive scaling as a way to increase / decrease
parallelism.

It’s not a way to automatically decide when to actually do it, as you still
need to create new TMs.

The autoscaler could use reactive mode to change the parallelism, but you
need the autoscaler itself to decide when new resources should be added.

On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:

> For now, the thing I've found about 'reactive' mode is that it
> automatically adjusts 'job parallelism' when TaskManager is
> increased/decreased.
>
>
> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>
> Is there some other feature that only 'reactive' mode offers for scaling?
>
> Thanks.
> Regards.
>
>
>
> On Fri, Sep 1, 2023 at 4:56 PM Dennis Jung wrote:
>
>> Hello,
>> Thank you for your response. I have few more questions in following:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>
>> *Reactive Mode configures a job so that it always uses all resources
>> available in the cluster. Adding a TaskManager will scale up your job,
>> removing resources will scale it down. Flink will manage the parallelism of
>> the job, always setting it to the highest possible values.*
>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>
>> *Reactive Mode restarts a job on a rescaling event, restoring it from the
>> latest completed checkpoint. This means that there is no overhead of
>> creating a savepoint (which is needed for manually rescaling a job). Also,
>> the amount of data that is reprocessed after rescaling depends on the
>> checkpointing interval, and the restore time depends on the state size.*
>> => As I know 'rescaling' also works in non-reactive mode, with restoring
>> checkpoint. What is the difference of using 'reactive' here?
>>
>> *The Reactive Mode allows Flink users to implement a powerful autoscaling
>> mechanism, by having an external service monitor certain metrics, such as
>> consumer lag, aggregate CPU utilization, throughput or latency. As soon as
>> these metrics are above or below a certain threshold, additional
>> TaskManagers can be added or removed from the Flink cluster.*
>> => Why is this only possible in 'reactive' mode? Seems this is more
>> related to 'autoscaler'. Are there some specific features/API which can
>> control TaskManager/Parallelism only in 'reactive' mode?
>>
>> Thank you.
>>
>> On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:
>>
>>> The reactive mode reacts to available resources. The autoscaler reacts
>>> to changing load and processing capacity and adjusts resources.
>>>
>>> Completely different concepts and applicability.
>>> Most people want the autoscaler , but this is a recent feature and is
>>> specific to the k8s operator at the moment.
>>>
>>> Gyula
>>>
>>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>
 Hello,
 Thanks for your notice.

 Then what is the purpose of using 'reactive', if this doesn't do
 anything itself?
 What is the difference if I use auto-scaler without 'reactive' mode?

 Regards,
 Jung



 On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:

> Hi!
>
> I think what you need is probably not the reactive mode but a proper
> autoscaler. The reactive mode as you say doesn't do anything in itself, 
> you
> need to build a lot of logic around it.
>
> Check this instead:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>
> The Kubernetes Operator has a built in autoscaler that can scale jobs
> based on kafka data rate / processing throughput. It also doesn't rely on
> the reactive mode.
>
> Cheers,
> Gyula
>
> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
> wrote:
>
>> Hello,
>> Sorry for frequent questions. This is a question about 'reactive'
>> mode.
>>
>> 1. As far as I understand, though I've setup `scheduler-mode:
>> reactive`, it will not change parallelism automatically by itself, by CPU
>> usage or Kafka consumer rate. It needs additional resource monitor 
>> features
>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>> 2. Is it possible to create a custom resource monitor provider
>> application? For example, if I want to increase/decrease parallelism by
>> Kafka consumer rate, do I need to send specific API from outside, to 
>> order
>> rescaling?
>> 3. If 2 is correct, what is the difference when using 'reactive'
>> mode? Because as far as I think, calling a specific API will rescale 
>> either
>> using 'reactive' mode or not...(or is the API just working based on this
>> mode)?
>>
>> Thanks.
>>
>> Regards
>>
>>


Re: Send data asynchronously to a 3rd party via SinkFunction

2023-09-01 Thread Feng Jin
Hi Patricia,

I suggest using the generic asynchronous base sink.

https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/


Best,
Feng

On Fri, Sep 1, 2023 at 6:07 PM patricia lee  wrote:

>
> I'd like to ask if there is a way to send data to a vendor (SDK plugin,
> which is also an HTTP request) asynchronously in flink 1.17?
>
> After transformation on the data, I usually collate them as a List to my
> custom SinkFunction. I initialized a CompleteableFuture inside the invoke()
> method. However I read about the Async I/O from the documentation but I
> couldn't figure out how to use it in my use case.
>
>
> How can I close the resources initialized in SinkFunction properly? e.g.
> the job failed.
> Is using completableFuture inside SinkFunction a good approach?
>
>
> Regards,
> Pat
>
>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
For now, the thing I've found about 'reactive' mode is that it
automatically adjusts the job parallelism when the number of TaskManagers is
increased/decreased.

https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode

Is there some other feature that only 'reactive' mode offers for scaling?

Thanks.
Regards.



On Fri, Sep 1, 2023 at 4:56 PM Dennis Jung wrote:

> Hello,
> Thank you for your response. I have few more questions in following:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>
> *Reactive Mode configures a job so that it always uses all resources
> available in the cluster. Adding a TaskManager will scale up your job,
> removing resources will scale it down. Flink will manage the parallelism of
> the job, always setting it to the highest possible values.*
> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>
> *Reactive Mode restarts a job on a rescaling event, restoring it from the
> latest completed checkpoint. This means that there is no overhead of
> creating a savepoint (which is needed for manually rescaling a job). Also,
> the amount of data that is reprocessed after rescaling depends on the
> checkpointing interval, and the restore time depends on the state size.*
> => As I know 'rescaling' also works in non-reactive mode, with restoring
> checkpoint. What is the difference of using 'reactive' here?
>
> *The Reactive Mode allows Flink users to implement a powerful autoscaling
> mechanism, by having an external service monitor certain metrics, such as
> consumer lag, aggregate CPU utilization, throughput or latency. As soon as
> these metrics are above or below a certain threshold, additional
> TaskManagers can be added or removed from the Flink cluster.*
> => Why is this only possible in 'reactive' mode? Seems this is more
> related to 'autoscaler'. Are there some specific features/API which can
> control TaskManager/Parallelism only in 'reactive' mode?
>
> Thank you.
>
> On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:
>
>> The reactive mode reacts to available resources. The autoscaler reacts to
>> changing load and processing capacity and adjusts resources.
>>
>> Completely different concepts and applicability.
>> Most people want the autoscaler , but this is a recent feature and is
>> specific to the k8s operator at the moment.
>>
>> Gyula
>>
>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>
>>> Hello,
>>> Thanks for your notice.
>>>
>>> Then what is the purpose of using 'reactive', if this doesn't do
>>> anything itself?
>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>
>>> Regards,
>>> Jung
>>>
>>>
>>>
>>> On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:
>>>
 Hi!

 I think what you need is probably not the reactive mode but a proper
 autoscaler. The reactive mode as you say doesn't do anything in itself, you
 need to build a lot of logic around it.

 Check this instead:
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

 The Kubernetes Operator has a built in autoscaler that can scale jobs
 based on kafka data rate / processing throughput. It also doesn't rely on
 the reactive mode.

 Cheers,
 Gyula

 On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
 wrote:

> Hello,
> Sorry for frequent questions. This is a question about 'reactive' mode.
>
> 1. As far as I understand, though I've setup `scheduler-mode:
> reactive`, it will not change parallelism automatically by itself, by CPU
> usage or Kafka consumer rate. It needs additional resource monitor 
> features
> (such as Horizontal Pod Autoscaler, or else). Is this correct?
> 2. Is it possible to create a custom resource monitor provider
> application? For example, if I want to increase/decrease parallelism by
> Kafka consumer rate, do I need to send specific API from outside, to order
> rescaling?
> 3. If 2 is correct, what is the difference when using 'reactive' mode?
> Because as far as I think, calling a specific API will rescale either 
> using
> 'reactive' mode or not...(or is the API just working based on this mode)?
>
> Thanks.
>
> Regards
>
>


Send data asynchronously to a 3rd party via SinkFunction

2023-09-01 Thread patricia lee
I'd like to ask whether there is a way to send data to a vendor (an SDK plugin,
which is also an HTTP request) asynchronously in Flink 1.17.

After transforming the data, I usually collate it as a List and pass it to my
custom SinkFunction. I initialized a CompletableFuture inside the invoke()
method. I read about Async I/O in the documentation, but I couldn't figure out
how to use it for my use case.

How can I close the resources initialized in the SinkFunction properly, e.g.
when the job fails?
Is using a CompletableFuture inside a SinkFunction a good approach?


Regards,
Pat
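
For reference, a minimal sketch of the pattern described above: an asynchronous
HTTP call issued via a CompletableFuture inside a RichSinkFunction, with cleanup
in close(). The vendor endpoint and the List<String> element type are
placeholders, and for retries and checkpoint-aligned flushing the generic
asynchronous base sink linked in the reply above is the better fit:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class VendorHttpSink extends RichSinkFunction<List<String>> {

    private transient HttpClient httpClient;
    private transient Set<CompletableFuture<?>> inFlight;

    @Override
    public void open(Configuration parameters) {
        // Create resources per parallel subtask, not in the constructor.
        httpClient = HttpClient.newHttpClient();
        inFlight = ConcurrentHashMap.newKeySet();
    }

    @Override
    public void invoke(List<String> batch, Context context) {
        // "https://vendor.example/ingest" is a placeholder endpoint.
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://vendor.example/ingest"))
                .POST(HttpRequest.BodyPublishers.ofString(String.join("\n", batch)))
                .build();
        CompletableFuture<HttpResponse<String>> future =
                httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString());
        inFlight.add(future);
        future.whenComplete((response, error) -> inFlight.remove(future));
    }

    @Override
    public void close() {
        // close() is called on normal termination, failure and cancellation,
        // so wait for outstanding requests here instead of dropping them
        // (a failed request surfaces as an exception from join()).
        if (inFlight != null) {
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        }
    }
}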


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Dennis Jung
Hello,
Thank you for your response. I have a few more questions about the following:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/

*Reactive Mode configures a job so that it always uses all resources
available in the cluster. Adding a TaskManager will scale up your job,
removing resources will scale it down. Flink will manage the parallelism of
the job, always setting it to the highest possible values.*
=> Does this mean that when I add/remove a TaskManager in 'non-reactive' mode,
the cluster's resources (CPU/memory/etc.) are not changed?

*Reactive Mode restarts a job on a rescaling event, restoring it from the
latest completed checkpoint. This means that there is no overhead of
creating a savepoint (which is needed for manually rescaling a job). Also,
the amount of data that is reprocessed after rescaling depends on the
checkpointing interval, and the restore time depends on the state size.*
=> As far as I know, 'rescaling' also works in non-reactive mode, restoring
from a checkpoint. What is the difference when using 'reactive' here?

*The Reactive Mode allows Flink users to implement a powerful autoscaling
mechanism, by having an external service monitor certain metrics, such as
consumer lag, aggregate CPU utilization, throughput or latency. As soon as
these metrics are above or below a certain threshold, additional
TaskManagers can be added or removed from the Flink cluster.*
=> Why is this only possible in 'reactive' mode? This seems more related
to the 'autoscaler'. Are there specific features/APIs that can control
TaskManagers/parallelism only in 'reactive' mode?

Thank you.

On Fri, Sep 1, 2023 at 3:30 PM Gyula Fóra wrote:

> The reactive mode reacts to available resources. The autoscaler reacts to
> changing load and processing capacity and adjusts resources.
>
> Completely different concepts and applicability.
> Most people want the autoscaler , but this is a recent feature and is
> specific to the k8s operator at the moment.
>
> Gyula
>
> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>
>> Hello,
>> Thanks for your notice.
>>
>> Then what is the purpose of using 'reactive', if this doesn't do anything
>> itself?
>> What is the difference if I use auto-scaler without 'reactive' mode?
>>
>> Regards,
>> Jung
>>
>>
>>
>> On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:
>>
>>> Hi!
>>>
>>> I think what you need is probably not the reactive mode but a proper
>>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>>> need to build a lot of logic around it.
>>>
>>> Check this instead:
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>
>>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>>> based on kafka data rate / processing throughput. It also doesn't rely on
>>> the reactive mode.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>> wrote:
>>>
 Hello,
 Sorry for frequent questions. This is a question about 'reactive' mode.

 1. As far as I understand, though I've setup `scheduler-mode:
 reactive`, it will not change parallelism automatically by itself, by CPU
 usage or Kafka consumer rate. It needs additional resource monitor features
 (such as Horizontal Pod Autoscaler, or else). Is this correct?
 2. Is it possible to create a custom resource monitor provider
 application? For example, if I want to increase/decrease parallelism by
 Kafka consumer rate, do I need to send specific API from outside, to order
 rescaling?
 3. If 2 is correct, what is the difference when using 'reactive' mode?
 Because as far as I think, calling a specific API will rescale either using
 'reactive' mode or not...(or is the API just working based on this mode)?

 Thanks.

 Regards




Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
The reactive mode reacts to available resources. The autoscaler reacts to
changing load and processing capacity and adjusts resources.

Completely different concepts and applicability.
Most people want the autoscaler, but this is a recent feature and is
specific to the k8s operator at the moment.

Gyula

On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:

> Hello,
> Thanks for your notice.
>
> Then what is the purpose of using 'reactive', if this doesn't do anything
> itself?
> What is the difference if I use auto-scaler without 'reactive' mode?
>
> Regards,
> Jung
>
>
>
> On Fri, Aug 18, 2023 at 7:51 PM Gyula Fóra wrote:
>
>> Hi!
>>
>> I think what you need is probably not the reactive mode but a proper
>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>> need to build a lot of logic around it.
>>
>> Check this instead:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>
>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>> based on kafka data rate / processing throughput. It also doesn't rely on
>> the reactive mode.
>>
>> Cheers,
>> Gyula
>>
>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:
>>
>>> Hello,
>>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>>
>>> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
>>> it will not change parallelism automatically by itself, by CPU usage or
>>> Kafka consumer rate. It needs additional resource monitor features (such as
>>> Horizontal Pod Autoscaler, or else). Is this correct?
>>> 2. Is it possible to create a custom resource monitor provider
>>> application? For example, if I want to increase/decrease parallelism by
>>> Kafka consumer rate, do I need to send specific API from outside, to order
>>> rescaling?
>>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>>> Because as far as I think, calling a specific API will rescale either using
>>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>>
>>> Thanks.
>>>
>>> Regards
>>>
>>>


Re: K8s operator - Stop Job with savepoint on session cluster via Java API

2023-09-01 Thread Krzysztof Chmielewski
Hi, thanks.
However, what you sent me is the SQL client. I'm looking for a way to do it
via the k8s operator's Java API.

On Fri, 1 Sep 2023, 03:58 Shammon FY  wrote:

> Hi Krzysztof,
>
> For the flink session cluster, you can stop the job with savepoint through
> the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`. You can refer to
> [1] for more information about how to do it in sql client and you can also
> create a table environment to perform the statement in your application.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
>
> Best,
> Shammon FY
>
> On Fri, Sep 1, 2023 at 6:35 AM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi community,
>> I would like to ask what the recommended way is to stop a Flink job with a
>> savepoint on a session cluster via the k8s operator Java API?
>>
>> Currently I'm doing this by setting savepointTriggerNonce on JobSpec
>> object.
>> However I've noticed that this works only if I do not include Job state
>> change in that spec.
>>
>> In other words when I submit JobSpec that has state change from Running
>> to Suspend and savepointTriggerNonce, the checkpoint is not created. Is
>> that intended?
>> In order to mimic [1] do I have to submit two JobSpec updates? One with
>> savepointNonce and the second one with Job state change?
>>
>> A followup question, what kind of savepoint is triggered when using
>> savepointTriggerNonce native or canonical? Also is there a way to pass
>> --drain option or savepoint path via spec? (Not
>> including state.savepoints.dir cluster config option)
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
>>
>> Thanks,
>> Krzysztof Chmielewski
>>
>
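
For illustration, a rough sketch of driving the JobSpec fields discussed in
this thread from Java, using a fabric8 client and the operator's CRD classes.
The package name (org.apache.flink.kubernetes.operator.api), the
FlinkSessionJob resource/namespace names, and the two-step split itself are
assumptions drawn from the discussion above rather than a confirmed recipe;
whether the resulting savepoint is native or canonical is exactly the open
question in the thread:

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
// Assumed CRD model classes from the flink-kubernetes-operator API artifact;
// the package layout may differ between operator versions.
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.JobState;

public class StopJobWithSavepoint {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            // Step 1: trigger a savepoint only, with no state change in the same update.
            client.resources(FlinkSessionJob.class)
                    .inNamespace("flink")            // assumed namespace
                    .withName("my-session-job")      // assumed resource name
                    .edit(job -> {
                        job.getSpec().getJob().setSavepointTriggerNonce(System.currentTimeMillis());
                        return job;
                    });

            // Step 2: once the savepoint shows up as completed in the resource status,
            // suspend the job in a separate spec update.
            client.resources(FlinkSessionJob.class)
                    .inNamespace("flink")
                    .withName("my-session-job")
                    .edit(job -> {
                        job.getSpec().getJob().setState(JobState.SUSPENDED);
                        return job;
                    });
        }
    }
}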