Re: Flink Kubernetes Operator 1.8.0 CRDs

2024-05-09 Thread Gyula Fóra
Hey!
We have not observed any issue so far, can you please share some error
information / log ?

Opening a jira ticket would be best

Thanks
Gyula

On Thu, 9 May 2024 at 21:18, Prasad, Neil 
wrote:

> I am writing to report an issue with the Flink Kubernetes Operator version
> 1.8.0. The CRD is unable to be applied or replaced in minikube or GKE.
> However, the CRD works on version 1.7.0 of the operator. I thought it would
> be helpful to bring this issue to the attention of the community and get
> some help in case someone has run into this issue before .
>
>
>
> Thank you for your attention to this matter.
>


Re: Flink scheduler keeps trying to schedule the pods indefinitely

2024-05-06 Thread Gyula Fóra
Hey!

Let me first answer your questions then provide some actual solution
hopefully :)

1. The adaptive scheduler would not reduce the vertex desired parallelism
in this case but it should allow the job to start depending on the
lower/upper bound resource config. There have been some changes in how the
k8s operator sets these resource requirements, in the latest 1.8.0 we only
set the upper bound so that the job can still start with a smaller
parallelism. So Flink ultimately will keep trying to schedule pods but
ideally the job would also start/run. I would look at the scheduler logs
(maybe debug) for more detail.

You can look at configs like:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-wait-timeout

2. Default scheduler here refers to the Kubernetes pod scheduler not
Flink's schedulers. So this is normal

As for the solution to the problem. The thing to do is to make the
autoscaler aware of the resource limits in the first place so that we don't
scale the job too high. There has been some work on autodetecting these
limits https://issues.apache.org/jira/browse/FLINK-33771

You can set:
kubernetes.operator.cluster.resource-view.refresh-interval: 5 min

to turn this on. Alternatively a simpler approach would be to directly
limit the parallelism of the scaling decisions:
job.autoscaler.vertex.max-parallelism

Cheers,
Gyula

On Mon, May 6, 2024 at 8:09 AM Chetas Joshi  wrote:

> Hello,
>
> I am running a flink job in the application mode on k8s. It's deployed as
> a FlinkDeployment and its life-cycle is managed by the flink-k8s-operator.
> The autoscaler is being used with the following config
>
> job.autoscaler.enabled: true
> job.autoscaler.metrics.window: 5m
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.target.utilization: 0.6
> job.autoscaler.target.utilization.boundary: 0.2
> pipeline.max-parallelism: 60
> jobmanager.scheduler: adaptive
>
> During a scale-up event, the autoscaler increases the parallelism of one
> of the job vertex to a higher value. This triggers a bunch of new task
> managers to be scheduled on the EKS cluster (The node-group has an attached
> ASG). Out of all the requested TM pods only some get scheduled and then the
> cluster runs out of resources. The other TM pods remain in the "pending
> mode" indefinitely and the job is stuck in the "restart" loop forever.
>
> 1. Shouldn't the adaptive scheduler reduce the vertex parallelism due to
> the slots/TMs not being available?
> 2. When I looked at the pods stuck in the pending state, I found them to
> be reporting the following events:
>
> │   Warning  FailedScheduling   4m55s (x287 over 23h)   default-scheduler
> 0/5 nodes are available: 1 Insufficient cpu, 1 node(s) didn't match Pod's
> node affinity/selector, 3 Insufficient memory. preempti │
>
> │ on: 0/5 nodes are available: 1 Preemption is not helpful for scheduling,
> 4 No preemption victims found for incoming pod.
>   │
>
> │   Normal   NotTriggerScaleUp  3m26s (x8555 over 23h)  cluster-autoscaler
> pod didn't trigger scale-up: 1 max node group size reached
>
> The WARN suggests that the "default scheduler" is being used. Why is that
> the case even though the adaptive scheduler is configured to be used?
>
> Appreciate it if you can shed some light on why this could be happening.
>
> Thanks
> Chetas
>


Re: Autoscaling with flink-k8s-operator 1.8.0

2024-05-01 Thread Gyula Fóra
Hi Chetas,

The operator logic itself would normally call the rescale api during the
upgrade process, not the autoscaler module. The autoscaler module sets the
correct config with the parallelism overrides, and then the operator
performs the regular upgrade cycle (as when you yourself change something
in the spec). If only the parallelism overrides change then it will use-the
rescale api, otherwise a full upgrade is triggered.

Can you share the entire resource yaml and the logs from the operator
related to the upgrade (after the scaling was triggered)? You can usually
see from the logs why the in-place scaling wasn't used in a particular case.
You can debug in-place scaling itself by completely disabling the
autoscaler and manually setting pipeline.jobvertex-parallelism-overrides in
the flink config.

Cheers,
Gyula

On Thu, May 2, 2024 at 3:49 AM Chetas Joshi  wrote:

> Hello,
>
> We recently upgraded the operator to 1.8.0 to leverage the new autoscaling
> features (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/autoscaler/).
> The FlinkDeployment (application cluster) is set to flink v1_18 as well. I
> am able to observe the following event being reported in the logs of the
> operator.
>
> o.a.f.k.o.l.AuditUtils [INFO ][flink/devpipeline] >>> Event  |
> Info| SCALINGREPORT   | Scaling execution enabled, begin scaling
> vertices:{ Vertex ID  | Parallelism 2 -> 1 | Processing capacity
> Infinity -> Infinity | Target data rate 7.85}{ Vertex ID  |
> Parallelism 2 -> 1 | Processing capacity Infinity -> Infinity | Target data
> rate 0.00}{ Vertex ID  | Parallelism 2 -> 1 | Processing capacity
> Infinity -> Infinity | Target data rate 7.85}{ Vertex ID w |
> Parallelism 2 -> 1 | Processing capacity 33235.72 -> 13294.29 | Target data
> rate 6.65}
>
> But the in-place autoscaling is not getting triggered. My understanding is
> that the autoscaler running within the k8s-operator should call the rescale
> api endpoint of the FlinkDeployment (devpipeline)  with a parallelism
> overrides map (vertexId => parallelism) and that should trigger a redeploy
> of the jobGraph. But that is not happening. The restart of the
> FlinkDeployment overrides the map (vertexId => parallelism) in the
> configMap resource that stores the flink-config.
>
> Am I missing something? How do I debug this further?
>
> Here is the flink-config set within the k8s-operator.
>
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.target.utilization: 0.6
> job.autoscaler.target.utilization.boundary: 0.2
> pipeline.max-parallelism: 180
> jobmanager.scheduler: adaptive
>
>
> Thank you
> Chetas
>


Re: [Flink Kubernetes Operator] The "last-state" upgrade mode is only supported in FlinkDeployments

2024-04-30 Thread Gyula Fóra
The application mode indeed has a sticky jobId (at least when we are
performing a last-state upgrade, otherwise a new jobId is generated during
stateless deployments). But that's only part of the story and arguably the
less important bit. The last-state upgrade mechanism for running/failing
(but otherwise non-terminal) jobs relies on the Flink HA metadata to carry
over the state information automagically. In Flink the HA mechanism always
keeps track of the last state of a job so that even in the case  of a JM
loss the job can correctly recover.

The operator last-state upgrade uses this exact mechanism: we delete the
deployment (JMs, and TMs) but keep the HA metadata and then start the new
cluster with the upgraded spec. The JM will recover thinking that it's only
a failover and pick up the state automatically. We can do this because we
have 1 cluster - 1 job and upgrading means upgrading the entire deployment.

The same is not true for session jobs where we can't use the HA metadata
trick and we actually need to figure out the last state (the checkpoint or
savepoint path). This can only be done through the JM rest api. This should
be possible in most cases when the JM is healthy after cancelling the
session job. By the way for terminal jobs (FAILED/FINISHED/CANCELLED) we
also do similarly in case of the FlinkDeployments, where the last
checkpoint info is queried from the JM (
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java#L74-L78
)
For session jobs you will not need sticky job ids because it's simply not
relevant.

Gyula

On Tue, Apr 30, 2024 at 7:51 PM Alan Zhang  wrote:

> Hi Gyula,
>
> Thanks for your reply! Good suggestion on JIRA ticket, I created a JIRA
> ticket for tracking it: https://issues.apache.org/jira/browse/FLINK-35279.
> We could be interested in working on it because of our own requirement, I
> will check you and the community again once we have some updates.
>
> >We don't have the same robust way of getting the last-state information
> for session jobs as we do for applications, so it will be slightly less
> reliable overall.
> My understanding is that application mode has sticky job id but session
> mode doesn't have, with sticky job id it is easier to implement
> "last-state" upgrade mode. When you were saying "robust way", does it mean
> "sticky job id" in application mode?
>
>
> On Mon, Apr 29, 2024 at 10:28 PM Gyula Fóra  wrote:
>
>> Hi Alan!
>>
>> I think it should be possible to address this gap for most cases. We
>> don't have the same robust way of getting the last-state information for
>> session jobs as we do for applications, so it will be slightly less
>> reliable overall.
>> For session jobs the last checkpoint info has to be queried from the JM
>> rest api, so as long that is available it should work fine.
>>
>> I am not aware of anyone working on this at the moment, it would be great
>> if you could open a JIRA ticket to track this. If you are interested in
>> working on this, we can also support you but this is a fairly complex
>> feature that involves many layers of operator logic.
>>
>> Cheers,
>> Gyula
>>
>> On Tue, Apr 30, 2024 at 1:08 AM Alan Zhang  wrote:
>>
>>> Hi,
>>>
>>> We wanted to use the Apache Flink Kubernetes operator to manage the
>>> lifecycle of our Flink jobs in Flink session clusters. And we wanted to
>>> have the "last-state" upgrade feature for our use cases.
>>>
>>> However, the latest official doc states the "last-state" upgrade mode is
>>> not supported in the session mode(aka. FlinkSessionJob) currently:
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>
>>> Last state upgrade mode is currently only supported for FlinkDeployments
>>> .
>>>
>>> Why didn't we support this upgrade mode in session mode? Do we have a
>>> plan to address this gap? Any suggestions for us if we want to stick with
>>> session mode?
>>>
>>> --
>>> Thanks,
>>> Alan
>>>
>>
>
> --
> Thanks,
> Alan
>


Re: [Flink Kubernetes Operator] The "last-state" upgrade mode is only supported in FlinkDeployments

2024-04-29 Thread Gyula Fóra
Hi Alan!

I think it should be possible to address this gap for most cases. We don't
have the same robust way of getting the last-state information for session
jobs as we do for applications, so it will be slightly less reliable
overall.
For session jobs the last checkpoint info has to be queried from the JM
rest api, so as long that is available it should work fine.

I am not aware of anyone working on this at the moment, it would be great
if you could open a JIRA ticket to track this. If you are interested in
working on this, we can also support you but this is a fairly complex
feature that involves many layers of operator logic.

Cheers,
Gyula

On Tue, Apr 30, 2024 at 1:08 AM Alan Zhang  wrote:

> Hi,
>
> We wanted to use the Apache Flink Kubernetes operator to manage the
> lifecycle of our Flink jobs in Flink session clusters. And we wanted to
> have the "last-state" upgrade feature for our use cases.
>
> However, the latest official doc states the "last-state" upgrade mode is
> not supported in the session mode(aka. FlinkSessionJob) currently:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>
> Last state upgrade mode is currently only supported for FlinkDeployments.
>
> Why didn't we support this upgrade mode in session mode? Do we have a plan
> to address this gap? Any suggestions for us if we want to stick with
> session mode?
>
> --
> Thanks,
> Alan
>


Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
I was talking about Flink Kubernetes operator and HA , not the individual
Flink jobs. But based on your answer it’s probably not the cause

Gyula

On Fri, 26 Apr 2024 at 21:15, Maxim Senin  wrote:

> Hi, Gyula. Thanks for the tips.
>
> All jobs are deployed in a single namespace, “flink”.
>
> Which replicas? The JM replicas are already 1, I tried with TM replicas
> set to 1, but same exception happens. We have only 1 instance of the
> operator (replicas=1) in this environment.
>
> The only workarounds I discovered is either
> a) disable autoscaling for the failing job (autoscaler scales the job to
> zero for “gracefully” stopping it and then never starts it)  or
> b) some jobs that keep restarting can be fixed by disabling HA for that job
>
> And ` *Cannot rescale the given pointwise partitioner.` *is also still a
> mystery.
>
> *Thanks,*
>
> *Maxim*
>
>
>
> *From: *Gyula Fóra 
> *Date: *Friday, April 26, 2024 at 1:10 AM
> *To: *Maxim Senin 
> *Cc: *Maxim Senin via user 
> *Subject: *Re: [External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi Maxim!
>
>
>
> Regarding the status update error, it could be related to a problem that
> we have discovered recently with the Flink Operator HA. Where during a
> namespace change both leader and follower instances would start processing.
>
> It has been fixed in the current master by updating the JOSDK version to
> the one containing the fix.
>
>
>
> For details you can check:
>
> https://github.com/operator-framework/java-operator-sdk/issues/2341
>
>
> https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d
>
>
>
> To resolve the issue (if it's caused by this), you could either
> cherry-pick the fix internally to the operator or reduce the replicas to 1
> if you are using HA.
>
>
>
> Cheers,
>
> Gyula
>
>
>
>
>
>
>
> On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user <
> user@flink.apache.org> wrote:
>
> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of 

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
Hi Maxim!

Regarding the status update error, it could be related to a problem that we
have discovered recently with the Flink Operator HA. Where during a
namespace change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to
the one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick
the fix internally to the operator or reduce the replicas to 1 if you are
using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
wrote:

> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of the
> deployments. This includes taking a savepoint (`takeSavepointOnUpgrade:
> true`, `upgradeMode: savepoint`) and “gracefully” shutting down the
> JobManager by “scaling it to zero” (by setting replicas = 0 in the new
> generated config).
>
> However, the deployment never comes back up, apparently, due to exception:
>
>
> 2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher *[ERROR]*
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] *Error* during error
> status handling.
>
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)*
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)*
>
> *at
> org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)*
>
> *at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)*
>
> *at
> 

Re: Unaligned checkpoint blocked by long Async operation

2024-03-15 Thread Gyula Fóra
Posting this to dev as well...

Thanks Zakelly,
Sounds like a solution could be to add a new different version of yield
that would actually yield to the checkpoint barrier too. That way operator
implementations could decide whether any state modification may or may not
have happened and can optionally allow checkpoint to be taken in the
"middle of record  processing".

Gyula

On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan  wrote:

> Hi Gyula,
>
> Processing checkpoint halfway through `processElement` is problematic. The
> current element will not be included in the input in-flight data, and we
> cannot assume it has taken effect on the state by user code. So the best
> way is to treat `processElement` as an 'atomic' operation. I guess that's
> why the priority of the cp barrier is set low.
> However, the AsyncWaitOperator is a special case where we know the element
> blocked at `addToWorkQueue` has not started triggering the userFunction.
> Thus I'd suggest putting the element in the queue when the cp barrier
> comes, and taking a snapshot of the whole queue afterwards. The problem
> will be solved. But this approach also involves some code modifications on
> the mailbox executor.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra  wrote:
>
>> Thank you for the detailed analysis Zakelly.
>>
>> I think we should consider whether yield should process checkpoint
>> barriers because this puts quite a serious limitation on the unaligned
>> checkpoints in these cases.
>> Do you know what is the reason behind the current priority setting? Is
>> there a problem with processing the barrier here?
>>
>> Gyula
>>
>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> Well I tried your example in local mini-cluster, and it seems the source
>>> can take checkpoints but it will block in the following AsyncWaitOperator.
>>> IIUC, the unaligned checkpoint barrier should wait until the current
>>> `processElement` finishes its execution. In your example, the element queue
>>> of `AsyncWaitOperator` will end up full and `processElement` will be
>>> blocked at `addToWorkQueue`. Even though it will call
>>> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
>>> unprocessed since the priority of the barrier is -1, lower than the one
>>> `yield()` should handle. I verified this using single-step debugging.
>>>
>>> And if one element could finish its async io, the cp barrier can be
>>> processed afterwards. For example:
>>> ```
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>> env.getCheckpointConfig().setCheckpointInterval(1);  // 10s interval
>>> env.getConfig().setParallelism(1);
>>> AsyncDataStream.orderedWait(
>>> env.fromSequence(Long.MIN_VALUE,
>>> Long.MAX_VALUE).shuffle(),
>>> new AsyncFunction() {
>>> boolean first = true;
>>> @Override
>>> public void asyncInvoke(Long aLong,
>>> ResultFuture resultFuture) {
>>> if (first) {
>>>
>>> Executors.newSingleThreadExecutor().execute(() -> {
>>> try {
>>> Thread.sleep(2); // process
>>> after 20s, only for the first one.
>>> } catch (Throwable e) {}
>>> LOG.info("Complete one");
>>>
>>> resultFuture.complete(Collections.singleton(1L));
>>> });
>>> first = false;
>>> }
>>> }
>>> },
>>> 24,
>>> TimeUnit.HOURS,
>>> 1)
>>> .print();
>>> ```
>>> The checkpoint 1 can be normally finished after the "Complete one" log
>>> print.
>>>
>>> I guess the users have no means to solve this problem, we might optimize
>>> this later.
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra  wrote:
>>>
>>>> Hey all!
>>>>
>>>> I encountered a strange and unexpected behaviour when trying to use
>>>> unaligned checkpoints with AsyncIO.
>>>>
>>>> If the async operation queue is full and backpressures the pipeline
>>>> 

Re: Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
Thank you for the detailed analysis Zakelly.

I think we should consider whether yield should process checkpoint barriers
because this puts quite a serious limitation on the unaligned checkpoints
in these cases.
Do you know what is the reason behind the current priority setting? Is
there a problem with processing the barrier here?

Gyula

On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan  wrote:

> Hi Gyula,
>
> Well I tried your example in local mini-cluster, and it seems the source
> can take checkpoints but it will block in the following AsyncWaitOperator.
> IIUC, the unaligned checkpoint barrier should wait until the current
> `processElement` finishes its execution. In your example, the element queue
> of `AsyncWaitOperator` will end up full and `processElement` will be
> blocked at `addToWorkQueue`. Even though it will call
> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
> unprocessed since the priority of the barrier is -1, lower than the one
> `yield()` should handle. I verified this using single-step debugging.
>
> And if one element could finish its async io, the cp barrier can be
> processed afterwards. For example:
> ```
> env.getCheckpointConfig().enableUnalignedCheckpoints();
> env.getCheckpointConfig().setCheckpointInterval(1);  // 10s interval
> env.getConfig().setParallelism(1);
> AsyncDataStream.orderedWait(
> env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
> new AsyncFunction() {
> boolean first = true;
> @Override
> public void asyncInvoke(Long aLong, ResultFuture
> resultFuture) {
> if (first) {
> Executors.newSingleThreadExecutor().execute(()
> -> {
> try {
> Thread.sleep(2); // process after
> 20s, only for the first one.
> } catch (Throwable e) {}
> LOG.info("Complete one");
>
> resultFuture.complete(Collections.singleton(1L));
> });
> first = false;
> }
> }
> },
> 24,
> TimeUnit.HOURS,
> 1)
> .print();
> ```
> The checkpoint 1 can be normally finished after the "Complete one" log
> print.
>
> I guess the users have no means to solve this problem, we might optimize
> this later.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra  wrote:
>
>> Hey all!
>>
>> I encountered a strange and unexpected behaviour when trying to use
>> unaligned checkpoints with AsyncIO.
>>
>> If the async operation queue is full and backpressures the pipeline
>> completely, then unaligned checkpoints cannot be completed. To me this
>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>> that we can simply checkpoint the queue and not have to wait for the
>> completion.
>>
>> To repro you can simply run:
>>
>> AsyncDataStream.orderedWait(
>> env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>> new AsyncFunction() {
>> @Override
>> public void asyncInvoke(Long aLong, ResultFuture
>> resultFuture) {}
>> },
>> 24,
>> TimeUnit.HOURS,
>> 1)
>> .print();
>>
>> This pipeline will completely backpressure the source and checkpoints do
>> not progress even though they are unaligned. Already the source cannot take
>> a checkpoint it seems which for me is surprising because this is using the
>> new source interface.
>>
>> Does anyone know why this happens and if there may be a solution?
>>
>> Thanks
>> Gyula
>>
>


Unaligned checkpoint blocked by long Async operation

2024-03-14 Thread Gyula Fóra
Hey all!

I encountered a strange and unexpected behaviour when trying to use
unaligned checkpoints with AsyncIO.

If the async operation queue is full and backpressures the pipeline
completely, then unaligned checkpoints cannot be completed. To me this
sounds counterintuitive because one of the benefits of the AsyncIO would be
that we can simply checkpoint the queue and not have to wait for the
completion.

To repro you can simply run:

AsyncDataStream.orderedWait(
env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
new AsyncFunction() {
@Override
public void asyncInvoke(Long aLong, ResultFuture
resultFuture) {}
},
24,
TimeUnit.HOURS,
1)
.print();

This pipeline will completely backpressure the source and checkpoints do
not progress even though they are unaligned. Already the source cannot take
a checkpoint it seems which for me is surprising because this is using the
new source interface.

Does anyone know why this happens and if there may be a solution?

Thanks
Gyula


Re: Temporal join on rolling aggregate

2024-03-05 Thread Gyula Fóra
Hi Everyone!

I have discussed this with Sébastien Chevalley, he is going to prepare and
drive the FLIP while I will assist him along the way.

Thanks
Gyula

On Tue, Mar 5, 2024 at 9:57 AM  wrote:

> I do agree with Ron Liu.
> This would definitely need a FLIP as it would impact SQL and extend it
> with the equivalent of TimestampAssigners in the Java API.
>
> Is there any existing JIRA here, or is anybody willing to drive a FLIP?
> On Feb 26, 2024 at 02:36 +0100, Ron liu , wrote:
>
> +1,
> But I think this should be a more general requirement, that is, support for
> declaring watermarks in query, which can be declared for any type of
> source, such as table, view. Similar to databricks provided [1], this needs
> a FLIP.
>
> [1]
>
> https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-qry-select-watermark.html
>
> Best,
> Ron
>
>


Re: flink-operator-1.5.0 supports which versions of Kubernetes

2024-03-04 Thread Gyula Fóra
It should be compatible. There is no compatibility matrix but it is
compatible with most versions that are in use (at the different
companies/users etc)

Gyula

On Thu, Feb 29, 2024 at 6:21 AM 吴圣运  wrote:

> Hi,
>
> I'm using flink-operator-1.5.0 and I need to deploy it to Kubernetes 1.20.
> I want to confirm if this version of flink-operator is compatible with
> Kubernetes 1.20. I cannot find the compatible matrix in the github page.
> Could you help me confirm?
>
> Thanks
> shengyun.wu
>


Re: Temporal join on rolling aggregate

2024-02-22 Thread Gyula Fóra
Posting this to dev as well as it potentially has some implications on
development effort.

What seems to be the problem here is that we cannot control/override
Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
cannot create a PRIMARY KEY on the view but I think the temporal join also
should not require the PK, should we remove this limitation?

The general problem is the inflexibility of the timestamp/watermark
handling on query outputs, which makes this again impossible.

The workaround here can be to write the rolling aggregate to Kafka, read it
back again and join with that. The fact that this workaround is possible
actually highlights the need for more flexibility on the query/view side in
my opinion.

Has anyone else run into this issue and considered the proper solution to
the problem? Feels like it must be pretty common :)

Cheers,
Gyula




On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley 
wrote:

> Hi,
>
> I have been trying to write a temporal join in SQL done on a rolling
> aggregate view. However it does not work and throws :
>
> org.apache.flink.table.api.ValidationException: Event-Time Temporal Table
> Join requires both primary key and row time attribute in versioned table,
> but no row time attribute can be found.
>
> It seems that after the aggregation, the table looses the watermark and
> it's not possible to add one with the SQL API as it's a view.
>
> CREATE TABLE orders (
> order_id INT,
> price DECIMAL(6, 2),
> currency_id INT,
> order_time AS NOW(),
> WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.order_id.kind' = 'sequence',
> 'fields.order_id.start' = '1',
> 'fields.order_id.end' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TABLE currency_rates (
> currency_id INT,
> conversion_rate DECIMAL(4, 3),
> PRIMARY KEY (currency_id) NOT ENFORCED
> )
> WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '10',
> 'fields.currency_id.min' = '1',
> 'fields.currency_id.max' = '20'
> );
>
> CREATE TEMPORARY VIEW max_rates AS (
> SELECT
> currency_id,
> MAX(conversion_rate) AS max_rate
> FROM currency_rates
> GROUP BY currency_id
> );
>
> CREATE TEMPORARY VIEW temporal_join AS (
> SELECT
> order_id,
> max_rates.max_rate
> FROM orders
>  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
>  ON orders.currency_id = max_rates.currency_id
> );
>
> SELECT * FROM temporal_join;
>
> Am I missing something? What would be a good starting point to address
> this?
>
> Thanks in advance,
> Sébastien Chevalley


Re: Flink Kubernetes Operator - Deadlock when Cluster Cleanup Fails

2024-02-13 Thread Gyula Fóra
Hi Niklas!

The best way to report the issue would be to open a JIRA ticket with the
same detailed information.

Otherwise I think your observations are correct and this is indeed a
frequent problem that comes up, it would be good to improve on it. In
addition to improving logging we could also increase the default timeout
and if we could actually do something on the timeout that would be even
better.

Please open the JIRA ticket and if you have time to work on these
improvements I will assign it to you.

Cheers
Gyula

On Mon, Feb 12, 2024 at 11:59 PM Niklas Wilcke 
wrote:

> Hi Flink Kubernetes Operator Community,
>
> I hope this is the right way to report an issue with the Apache Flink
> Kubernetes Operator. We are experiencing problems with some streaming job
> clusters which end up in a terminated state, because of the operator not
> behaving as expected. The problem is that the teardown of the Flink cluster
> by the operator doesn't succeed in the default timeout of 1 minute. After
> that the operator proceeds and tries to create a fresh cluster, which
> fails, because parts of the cluster still exist. After that it tries to
> fully remove the cluster including the HA metadata. After that it is stuck
> in an error loop that manual recovery is necessary, since the HA metadata
> is missing. At the very bottom of the mail you can find an condensed log
> attached, which hopefully gives a more detailed impression about the
> problem.
>
> The current workaround is to increase the 
> "kubernetes.operator.resource.cleanup.timeout"
> [0] to 10 minutes. Time will tell whether this workaround fixes the
> problem for us.
>
> The main problem I see is that the
> method AbstractFlinkService.waitForClusterShutdown(...) [1] isn't handling
> a timeout at all. Please correct me in case I missed a detail, but this is
> how we experience the problem. In case one of the service, the jobmanagers
> or the taskmanagers survives the cleanup timeout (of 1 minute), the
> operator seems to proceed as if the entities have been removed properly. To
> me this doesn't look good. From my point of view at least an error should
> be logged.
>
> Additionally the current logging makes it difficult to analyse the problem
> and to be notified about the timeout. The following things could possibly
> be improved or implemented.
>
>1. Successful removal of the entities should be logged.
>2. Timing out isn't logged (An error should probably be logged here)
>3. For some reason the logging of the waited seconds is somehow
>incomplete (L944, further analysis needed)
>
> We use the following Flink and Operator versions:
>
> Flink Image: flink:1.17.1 (from Dockerhub)
> Operator Version: 1.6.1
>
> I hope this description is well enough to get into touch and discuss the
> matter. I'm open to provide additional information or with some guidance,
> provide a patch to resolve the issue.
> Thanks for your work on the Operator. It is highly appreciated!
>
> Cheers,
> Niklas
>
>
> [0]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
> [1]
> https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L903
>
>
>
> #
> # The job in the cluster failed
> Event  | Info| JOBSTATUSCHANGED | Job status changed from RUNNING to
> FAILED"
> Stopping failed Flink job...
> Status | Error   | FAILED  |
> {""type"":""org.apache.flink.util.SerializedThrowable"",""message"":"
> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=5,
> backoffTimeMS=3)"",""additionalMetadata"":{},""throwableList"":[{""type"":""org.apache.flink.util.SerializedThrowable"",""message"":""org
> .apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting
> down."",""additionalMetadata"":{}}]}
>  Deleting JobManager deployment while preserving HA metadata.
> Deleting cluster with Foreground propagation
> Waiting for cluster shutdown... (10s)
> Waiting for cluster shutdown... (30s)
> Waiting for cluster shutdown... (40s)
> Waiting for cluster shutdown... (45s)
> Waiting for cluster shutdown... (50s)
> Resubmitting Flink job...
> Cluster shutdown completed.
> Deploying application cluster
> Event  | Info| SUBMIT  | Starting deployment
> Submitting application in 'Application Mode
> Deploying application cluster
> ...
> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | Could not create
> Kubernetes cluster 
>  Status | Error   | FAILED  |
> {""type"":""org.apache.flink.kubernetes.operator.exception.ReconciliationException"",""message"":""org.apache.flink.client.deployment.ClusterDeploymentException:
> Could not create Kubernetes cluster
> \"" not create Kubernetes cluster
> 

Re: Flink pending record metric weired after autoscaler rescaling

2024-01-12 Thread Gyula Fóra
Could this be related to the issue reported here?
https://issues.apache.org/jira/browse/FLINK-34063

Gyula

On Wed, Jan 10, 2024 at 4:04 PM Yang LI  wrote:

> Just to give more context, my setup uses Apache Flink 1.18 with the
> adaptive scheduler enabled, issues happen randomly particularly
> post-restart behaviors.
>
> After each restart, the system logs indicate "Adding split(s) to reader:",
> signifying the reassignment of partitions across different TaskManagers. An
> anomaly arises with specific partitions, for example, partition-10. This
> partition does not appear in the logs immediately post-restart. It remains
> unlogged for several hours, during which no data consumption from
> partition-10 occurs. Subsequently, the logs display "Discovered new
> partitions:", and only then does the consumption of data from partition-10
> recommence.
>
> Could you provide any insights or hypotheses regarding the underlying cause
> of this delayed recognition and processing of certain partitions?
>
> Best regards,
> Yang
>
> On Mon, 8 Jan 2024 at 16:24, Yang LI  wrote:
>
> > Dear Flink Community,
> >
> > I've encountered an issue during the testing of my Flink autoscaler. It
> > appears that Flink is losing track of specific Kafka partitions, leading
> to
> > a persistent increase in lag on these partitions. The logs indicate a
> > 'kafka connector metricGroup name collision exception.' Notably, the
> > consumption on these Kafka partitions returns to normal after restarting
> > the Kafka broker. For context, I have enabled in-place rescaling support
> > with 'jobmanager.scheduler: Adaptive.'
> >
> > I suspect the problem may stem from:
> >
> > The in-place rescaling support triggering restarts of some taskmanagers.
> > This process might not be restarting the metric groups registered by the
> > Kafka source connector correctly, leading to a name collision exception
> and
> > preventing Flink from accurately reporting metrics related to Kafka
> > consumption.
> > A potential edge case in the metric for pending records, especially when
> > different partitions exhibit varying lags. This discrepancy might be
> > causing the pending record metric to malfunction.
> > I would appreciate your insights on these observations.
> >
> > Best regards,
> > Yang LI
> >
>


Re: Re: Optional fields during SQL insert

2024-01-11 Thread Gyula Fóra
This worked perfectly Xuyang, nice :)

Thanks!

On Thu, Jan 11, 2024 at 12:52 PM Xuyang  wrote:

> Hi, Gyula.
> If you want flink to fill the unspecified column with NULL, you can try
> the following sql like :
> ```
> INSERT INTO Sink(a) SELECT a from Source
> ```
>
>
> --
> Best!
> Xuyang
>
>
> 在 2024-01-11 16:10:47,"Giannis Polyzos"  写道:
>
> Hi Gyula,
> to the best of my knowledge, this is not feasible and you will have to do
> something like *CAST(NULL AS STRING)* to insert null values manually.
>
> Best,
> Giannis
>
> On Thu, Jan 11, 2024 at 9:58 AM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> Is it possible to insert into a table without specifying all columns of
>> the target table?
>> In other words can we use the default / NULL values of the table when not
>> specified somehow?
>>
>> For example:
>> Query schema: [a: STRING]
>> Sink schema:  [a: STRING, b: STRING]
>>
>> I would like to be able to simply insert with column a and get null
>> values for b.
>> For a simple table like the above it doesn't make much of a practical
>> difference but if you have very large tables with complex fields and a lot
>> of optional columns this can be very useful.
>>
>> Thank you!
>> Gyula
>>
>


Optional fields during SQL insert

2024-01-10 Thread Gyula Fóra
Hi All!

Is it possible to insert into a table without specifying all columns of the
target table?
In other words can we use the default / NULL values of the table when not
specified somehow?

For example:
Query schema: [a: STRING]
Sink schema:  [a: STRING, b: STRING]

I would like to be able to simply insert with column a and get null values
for b.
For a simple table like the above it doesn't make much of a practical
difference but if you have very large tables with complex fields and a lot
of optional columns this can be very useful.

Thank you!
Gyula


Re: [Flink Kubernetes Operator] Restoring from an outdated savepoint.

2023-12-22 Thread Gyula Fóra
Please upgrade the operator to the latest release, and if the issue still
exists please open a Jira ticket with the details.

Gyula

On Fri, 22 Dec 2023 at 21:17, Ruibin Xing  wrote:

> I wanted to talk about an issue we've hit recently with Flink Kubernetes
> Operator 1.6.1 and Flink 1.17.1.
>
> As we're using the Savepoint upgrade mode, we ran into cases where
> the lastSavepoint in status doesn't seem to update (still digging into why,
> could be an exception when cancelling tasks?).This ends up making flink
> restore from an outdated savepoint.
>
> Since there is a job.upgrade.last-state.max.allowed.checkpoint.age
> option, could we add a similar option for savepoint as well? So if the
> last-savepoint is too old, we get an error or use the latest checkpoint
> when upgrading(fallback to last-state).
>
> Or is there a better solution maybe? Would love to hear your thoughts, or
> any other ideas you might have. Thanks!
>


Re: Flink operator autoscaler scaling down

2023-12-11 Thread Gyula Fóra
Could you please elaborate a little in which scenarios you find that the
pending record metrics are not good to track Kafka lag?

Thanks
Gyula

On Mon, Dec 11, 2023 at 4:26 PM Yang LI  wrote:

> Hello,
>
> Following our recent discussion, I've successfully implemented a Flink
> operator featuring a "memory protection" patch. However, in the course of
> my testing, I've encountered an issue: the Flink operator relies on the
> 'pending_record' metric to gauge backlog. Unfortunately, this metric
> doesn't seem to accurately represent the lag in the Kafka topic in certain
> scenarios.
>
> Could you advise if there are any configurations I might have overlooked
> that could enhance the autoscaler's ability to scale up in response to lags
> in Kafka topics?
>
> Regards,
> Yang LI
>
> On Wed, 15 Nov 2023 at 20:39, Yang LI  wrote:
>
>> Thanks Maximilian and Gyula, I'll keep you updated.
>>
>> Best,
>> Yang
>>
>> On Sat, 11 Nov 2023 at 16:18, Maximilian Michels  wrote:
>>
>>> Hi Yang,
>>>
>>> We're always open to changes / additions to the autoscaler logic and
>>> metric collection. Preferably, we change these directly in the
>>> autoscaler implementation, without adding additional processes or
>>> controllers. Let us know how your experiments go! If you want to
>>> contribute, a JIRA with a description of the changes would be the
>>> first step. We can take it from there.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Nov 7, 2023 at 9:04 PM Yang LI  wrote:
>>> >
>>> > Hi Gyula,
>>> >
>>> > Thank you for the feedback! With your permission, I plan to integrate
>>> the implementation into the flink-kubernetes-operator-autoscaler module to
>>> test it on my env. Subsequently, maybe contribute these changes back to the
>>> community by submitting a pull request to the GitHub repository in the
>>> coming months.
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > On Tue, 7 Nov 2023 at 19:08, Gyula Fóra  wrote:
>>> >>
>>> >> Sounds like a lot of work for very little gain to me. If you really
>>> feel that there is some room for improvement with the current
>>> implementation, it may be simpler to fix that .
>>> >>
>>> >> Gyula
>>> >>
>>> >> On Tue, 7 Nov 2023 at 01:20, Yang LI 
>>> wrote:
>>> >>>
>>> >>> Thanks for the information!
>>> >>>
>>> >>> I haven't tested Kuberntes's built-in rollback mechanism yet. I feel
>>> like I can create another independent operator which detects flink
>>> application jvm memory and triggers rollback.
>>> >>>
>>> >>> Another solution I would like to discuss is also to implement an
>>> independent operator. This operator do following things:
>>> >>>
>>> >>> Retrieve the state size metrics for Flink applications from
>>> Prometheus.
>>> >>> Gather current and recommended parallelism metrics from the Flink
>>> operator, also reported in Prometheus.
>>> >>> If a downscale is advised, I would calculate whether the new cluster
>>> configuration, considering state size and JVM heap max size, can support
>>> the entire state; if not, the operation would be halted.
>>> >>> If feasible, this operator would manage the rescaling process
>>> similarly to the Flink operator, either by making API requests or by
>>> applying a kubectl patch to the FlinkDeployment CRD.
>>> >>>
>>> >>> By doing this we could achieve something similar to what we can do
>>> with a plugin system, Of course in this case I'll disable scaling of the
>>> flink operator, Do you think it could work?
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra 
>>> wrote:
>>> >>>>
>>> >>>> Hey!
>>> >>>>
>>> >>>> Bit of a tricky problem, as it's not really possible to know that
>>> the job will be able to start with lower parallelism in some cases. Custom
>>> plugins may work but that would be an extremely complex solution at this
>>> point.
>>> >>>>
>>> >>>> The Kubernetes operator has a built-in rollback mechanism that can
>>> help with rolling back these brok

Re: Production deployment of Flink

2023-12-07 Thread Gyula Fóra
Hi!

We recommend using the community supported Flink Kubernetes Operator:

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.7/docs/try-flink-kubernetes-operator/quick-start/

Cheers,
Gyula

On Thu, Dec 7, 2023 at 6:33 PM Tauseef Janvekar 
wrote:

> Hi Al,
>
> I am using flink in my local setup and it works just fine - I installed it
> using confluent example training course. Here I had to manually execute
> start-cluster.sh and othe steps to start task managers.
>
> We installed flink on kubernetes using bitnami helm chart and it works
> just fine. But we get error messages like TM and JM are not able to
> communicate.
> Should we use something else to deploy flink on kubernetes ?
>
> Can someone please provide instructions on how to enable production ready
> flink deployment for version 1.18 on kubernetes.
>
> Thanks,
> Tauseef
>


Re: Flink Kubernetes Operator: Why one Helm repo for each version?

2023-12-01 Thread Gyula Fóra
Hi!

I already answered your question on slack :


“The main reason is that this allows us to completely separate release
resources etc. much easier for the release process

But this could be improved in the future if there is a good proposal for
the process”

Please do not cross post questions between slack and ml immediately, it’s
more respectful of the contributors’ time if you only ask in a single place
at a time.

Cheers
Gyula

On Fri, 1 Dec 2023 at 13:31, Salva Alcántara 
wrote:

> Hi! I'm curious why there is not a single repo URL for the Flink
> Kubernetes Operator Helm Chart, but multiples ones, one for each version.
> This forces users to add one repo for each version, like this (directly
> from the docs):
>
> ```
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-
> /
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
> ```
>
> This is weird and different from what I'm used to where you add the
> (version-less) repo once and then install whatever version you need from
> it...
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.7.0 released

2023-11-22 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.7.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Standalone autoscaler module
 - Improved autoscaler metric tracking
 - Savepoint triggering improvements
 - Java 17 & 21 support

Release blogpost:
https://flink.apache.org/2023/11/22/apache-flink-kubernetes-operator-1.7.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353462

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


[ANNOUNCE] Apache Flink Kubernetes Operator 1.7.0 released

2023-11-22 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.7.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Standalone autoscaler module
 - Improved autoscaler metric tracking
 - Savepoint triggering improvements
 - Java 17 & 21 support

Release blogpost:
https://flink.apache.org/2023/11/22/apache-flink-kubernetes-operator-1.7.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353462

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: Flink operator autoscaler scaling down

2023-11-07 Thread Gyula Fóra
Sounds like a lot of work for very little gain to me. If you really feel
that there is some room for improvement with the current implementation, it
may be simpler to fix that .

Gyula

On Tue, 7 Nov 2023 at 01:20, Yang LI  wrote:

> Thanks for the information!
>
> I haven't tested Kuberntes's built-in rollback mechanism yet. I feel like
> I can create another independent operator which detects flink application
> jvm memory and triggers rollback.
>
> Another solution I would like to discuss is also to implement an
> independent operator. This operator do following things:
>
>- Retrieve the state size metrics for Flink applications from
>Prometheus.
>- Gather current and recommended parallelism metrics from the Flink
>operator, also reported in Prometheus.
>- If a downscale is advised, I would calculate whether the new cluster
>configuration, considering state size and JVM heap max size, can support
>the entire state; if not, the operation would be halted.
>- If feasible, this operator would manage the rescaling process
>similarly to the Flink operator, either by making API requests or by
>applying a kubectl patch to the FlinkDeployment CRD.
>
> By doing this we could achieve something similar to what we can do with a
> plugin system, Of course in this case I'll disable scaling of the flink
> operator, Do you think it could work?
>
> Best,
> Yang
>
> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra  wrote:
>
>> Hey!
>>
>> Bit of a tricky problem, as it's not really possible to know that the job
>> will be able to start with lower parallelism in some cases. Custom plugins
>> may work but that would be an extremely complex solution at this point.
>>
>> The Kubernetes operator has a built-in rollback mechanism that can help
>> with rolling back these broken scale operations, have you tried that?
>> Furthermore we are planning to introduce some heap/GC related metrics soon
>> (probably after the next release for 1.8.0) that may help us catching these
>> issues.
>>
>> Cheers,
>> Gyula
>>
>> On Mon, Nov 6, 2023 at 9:27 AM Yang LI  wrote:
>>
>>> Dear Flink Community,
>>>
>>> I am currently working on implementing auto-scaling for my Flink
>>> application using the Flink operator's autoscaler. During testing, I
>>> encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
>>> the autoscaler attempted to scale down. This issue arises when the incoming
>>> record rate decreases while the state size has not yet reduced
>>> correspondingly. Despite numerous tests, managing this issue has been
>>> difficult due to the lack of a parameter that allows for specifying a
>>> cooldown period(essential for processing and reducing state size)prior to
>>> actual scaling down. Moreover, determining an optimal duration for this
>>> cooldown period is also not straightforward. I believe that enhancing the
>>> autoscaler with a focus on memory checks or more broadly on stability
>>> conditions could significantly address this issue.. Here are some potential
>>> solutions that, in my opinion, could improve the situation:
>>>
>>>1. Integrate heap memory-related metrics into the metric collection,
>>>coupled with a memory safety margin check within the autoscaler's 
>>> algorithm.
>>>
>>>2. Introduce a plugin system and a pre-rescaling step in the Flink
>>>operator's autoscaler, which would allow users to implement custom 
>>> plugins.
>>>These plugins could host listeners that activate during the pre-hook 
>>> step,
>>>adding an additional checkpoint before the algorithm executes. So we can
>>>keep blocking scaling down until custom checks are passed to ensure it is
>>>safe to proceed with scaling down.
>>>
>>>3. Implement a parameter that establishes a stability threshold for
>>>heap memory usage percentage or jvm old gc (duration or count). In the
>>>event that the threshold is exceeded, the system would revert to the last
>>>stable scale in the scaling history. Then the stabilization interval 
>>> would
>>>start to work, providing the Flink cluster with additional time to 
>>> process
>>>and reduce the state size
>>>
>>>
>>>
>>> Let me know what you think about it! Thanks!
>>>
>>> Best,
>>>
>>> Yang LI
>>>
>>


Re: Flink operator autoscaler scaling down

2023-11-06 Thread Gyula Fóra
Hey!

Bit of a tricky problem, as it's not really possible to know that the job
will be able to start with lower parallelism in some cases. Custom plugins
may work but that would be an extremely complex solution at this point.

The Kubernetes operator has a built-in rollback mechanism that can help
with rolling back these broken scale operations, have you tried that?
Furthermore we are planning to introduce some heap/GC related metrics soon
(probably after the next release for 1.8.0) that may help us catching these
issues.

Cheers,
Gyula

On Mon, Nov 6, 2023 at 9:27 AM Yang LI  wrote:

> Dear Flink Community,
>
> I am currently working on implementing auto-scaling for my Flink
> application using the Flink operator's autoscaler. During testing, I
> encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
> the autoscaler attempted to scale down. This issue arises when the incoming
> record rate decreases while the state size has not yet reduced
> correspondingly. Despite numerous tests, managing this issue has been
> difficult due to the lack of a parameter that allows for specifying a
> cooldown period(essential for processing and reducing state size)prior to
> actual scaling down. Moreover, determining an optimal duration for this
> cooldown period is also not straightforward. I believe that enhancing the
> autoscaler with a focus on memory checks or more broadly on stability
> conditions could significantly address this issue.. Here are some potential
> solutions that, in my opinion, could improve the situation:
>
>1. Integrate heap memory-related metrics into the metric collection,
>coupled with a memory safety margin check within the autoscaler's 
> algorithm.
>
>2. Introduce a plugin system and a pre-rescaling step in the Flink
>operator's autoscaler, which would allow users to implement custom plugins.
>These plugins could host listeners that activate during the pre-hook step,
>adding an additional checkpoint before the algorithm executes. So we can
>keep blocking scaling down until custom checks are passed to ensure it is
>safe to proceed with scaling down.
>
>3. Implement a parameter that establishes a stability threshold for
>heap memory usage percentage or jvm old gc (duration or count). In the
>event that the threshold is exceeded, the system would revert to the last
>stable scale in the scaling history. Then the stabilization interval would
>start to work, providing the Flink cluster with additional time to process
>and reduce the state size
>
>
>
> Let me know what you think about it! Thanks!
>
> Best,
>
> Yang LI
>


Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-10-21 Thread Gyula Fóra
.flink.runtime.rest.handler.job.JobsOverviewHandler [] -
>> Unhandled exception.
>> org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException:
>> Could not send message
>> [RemoteFencedMessage(b55fb309bb698aa75925f70bce254756,
>> RemoteRpcInvocation(null.requestMultipleJobDetails(Time)))] from sender
>> [Actor[akka.tcp://flink@10.11.76.167:6123/temp/dispatcher_0$Tb]] to
>> recipient [Actor[akka://flink/user/rpc/dispatcher_0#1755511719]], because
>> the recipient is unreachable. This can either mean that the recipient has
>> been terminated or that the remote RpcService is currently not reachable.
>> ...
>> 2023-10-21 10:25:45,798 INFO
>>  org.apache.flink.runtime.blob.FileSystemBlobStore[] - Creating
>> highly available BLOB storage directory at
>> s3:blob
>
>
> When the Flink cluster restarts, it doesn't try to restore from the latest
> savepoint anymore. Instead, it tries to restore from a savepoint in
> `execution.savepoint.path` in the flink-config. Since this savepoint was
> from a while ago, it has been disposed already, and so the Flink cluster
> cannot restart again:
>
> 2023-10-21 10:25:47,371 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - Create KubernetesLeaderElector
>> -ee4f7c678794ee16506f9b41425c244e-jobmanager-leader with
>> lock identity e29bd174-fb42-4473-ba09-f0fc3f614b34.
>> ...
>> 2023-10-21 10:25:47,476 INFO
>>  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils
>> [] - Found 0 checkpoints in
>> KubernetesStateHandleStore{configMapName='-ee4f7c678794ee16506f9b41425c244e-jobmanager-leader'}.
>> ...
>> 2023-10-21 10:25:47,650 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
>> checkpoint found during restore.
>> ...
>> 2023-10-21 10:25:47,651 INFO
>>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Starting
>> job ee4f7c678794ee16506f9b41425c244e from savepoint
>> s3:///savepoint-ee4f7c-550378a4b4d1 (allowing non restored state)
>> ...
>> 2023-10-21 10:25:47,703 INFO
>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>> ee4f7c678794ee16506f9b41425c244e reached terminal state FAILED.
>> org.apache.flink.runtime.client.JobInitializationException: Could not
>> start the JobMaster.
>> ...
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find
>> checkpoint or savepoint file/directory
>> 's3:///savepoint-ee4f7c-550378a4b4d1' on file system 's3'.
>>
>
> Here is the execution.savepoint.path in the configmap
> flink-config-:
>
> execution.savepoint.path: s3:///savepoint-ee4f7c-550378a4b4d1
>
>
> This Flink application is running on Flink 1.14
>
> On Sat, Sep 23, 2023 at 12:15 AM Gyula Fóra  wrote:
>
>> Hi
>>
>> Operator savepoint retention and savepoint upgrades have nothing to do
>> with each other I think. Retention is only for periodic savepoints
>> triggered by the operator itself.
>>
>> I would upgrade to the latest 1.6.0 operator version before investigating
>> further.
>>
>> Cheers
>> Gyula
>>
>>
>> On Sat, 23 Sep 2023 at 06:02, Nathan Moderwell <
>> nathan.moderw...@robinhood.com> wrote:
>>
>>> Small update on this. I see that the issue is that we use `upgradeMode:
>>> savepoint`, but have not configured the operator to retain savepoints for
>>> long enough (the previous operator we used never deleted savepoints so we
>>> didn't run into this). I am reconfiguring to use `upgradeMode: last-state`
>>> and enabling HA to see if this provides us more stable job restoration on
>>> pod disruption.
>>>
>>> On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <
>>> nathan.moderw...@robinhood.com> wrote:
>>>
>>>> Hi flink-kubernetes-operator maintainers,
>>>>
>>>> We have recently migrated to the official operator and seeing a new
>>>> issue where our FlinkDeployments can fail and crashloop looking for a
>>>> non-existent savepoint. On further inspection, the job is attempting to
>>>> restart from the savepoint specified in execution.savepoint.path. This
>>>> config new for us (wasn't set by previous operator) is seems to be
>>>> automatically set behind the scenes by the official operator. We see the
>>>> savepoint in execution.savepoint.path existed but gets deleted after some
>>>> amount of time (in the latest exam

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-10-19 Thread Gyula Fóra
Thanks for the feedback

We discussed with some devs and we are going to release the 1.6.1 based on
these batches in the next week or so.

Gyula

On Thu, Oct 19, 2023 at 9:44 AM Evgeniy Lyutikov 
wrote:

> Hi.
> I patched my copy of the 1.6.0 operator with edits from
> https://github.com/apache/flink-kubernetes-operator/pull/673
> This solved the problem
>
>
> --
> *От:* Tony Chen 
> *Отправлено:* 19 октября 2023 г. 4:18:36
> *Кому:* Evgeniy Lyutikov
> *Копия:* user@flink.apache.org; Gyula Fóra
> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
> HI Evgeniy,
>
> Did you rollback your operator version? If yes, did you run into any
> issues?
>
> I ran into the following exception in my flink-kubernetes-operator pod
> while rolling back, and I was wondering if you encountered this.
>
> 2023-10-18 21:01:15,251 i.f.k.c.e.l.LeaderElector  [ERROR] Exception
> occurred while releasing lock 'LeaseLock: flink-kubernetes-operator -
> flink-operator-lease (flink-kubernetes-operator-74f9688dd-bcqr2)'
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
> Unable to update LeaseLock
> at
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:102)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.release(LeaderElector.java:139)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:120)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$start$2(LeaderElector.java:104)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source)
> at io.fabric8.kubernetes.client.utils.Utils.lambda$null$12(Utils.java:523)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
> executing: PUT at:
> https://10.241.0.1/apis/coordination.k8s.io/v1/namespaces/flink-kubernetes-operator/leases/flink-operator-lease
> <https://eur04.safelinks.protection.outlook.com/?url=https%3A%2F%2F10.241.0.1%2Fapis%2Fcoordination.k8s.io%2Fv1%2Fnamespaces%2Fflink-kubernetes-operator%2Fleases%2Fflink-operator-lease=05%7C01%7Ceblyutikov%40avito.ru%7Cb3331280021e47d1da6e08dbd01fd244%7Caf0e07b3b90b472392e63fab11dd5396%7C0%7C0%7C638332607322558199%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C=n5H47J4tRlnE4S4AQsVj1jK8bhexUm9tbA1Zwu07LC8%3D=0>.
> Message: Operation cannot be fulfilled on leases.coordination.k8s.io
> <https://eur04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fleases.coordination.k8s.io%2F=05%7C01%7Ceblyutikov%40avito.ru%7Cb3331280021e47d1da6e08dbd01fd244%7Caf0e07b3b90b472392e63fab11dd5396%7C0%7C0%7C638332607322558199%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C=8HIPvlz33QGMYoP%2BVvOYLtHIV9XoWZXtvQFNJPgiEx8%3D=0>
> "flink-operator-lease": the object has been modified; please apply your
> changes to the latest version and try again. Received status:
> Status(apiVersion=v1, code=409, details=StatusDetails(causes=[], group=
> coordination.k8s.io
> <https://eur04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fcoordination.k8s.io%2F=05%7C01%7Ceblyutikov%40avito.ru%7Cb3331280021e47d1da6e08dbd01fd244%7Caf0e07b3b90b472392e63fab11dd5396%7C0%7C0%7C638332607322558199%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C3000%7C%7C%7C=DMECoc2sISic7vPN8JhRr5g0WMuxheeChCaEYvUeM5I%3D=0>,
> kind=leases, name=flink-operator-lease, retryAfterSeconds=null, uid=null,
> additionalProperties={}), kind=Status, message=Operation cannot be
> fulfilled on leases.coordination.k8s.io
> <https://eur04.safelinks.protection.outlook.com/?url=http%3A%2F%2Fleases.coordination.k8s.io%2F=05%7C01%7Ceblyutikov%40avito.ru%7Cb3

Re: Flink Operator 1.6 causes JobManagerDeploymentStatus: MISSING

2023-10-18 Thread Gyula Fóra
Hi!
Not sure if it’s the same but could you try picking up the fix from the
release branch and confirming that it solves the problem?

If it does we may consider a quick bug fix release.

Cheers
Gyula

On Wed, 18 Oct 2023 at 18:09, Tony Chen  wrote:

> Hi Flink Community,
>
> Most of the Flink applications run on 1.14 at my company. After upgrading
> the Flink Operator to 1.6, we've seen many jobmanager pods show
> "JobManagerDeploymentStatus: MISSING".
>
> Here are some logs from the operator pod on one of our Flink applications:
>
> [m [33m2023-10-18 02:02:40,823 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
> SAVEPOINTERROR | Savepoint failed for savepointTriggerNonce: null
> ...
> [m [33m2023-10-18 02:02:40,883 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
> CLUSTERDEPLOYMENTEXCEPTION | Status have been modified externally in
> version 17447422864 Previous: 
> ...
> [m [33m2023-10-18 02:02:40,919 [m [36mi.j.o.p.e.ReconciliationDispatcher
> [m [1;31m[ERROR][nemo/nemo-streaming-users-identi-updates] Error during
> event processing ExecutionScope{ resource id:
> ResourceID{name='nemo-streaming-users-identi-updates', namespace='nemo'},
> version: 17447420285} failed.
> ...
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 17447422864 Previous:
> 
> ...
> [m [33m2023-10-18 02:03:03,273 [m [36mo.a.f.k.o.o.d.ApplicationObserver [m
> [1;31m[ERROR][nemo/nemo-streaming-users-identi-updates] Missing JobManager
> deployment
> ...
> [m [33m2023-10-18 02:03:03,295 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning | MISSING |
> Missing JobManager deployment
> [m [33m2023-10-18 02:03:03,295 [m [36mo.a.f.c.Configuration [m [33m[WARN
> ][nemo/nemo-streaming-users-identi-updates] Config uses deprecated
> configuration key 'high-availability' instead of proper key
> 'high-availability.type'
>
>
> This seems related to this email thread:
> https://www.mail-archive.com/user@flink.apache.org/msg51439.html.
> However, I believe that we're not seeing the HA metadata getting deleted.
>
> What could cause the JobManagerDeploymentStatus to be MISSING?
>
> Thanks,
> Tony
>
> --
>
> 
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Re: [EXTERNAL]Re: Next Release of the Flink Kubernetes Operator

2023-10-17 Thread Gyula Fóra
Building your own custom operator image is also fairly straightforward and
is also a good exercise to make sure you can easily backport critical fixes
into your prod envs in the future :)

Cheers,
Gyula

On Tue, Oct 17, 2023 at 10:48 AM Niklas Wilcke 
wrote:

> Hi Gyula,
>
> thank you very much for the update about the release schedule and for
> pointing me to the snapshot images. This is indeed very helpful and we will
> consider our options now.
>
> Regards,
> Niklas
>
> On 16. Oct 2023, at 17:56, Gyula Fóra  wrote:
>
> Hi Niklas!
>
> We weren't planning a 1.6.1 release and instead we were focusing on
> wrapping up changes for the 1.7.0 release coming in a month or so.
>
> However if there is enough interest and we have some committers/PMC
> willing to help with the release we can always do 1.6.1 but I personally
> don't have the bandwidth for it right now.
>
> In case you want to pick up the latest 1.6 backported fixes, you could
> either create a custom operator build or simply pick up the image built
> from the branch automatically [1]
> This would probably be an easy way to get the fixes into your environment.
> But of course this is not an official release, just a snapshot version so
> you have to do your own verification to make sure it works for you.
>
> Cheers,
> Gyula
>
> [1]
> https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e
>
> On Mon, Oct 16, 2023 at 5:41 PM Niklas Wilcke 
> wrote:
>
>> Hi Flink Community,
>>
>> we are waiting for the next release of the Flink Kubernetes Operator,
>> because we are experiencing problems with loosing the HA metadata similar
>> to FLINK-33011 [0].
>> Since the problem is already fixed and also backported to the 1.6 branch
>> [1], my question would be whether we can expect a release soon?
>> Any input is highly appreciated. Thank you!
>>
>> Regards,
>> Niklas
>>
>>
>> [0] https://issues.apache.org/jira/browse/FLINK-33011
>> [1]
>> https://github.com/apache/flink-kubernetes-operator/commits/release-1.6
>>
>
>


Re: Next Release of the Flink Kubernetes Operator

2023-10-16 Thread Gyula Fóra
Hi Niklas!

We weren't planning a 1.6.1 release and instead we were focusing on
wrapping up changes for the 1.7.0 release coming in a month or so.

However if there is enough interest and we have some committers/PMC willing
to help with the release we can always do 1.6.1 but I personally don't have
the bandwidth for it right now.

In case you want to pick up the latest 1.6 backported fixes, you could
either create a custom operator build or simply pick up the image built
from the branch automatically [1]
This would probably be an easy way to get the fixes into your environment.
But of course this is not an official release, just a snapshot version so
you have to do your own verification to make sure it works for you.

Cheers,
Gyula

[1]
https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e

On Mon, Oct 16, 2023 at 5:41 PM Niklas Wilcke 
wrote:

> Hi Flink Community,
>
> we are waiting for the next release of the Flink Kubernetes Operator,
> because we are experiencing problems with loosing the HA metadata similar
> to FLINK-33011 [0].
> Since the problem is already fixed and also backported to the 1.6 branch
> [1], my question would be whether we can expect a release soon?
> Any input is highly appreciated. Thank you!
>
> Regards,
> Niklas
>
>
> [0] https://issues.apache.org/jira/browse/FLINK-33011
> [1]
> https://github.com/apache/flink-kubernetes-operator/commits/release-1.6
>


Re: Using Flink k8s operator on OKD

2023-10-05 Thread Gyula Fóra
Hey,
We don’t have minimal supported version in the docs as we haven’t
experienced any issue specific to kubernetes versions so far.

We don’t really rely on any newer features

Cheers
Gyula



On Fri, 6 Oct 2023 at 06:02, Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> It seems that problem was caused by k8s 1.19.
> When we deployed Flink operator on vanilla k8s 1.19 we got the same error
> that we have on OKD 4.6.0 We are planing to update OKD to newer version
> that will use more up to date k8s.
>
> What is the minimal k8s version required for/supported by Flink operator?
> I haven't found it in operator docs - is not there or I have missed it?
>
> Thanks.
>
> Krzysztof Chmielewski
>
> śr., 20 wrz 2023 o 22:32 Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> napisał(a):
>
>> Thank you Zach,
>> our flink-operator and flink deployments are in same namespace -> called
>> "flink". We have executed what is described in [1] before my initial
>> message.
>> We are using OKD 4.6.0 that according to the doc is using k8s 1.19. the
>> very same config is working fine on "vanilla" k8s, but for some reason it
>> is failing on that system where we have OKD installed.
>>
>> I believe we do have proper roles/sa assigned, for example:
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *oc get saNAME SECRETS   AGEbuilder  2
>> 6d22hdefault  2 6d22hdeployer 2 6d22hflink
>>2 6d19hflink-operator   2 17hoc
>> get roleNAMECREATED ATflink   2023-09-13T11:53:42Zoc get
>> rolebindingNAME  ROLE
>>AGEflink-role-bindingRole/flink
>>6d19h*
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/rbac/
>>
>>
>> Thanks, Krzysztof Chmielewski
>>
>> śr., 20 wrz 2023 o 05:40 Zach Lorimer  napisał(a):
>>
>>> I haven’t used OKD but it sounds like OLM. If that’s the case, I’m
>>> assuming the operator was deployed to the “operators” namespace. In that
>>> case, you’ll need to create the RBACs and such in the Flink namespace for
>>> that deployment to work.
>>>
>>> For example this needs to be in each namespace that you want to have
>>> Flink deployments in.
>>>
>>> kubectl apply -f - <>> apiVersion: v1
>>> kind: ServiceAccount
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink
>>> ---
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> kind: Role
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink
>>> rules:
>>> - apiGroups:
>>>   - ""
>>>   resources:
>>>   - pods
>>>   - configmaps
>>>   verbs:
>>>   - '*'
>>> - apiGroups:
>>>   - apps
>>>   resources:
>>>   - deployments
>>>   - deployments/finalizers
>>>   verbs:
>>>   - '*'
>>> ---
>>> apiVersion: rbac.authorization.k8s.io/v1
>>> kind: RoleBinding
>>> metadata:
>>>   labels:
>>> app.kubernetes.io/name: flink-kubernetes-operator
>>> app.kubernetes.io/version: 1.5.0
>>>   name: flink-role-binding
>>> roleRef:
>>>   apiGroup: rbac.authorization.k8s.io
>>>   kind: Role
>>>   name: flink
>>> subjects:
>>> - kind: ServiceAccount
>>>   name: flink
>>> EOF
>>>
>>> Hopefully that helps.
>>>
>>>
>>> On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> wrote:
>>>
 Hi community,
 I was wondering if anyone tried to deploy Flink using Flink k8s
 operator on machine where OKD [1] is installed?

 We have tried to install Flink k8s operator version 1.6 which seems to
 succeed, however when we try to deploy simple Flink deployment we are
 getting an error.

 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
 [ERROR][flink/test] Error during event processing ExecutionScope{ resource
 id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.

 io.fabric8.kubernetes.client.KubernetesClientException: Failure
 executing: PUT at:
 https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
 Message: FlinkDeployment.flink.apache.org "test" is invalid:
 [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
 object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
 of type string: "null", spec.mode: Unsupported value: "null": supported
 values: "native", "standalone", spec.logConfiguration: Invalid value:
 "null": spec.logConfiguration in body must be of type object: "null",
 spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
 must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
 "null": spec.jobManager.podTemplate in body must be of type object: "null",
 

Re: Rolling back a bad deployment of FlinkDeployment on kubernetes

2023-10-05 Thread Gyula Fóra
Hi Tony!

There are still a few corner cases when the operator cannot upgrade /
rollback deployments due to the loss of HA metadata (and with that
checkpoint information).

Most of these issues are not related to the operator logic directly but to
how Flink handles certain failures and are related to:

https://issues.apache.org/jira/browse/FLINK-30444 and
https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore

Rollbacks are designed to allow automatic fallback to the last stable spec,
but the mechanism doesn't work in these corner cases (in the same way spec
upgrades also dont)

I hope this helps to understand the problem.
The solution in these cases is to manually recover the job from the last
checkpoint/savepoint.

Cheers,
Gyula


On Thu, Oct 5, 2023 at 7:56 PM Tony Chen  wrote:

> I tried this out with operator version 1.4 and it didn't work for me. I
> noticed that when I was deploying a bad version, the Kubernetes HA metadata
> and configmaps were deleted:
>
> [m [33m2023-10-05 14:52:17,493 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Event | Info |
> SPECCHANGED | UPGRADE change(s) detected
> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-prod-flink/flink-testing-service/savepoints/savepoint-b832ef-05b185cb5800]
> differs from
> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=]),
> starting reconciliation.
> ...
> [m [33m2023-10-05 14:52:51,054 [m [36mo.a.f.k.o.s.AbstractFlinkService [m
> [32m[INFO ][flink-testing-service/flink-testing-service] Cluster shutdown
> completed.
> [m [33m2023-10-05 14:52:51,054 [m [36mo.a.f.k.o.s.AbstractFlinkService [m
> [32m[INFO ][flink-testing-service/flink-testing-service] Deleting
> Kubernetes HA metadata
> [m [33m2023-10-05 14:52:51,196 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Status | Info |
> UPGRADING | The resource is being upgraded
>
>
>
> Eventually, the rollbak fails because the HA metadata is missing:
>
> [m [33m2023-10-05 14:58:16,119 [m
> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [33m[WARN
> ][flink-testing-service/flink-testing-service] Rollback is not possible due
> to missing HA metadata
>
>
>
> Besides setting kubernetes.operator.deployment.rollback.enabled: true, is
> there anything else that I need to configure?
>
> On Thu, Oct 5, 2023 at 10:35 AM Tony Chen 
> wrote:
>
>> I just saw this experimental feature in the documentation:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#application-upgrade-rollbacks-experimental
>>
>> I'm guessing this is the only way to automate rollbacks for now.
>>
>> On Wed, Oct 4, 2023 at 3:25 PM Tony Chen 
>> wrote:
>>
>>> Hi Flink Community,
>>>
>>> I am currently running Apache flink-kubernetes-operator on our
>>> kubernetes clusters, and I have Flink applications that are deployed using
>>> the FlinkDeployment Custom Resources (CR). I am trying to automate the
>>> process of rollbacks and I am running into some issues.
>>>
>>> I was testing out a bad deployment where the jobmanager never becomes
>>> healthy. I simulated this bad deployment by creating a Flink image with a
>>> bug in it. I see in the operator logs that the jobmanager is unhealthy:
>>>
>>> [m [33m2023-10-02 22:14:34,874 [m
>>> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [32m[INFO
>>> ][flink-testing-service/flink-testing-service] UPGRADE change(s) detected
>>> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo]
>>> differs from
>>> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob]),
>>> starting reconciliation.
>>> ...
>>> [m [33m2023-10-02 22:15:09,001 [m [36mo.a.f.k.o.l.AuditUtils [m
>>> [32m[INFO ][flink-testing-service/flink-testing-service] >>> Status | Info
>>> | UPGRADING | The resource is being upgraded
>>> ...
>>> [m [33m2023-10-02 22:17:23,911 [m [36mo.a.f.k.o.l.AuditUtils [m
>>> [32m[INFO ][flink-testing-service/flink-testing-service] >>> Status | Error
>>> | DEPLOYED |
>>> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"back-off
>>> 20s restarting failed container=flink-main-container
>>> pod=flink-testing-service-749dd97c75-4w9ps_flink-testing-service(6db1adb3-4ca4-4924-a8c3-57a417818d85)","additionalMetadata":{"reason":"CrashLoopBackOff"},"throwableList":[]}
>>>
>>> ...
>>> [m [33m2023-10-02 22:17:33,576 [m [36mo.a.f.k.o.o.d.ApplicationObserver
>>> [m [32m[INFO ][flink-testing-service/flink-testing-service] Observing
>>> JobManager deployment. Previous status: ERROR
>>>
>>>
>>> What I do next is I change the spec of the FlinkDeployment so that it
>>> uses a Flink image that is healthy. The operator shows 

Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-09-22 Thread Gyula Fóra
Hi

Operator savepoint retention and savepoint upgrades have nothing to do with
each other I think. Retention is only for periodic savepoints triggered by
the operator itself.

I would upgrade to the latest 1.6.0 operator version before investigating
further.

Cheers
Gyula


On Sat, 23 Sep 2023 at 06:02, Nathan Moderwell <
nathan.moderw...@robinhood.com> wrote:

> Small update on this. I see that the issue is that we use `upgradeMode:
> savepoint`, but have not configured the operator to retain savepoints for
> long enough (the previous operator we used never deleted savepoints so we
> didn't run into this). I am reconfiguring to use `upgradeMode: last-state`
> and enabling HA to see if this provides us more stable job restoration on
> pod disruption.
>
> On Fri, Sep 22, 2023 at 10:20 AM Nathan Moderwell <
> nathan.moderw...@robinhood.com> wrote:
>
>> Hi flink-kubernetes-operator maintainers,
>>
>> We have recently migrated to the official operator and seeing a new issue
>> where our FlinkDeployments can fail and crashloop looking for a
>> non-existent savepoint. On further inspection, the job is attempting to
>> restart from the savepoint specified in execution.savepoint.path. This
>> config new for us (wasn't set by previous operator) is seems to be
>> automatically set behind the scenes by the official operator. We see the
>> savepoint in execution.savepoint.path existed but gets deleted after some
>> amount of time (in the latest example, a few hours). Then when there is
>> some pod disruption, the job attempts to restart from the savepoint (which
>> was deleted) and starts crashlooping.
>>
>> Hoping you can help us troubleshoot and figure out if this can be solved
>> through configuration (we are using equivalent configs from our previous
>> operator where we did not have this issue). Adding some details on version
>> and k8s state for your reference. Thank you for your support!
>>
>> Flink Version: 1.14.5
>> Flink Operator Version: 1.4.0
>>
>> At the time of the issue, here is the flink-config we see in the
>> configmap (the savepoint savepoint-bad5e5-6ab08cf0808e has been deleted
>> from s3 at this point):
>>
>> kubernetes.jobmanager.replicas: 1
>> jobmanager.rpc.address: 
>> metrics.scope.task:
>> flink.taskmanager.job..task..metric
>> kubernetes.service-account: 
>> kubernetes.cluster-id: 
>> pipeline.auto-generate-uids: false
>> metrics.scope.tm: flink.taskmanager.metric
>> parallelism.default: 2
>> kubernetes.namespace: 
>> metrics.reporters: prom
>> kubernetes.jobmanager.owner.reference: 
>> metrics.reporter.prom.port: 9090
>> taskmanager.memory.process.size: 10G
>> kubernetes.internal.jobmanager.entrypoint.class:
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>> pipeline.name: 
>> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
>> kubernetes.pod-template-file:
>> /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
>> state.backend.rocksdb.localdir: /rocksdb/
>> kubernetes.pod-template-file.taskmanager:
>> /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
>> web.cancel.enable: false
>> execution.checkpointing.timeout: 5 min
>> kubernetes.container.image.pull-policy: IfNotPresent
>> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
>> kubernetes.jobmanager.cpu: 2.0
>> state.backend: filesystem
>> $internal.flink.version: v1_14
>> kubernetes.pod-template-file.jobmanager:
>> /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
>> blob.server.port: 6124
>> kubernetes.jobmanager.annotations:
>> flinkdeployment.flink.apache.org/generation:14
>> metrics.scope.operator:
>> flink.taskmanager.job..operator..metric
>> state.savepoints.dir: s3:///savepoints
>> kubernetes.taskmanager.cpu: 2.0
>> execution.savepoint.ignore-unclaimed-state: true
>> $internal.application.program-args:
>> kubernetes.container.image: 
>> taskmanager.numberOfTaskSlots: 1
>> metrics.scope.jm.job: flink.jobmanager.job..metric
>> kubernetes.rest-service.exposed.type: ClusterIP
>> metrics.reporter.prom.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporter
>> $internal.application.main: 
>> metrics.scope.jm: flink.jobmanager.metric
>> execution.target: kubernetes-application
>> jobmanager.memory.process.size: 10G
>> metrics.scope.tm.job: flink.taskmanager.job..metric
>> taskmanager.rpc.port: 6122
>> internal.cluster.execution-mode: NORMAL
>> execution.checkpointing.externalized-checkpoint-retention:
>> RETAIN_ON_CANCELLATION
>> pipeline.jars: local:///build/flink/usrlib/.jar
>> state.checkpoints.dir: s3:///checkpoints
>>
>> At the time of the issue, here is our FlinkDeployment Spec:
>>
>> Spec:
>>   Flink Configuration:
>> execution.checkpointing.timeout:  5 min
>> kubernetes.operator.job.restart.failed:   true
>> kubernetes.operator.periodic.savepoint.interval:  600s
>> metrics.reporter.prom.class:
>>  org.apache.flink.metrics.prometheus.PrometheusReporter
>> metrics.reporter.prom.port: 

Re: Zookeeper HA with Kubernetes: Possible to use the same Zookeeper cluster w/multiple Flink Operators?

2023-09-20 Thread Gyula Fóra
Hi!

The cluster-id for each FlinkDeployment is simply the name of the
deployment. So they are all different in a given namespace. (In other words
they are not fixed as your question suggests but set automatically)

So there should be no problem sharing the ZK cluster .

Cheers
Gyula

On Thu, 21 Sep 2023 at 03:46, Brian King  wrote:

> Hello Flink Users!
>
> We're attempting to deploy a Flink application cluster on Kubernetes,
> using the Flink Operator and Zookeeper for HA.
>
> We're using Flink 1.16 and I have a question about some of the Zookeeper
> configuration[0]:
>
> "high-availability.zookeeper.path.root" is described as "The *root
> ZooKeeper node*, under which all cluster nodes are placed"
>
> "high-availability.cluster-id" , which says "important: customize per
> cluster." But it also says "you should not set this value manually when
> running on [...] native Kubernetes [...]in those cases a cluster-id is
> [...] automatically generated."
>
> Our design calls for multiple Flink application clusters managed by the
> same Flink Operator, and using the same Zookeeper quorum for each Flink
> Application cluster. Will the Flink Operator be able to handle this, or
> will the different clusters collide due to the fixed
> "high-availability.cluster-id" value? Is it possible to avoid this by
> setting a unique "high-availability.zookeeper.path.root" for each
> application cluster?
>
> Thanks for your time. I'm new to Flink, so I apologize if I did not
> explain myself clearly. Please let me know if you need additional
> information.
>
> Best,
>
> Brian King
> SRE, Data Platform/Search Platform
> Wikimedia Foundation
> IRC: inflatador
>
> [0]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#configuration
>
>
>


Re: Checkpoint jitter?

2023-09-13 Thread Gyula Fóra
No, I think what he means is to trigger the checkpoint at slightly
different times at the different sources so the different parts of the
pipeline would not checkpoint at the same time.

Gyula

On Wed, Sep 13, 2023 at 10:32 AM Hangxiang Yu  wrote:

> Hi, Matyas.
> Do you mean something like adjusting checkpoint intervals dynamically
> or frequency of uploading files according to the pressure of the durable
> storage ?
>
> On Wed, Sep 13, 2023 at 9:12 AM Őrhidi Mátyás 
> wrote:
>
>> Hey folks,
>>
>> Is it possible to add some sort of jitter to the checkpointing logic for
>> massively parallel jobs to mitigate the burst impact on the durable storage
>> when a checkpoint is triggered?
>>
>> Thanks,
>> Matyas
>>
>
>
> --
> Best,
> Hangxiang.
>


Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-12 Thread Gyula Fóra
Hi!

I think this issue is the same as
https://issues.apache.org/jira/browse/FLINK-33011
Not sure what exactly is the underlying cause as I could not repro it, but
the fix should be simple.

Also I believe it's not 1.6.0 related unless a JOSDK/Fabric8 upgrade caused
it.

Cheers,
Gyula


On Mon, Sep 11, 2023 at 7:47 PM Gyula Fóra  wrote:

> You don’t need it but you can really mess up clusters by rolling back CRD
> changes…
>
> On Mon, 11 Sep 2023 at 19:42, Evgeniy Lyutikov 
> wrote:
>
>> Why we need to use latest CRD version with older operator version?
>> ------
>> *От:* Gyula Fóra 
>> *Отправлено:* 12 сентября 2023 г. 0:36:26
>>
>> *Кому:* Evgeniy Lyutikov
>> *Копия:* user@flink.apache.org
>> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
>> from suspend
>>
>> Do not change the CRD but you can roll back the operator itself I believe
>>
>> Gyula
>>
>> On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov 
>> wrote:
>>
>>> Is it safe to rollback the operator version with replace to old CRDs?
>>> --
>>> *От:* Evgeniy Lyutikov 
>>> *Отправлено:* 11 сентября 2023 г. 23:50:26
>>> *Кому:* Gyula Fóra
>>>
>>> *Копия:* user@flink.apache.org
>>> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
>>> from suspend
>>>
>>>
>>> Hi!
>>> No, no one could restart jobmanager,
>>> I monitored the pods in real time, they all deleted when suspended as
>>> expected.
>>>
>>>
>>> --
>>> *От:* Gyula Fóra 
>>> *Отправлено:* 11 сентября 2023 г. 20:34:52
>>> *Кому:* Evgeniy Lyutikov
>>> *Копия:* user@flink.apache.org
>>> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
>>> from suspend
>>>
>>> Hi!
>>>
>>> I could not reproduce your issue, last-state suspend/restore seems to
>>> work as before.
>>> However these 2 logs seem very suspicious:
>>>
>>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>>> ][rec-job/rec-job] JobManager is being deployed
>>>
>>> Looks like after suspending (and deleting the JobManager Deployment)
>>> somebody restarted the JobManager manually. Is that possible?
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
>>> wrote:
>>>
>>>> Hi all!
>>>> After updating the operator to version 1.6.0, suspended and resuming
>>>> flink jobs stopped working.
>>>> When job resumes, the high availability metadata is removed.
>>>>
>>>> Suspend job:
>>>> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
>>>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>>>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
>>>> suspended]), starting reconciliation.
>>>> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>>>> ][rec-job/rec-job] Job is in running state, ready for upgrade with
>>>> LAST_STATE
>>>> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
>>>> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
>>>> existing deployment.
>>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
>>>> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
>>>> metadata.
>>>> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ][rec-job/rec-job] Waiting for cluster shutdown...
>>>> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
>>>> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
>>>> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
>>>> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>> ]

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
You don’t need it but you can really mess up clusters by rolling back CRD
changes…

On Mon, 11 Sep 2023 at 19:42, Evgeniy Lyutikov  wrote:

> Why we need to use latest CRD version with older operator version?
> --
> *От:* Gyula Fóra 
> *Отправлено:* 12 сентября 2023 г. 0:36:26
>
> *Кому:* Evgeniy Lyutikov
> *Копия:* user@flink.apache.org
> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
> Do not change the CRD but you can roll back the operator itself I believe
>
> Gyula
>
> On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov 
> wrote:
>
>> Is it safe to rollback the operator version with replace to old CRDs?
>> --
>> *От:* Evgeniy Lyutikov 
>> *Отправлено:* 11 сентября 2023 г. 23:50:26
>> *Кому:* Gyula Fóra
>>
>> *Копия:* user@flink.apache.org
>> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
>> from suspend
>>
>>
>> Hi!
>> No, no one could restart jobmanager,
>> I monitored the pods in real time, they all deleted when suspended as
>> expected.
>>
>>
>> --
>> *От:* Gyula Fóra 
>> *Отправлено:* 11 сентября 2023 г. 20:34:52
>> *Кому:* Evgeniy Lyutikov
>> *Копия:* user@flink.apache.org
>> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
>> from suspend
>>
>> Hi!
>>
>> I could not reproduce your issue, last-state suspend/restore seems to
>> work as before.
>> However these 2 logs seem very suspicious:
>>
>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] JobManager is being deployed
>>
>> Looks like after suspending (and deleting the JobManager Deployment)
>> somebody restarted the JobManager manually. Is that possible?
>>
>> Cheers,
>> Gyula
>>
>> On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
>> wrote:
>>
>>> Hi all!
>>> After updating the operator to version 1.6.0, suspended and resuming
>>> flink jobs stopped working.
>>> When job resumes, the high availability metadata is removed.
>>>
>>> Suspend job:
>>> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
>>> suspended]), starting reconciliation.
>>> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>>> ][rec-job/rec-job] Job is in running state, ready for upgrade with
>>> LAST_STATE
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
>>> existing deployment.
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
>>> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
>>> metadata.
>>> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown...
>>> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
>>> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
>>> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
>>> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
>>> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
>>> ][rec-job/rec-job] Cluster shutdown completed.
>>> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
>>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>>> (job) has been suspended
>>> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>>> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>>>
>>> Resume:
>>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>>> ][rec-job/rec-job] Observing JobManager deployment. Previous st

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
Do not change the CRD but you can roll back the operator itself I believe

Gyula

On Mon, 11 Sep 2023 at 18:52, Evgeniy Lyutikov  wrote:

> Is it safe to rollback the operator version with replace to old CRDs?
> --
> *От:* Evgeniy Lyutikov 
> *Отправлено:* 11 сентября 2023 г. 23:50:26
> *Кому:* Gyula Fóra
>
> *Копия:* user@flink.apache.org
> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
>
> Hi!
> No, no one could restart jobmanager,
> I monitored the pods in real time, they all deleted when suspended as
> expected.
>
>
> --
> *От:* Gyula Fóra 
> *Отправлено:* 11 сентября 2023 г. 20:34:52
> *Кому:* Evgeniy Lyutikov
> *Копия:* user@flink.apache.org
> *Тема:* Re: Flink kubernets operator delete HA metadata after resuming
> from suspend
>
> Hi!
>
> I could not reproduce your issue, last-state suspend/restore seems to work
> as before.
> However these 2 logs seem very suspicious:
>
> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] JobManager is being deployed
>
> Looks like after suspending (and deleting the JobManager Deployment)
> somebody restarted the JobManager manually. Is that possible?
>
> Cheers,
> Gyula
>
> On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
> wrote:
>
>> Hi all!
>> After updating the operator to version 1.6.0, suspended and resuming
>> flink jobs stopped working.
>> When job resumes, the high availability metadata is removed.
>>
>> Suspend job:
>> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
>> suspended]), starting reconciliation.
>> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
>> ][rec-job/rec-job] Job is in running state, ready for upgrade with
>> LAST_STATE
>> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
>> existing deployment.
>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
>> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
>> metadata.
>> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown...
>> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
>> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
>> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
>> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
>> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
>> ][rec-job/rec-job] Cluster shutdown completed.
>> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>> (job) has been suspended
>> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
>> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>>
>> Resume:
>> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
>> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
>> ][rec-job/rec-job] JobManager is being deployed
>> 2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
>> (job) has been suspended
>> 2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
>> change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended ->
>> running]), starting reconciliation.
>> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
>> being upgraded
>> 2023-09-11 06:02:07,649 o.a.f.k.o.r.d.Appl

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-09-11 Thread Gyula Fóra
Hi!

I could not reproduce your issue, last-state suspend/restore seems to work
as before.
However these 2 logs seem very suspicious:

2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
][rec-job/rec-job] JobManager is being deployed

Looks like after suspending (and deleting the JobManager Deployment)
somebody restarted the JobManager manually. Is that possible?

Cheers,
Gyula

On Mon, Sep 11, 2023 at 2:59 PM Evgeniy Lyutikov 
wrote:

> Hi all!
> After updating the operator to version 1.6.0, suspended and resuming
> flink jobs stopped working.
> When job resumes, the high availability metadata is removed.
>
> Suspend job:
> 2023-09-11 06:01:41,548 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
> change(s) detected (Diff: FlinkDeploymentSpec[job.state : running ->
> suspended]), starting reconciliation.
> 2023-09-11 06:01:41,548 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
> ][rec-job/rec-job] Job is in running state, ready for upgrade with
> LAST_STATE
> 2023-09-11 06:01:41,558 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SUSPENDED   | Suspending
> existing deployment.
> 2023-09-11 06:01:41,558 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting cluster with Foreground propagation
> 2023-09-11 06:01:41,558 o.a.f.k.o.s.NativeFlinkService [INFO
> ][rec-job/rec-job] Deleting JobManager deployment while preserving HA
> metadata.
> 2023-09-11 06:01:41,598 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:01:45,667 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (5s)
> 2023-09-11 06:01:50,730 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (10s)
> 2023-09-11 06:01:55,837 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (15s)
> 2023-09-11 06:02:00,885 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown... (20s)
> 2023-09-11 06:02:01,895 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:01,973 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
> (job) has been suspended
> 2023-09-11 06:02:01,981 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
> [INFO ][rec-job/rec-job] Resource fully reconciled, nothing to do...
>
> Resume:
> 2023-09-11 06:02:07,481 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] Observing JobManager deployment. Previous status: MISSING
> 2023-09-11 06:02:07,488 o.a.f.k.o.o.d.ApplicationObserver [INFO
> ][rec-job/rec-job] JobManager is being deployed
> 2023-09-11 06:02:07,563 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| SUSPENDED   | The resource
> (job) has been suspended
> 2023-09-11 06:02:07,576 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SPECCHANGED | UPGRADE
> change(s) detected (Diff: FlinkDeploymentSpec[job.state : suspended ->
> running]), starting reconciliation.
> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][rec-job/rec-job] Deleting deployment with terminated application before
> new deployment
> 2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting cluster with Foreground propagation
> 2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO
> ][rec-job/rec-job] Deleting JobManager deployment and HA metadata.
> 2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deleting Kubernetes HA metadata
> 2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Waiting for cluster shutdown...
> 2023-09-11 06:02:07,831 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Cluster shutdown completed.
> 2023-09-11 06:02:07,975 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2023-09-11 06:02:07,987 o.a.f.k.o.l.AuditUtils [INFO
> ][rec-job/rec-job] >>> Event  | Info| SUBMIT  | Starting
> deployment
> 2023-09-11 06:02:07,987 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][rec-job/rec-job] Deploying application cluster requiring last-state from
> HA metadata
> 2023-09-11 06:02:07,999 

Re: observedGeneration field in FlinkDeployment

2023-09-09 Thread Gyula Fóra
Actually, I just realized, the last fully reconciled spec generation should
be written into a metadata JSON inside the lastReconciledSpec. So this is
already available.

For example:
lastReconciledSpec: '{"spec":{...},"resource_metadata":{"apiVersion":"
flink.apache.org/v1beta1
","metadata":{"generation":2},"firstDeployment":true}}'

It's a bit hidden but it should do the trick :)
We could discuss moving this to a more standardized status field if you
think that's worth the effort.

Gyula

On Sat, Sep 9, 2023 at 7:04 AM Gyula Fóra  wrote:

> Hi!
> The lastReconciledSpec field serves similar purpose . We also use the
> generation in parts of the logic but not generically as observed generation
> .
>
> Could you give an example where this would be useful in addition to what
> we already have?
>
> Thanks
> Gyula
>
> On Sat, 9 Sep 2023 at 02:17, Tony Chen  wrote:
>
>> Hi Flink Community,
>>
>> I noticed that there is no status.observedGeneration field in the
>> FlinkDeployment spec. The closest field to this is
>> status.reconciliationStatus.lastReconciledSpec. Are there plans to add the
>> observedGeneration field in the spec? This field will be helpful in
>> detecting changes in the FlinkDeployment spec.
>>
>> Thanks,
>> Tony
>>
>> --
>>
>> <http://www.robinhood.com/>
>>
>> Tony Chen
>>
>> Software Engineer
>>
>> Menlo Park, CA
>>
>> Don't copy, share, or use this email without permission. If you received
>> it by accident, please let us know and then delete it right away.
>>
>


Re: observedGeneration field in FlinkDeployment

2023-09-08 Thread Gyula Fóra
Hi!
The lastReconciledSpec field serves similar purpose . We also use the
generation in parts of the logic but not generically as observed generation
.

Could you give an example where this would be useful in addition to what we
already have?

Thanks
Gyula

On Sat, 9 Sep 2023 at 02:17, Tony Chen  wrote:

> Hi Flink Community,
>
> I noticed that there is no status.observedGeneration field in the
> FlinkDeployment spec. The closest field to this is
> status.reconciliationStatus.lastReconciledSpec. Are there plans to add the
> observedGeneration field in the spec? This field will be helpful in
> detecting changes in the FlinkDeployment spec.
>
> Thanks,
> Tony
>
> --
>
> 
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-07 Thread Gyula Fóra
Jung,
I don't want to sound unhelpful, but I think the best thing for you to do
is simply to try these different models in your local env.
It should be very easy to get started with the Kubernetes Operator on
Kind/Minikube (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
)

It's very difficult to answer these questions fully here. Try the different
modes, observe what happens, read the docs and you will get all the answers.

Gyula

On Thu, Sep 7, 2023 at 10:11 AM Dennis Jung  wrote:

> Hello Chen,
> Thanks for your reply! I have further questions as following...
>
> 1. In case of non-reactive mode in Flink 1.18, if the autoscaler adjusts
> parallelism, what is the difference by using 'reactive' mode?
> 2. In case if I use Flink 1.15~1.17 without autoscaler, is the difference
> of using 'reactive' mode is, changing parallelism dynamically by change of
> TM number (manually, or by custom scaler)?
>
> Regards,
> Jung
>
>
> 2023년 9월 5일 (화) 오후 3:59, Chen Zhanghao 님이 작성:
>
>> Hi Dennis,
>>
>>
>>1. In Flink 1.18 + non-reactive mode, autoscaler adjusts the job's
>>parallelism and the job will request for extra TMs if the current ones
>>cannot satisfy its need and redundant TMs will be released automatically
>>later for being idle. In other words, parallelism changes cause TM number
>>change.
>>2. The core metrics used is busy time (the amount of time spent on
>>task processing per 1 second = 1 s - backpressured time - idle time), it 
>> is
>>considered to be superior as it counts I/O cost etc into account as well.
>>Also, the metrics is on a per-task granularity and allows us to identify
>>bottleneck tasks.
>>3. Autoscaler feature currently only works for K8s opeartor + native
>>K8s mode.
>>
>>
>> Best,
>> Zhanghao Chen
>> --
>> *发件人:* Dennis Jung 
>> *发送时间:* 2023年9月2日 12:58
>> *收件人:* Gyula Fóra 
>> *抄送:* user@flink.apache.org 
>> *主题:* Re: [Question] How to scale application based on 'reactive' mode
>>
>> Hello,
>> Thanks for your notice.
>>
>> 1. In "Flink 1.18 + non-reactive", is parallelism being changed by the
>> number of TM?
>> 2. In the document(
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/),
>> it said "we are not using any container memory / CPU utilization metrics
>> directly here". Which metrics are these using internally?
>> 3. I'm using standalone k8s(
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/)
>> for deployment. Is autoscaler features only available by using the "flink
>> k8s operator"(sorry I don't understand this clearly yet...)?
>>
>> Regards
>>
>>
>> 2023년 9월 1일 (금) 오후 10:20, Gyula Fóra 님이 작성:
>>
>> Pretty much, except that with Flink 1.18 autoscaler can scale the job in
>> place without restarting the JM (even without reactive mode )
>>
>> So actually best option is autoscaler with Flink 1.18 native mode (no
>> reactive)
>>
>> Gyula
>>
>> On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:
>>
>> Thanks for feedback.
>> Could you check whether I understand correctly?
>>
>> *Only using 'reactive' mode:*
>> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
>> start'), parallelism will be increased. For example, when job parallelism
>> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
>> parallelism will be 2.
>> But the number of TM is not being controlled automatically.
>>
>> *Autoscaler + non-reactive:*
>> It can flexibilly control the number of TM by several metrics(CPU usage,
>> throughput, ...), and JobManager will be restarted when scaling. But job
>> parallelism is the same after the number of TM has been changed.
>>
>> *Autoscaler + 'reactive' mode*:
>> It can control numbers of TM by metric, and increase/decrease job
>> parallelism by changing TM.
>>
>> Regards,
>> Jung
>>
>> 2023년 9월 1일 (금) 오후 8:16, Gyula Fóra 님이 작성:
>>
>> I would look at reactive scaling as a way to increase / decrease
>> parallelism.
>>
>> It’s not a way to automatically decide when to actually do it as you need
>> to create new TMs .
>>
>> The autoscaler could use reactive mode to change the parallelism but you
>> need the autoscaler itself to decide when new resources should be added
>>

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
Pretty much, except that with Flink 1.18 autoscaler can scale the job in
place without restarting the JM (even without reactive mode )

So actually best option is autoscaler with Flink 1.18 native mode (no
reactive)

Gyula

On Fri, 1 Sep 2023 at 13:54, Dennis Jung  wrote:

> Thanks for feedback.
> Could you check whether I understand correctly?
>
> *Only using 'reactive' mode:*
> By manually adding TaskManager(TM) (such as using './bin/taskmanager.sh
> start'), parallelism will be increased. For example, when job parallelism
> is 1 and TM is 1, and if adding 1 new TM, JobManager will be restarted and
> parallelism will be 2.
> But the number of TM is not being controlled automatically.
>
> *Autoscaler + non-reactive:*
> It can flexibilly control the number of TM by several metrics(CPU usage,
> throughput, ...), and JobManager will be restarted when scaling. But job
> parallelism is the same after the number of TM has been changed.
>
> *Autoscaler + 'reactive' mode*:
> It can control numbers of TM by metric, and increase/decrease job
> parallelism by changing TM.
>
> Regards,
> Jung
>
> 2023년 9월 1일 (금) 오후 8:16, Gyula Fóra 님이 작성:
>
>> I would look at reactive scaling as a way to increase / decrease
>> parallelism.
>>
>> It’s not a way to automatically decide when to actually do it as you need
>> to create new TMs .
>>
>> The autoscaler could use reactive mode to change the parallelism but you
>> need the autoscaler itself to decide when new resources should be added
>>
>> On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:
>>
>>> For now, the thing I've found about 'reactive' mode is that it
>>> automatically adjusts 'job parallelism' when TaskManager is
>>> increased/decreased.
>>>
>>>
>>> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>>>
>>> Is there some other feature that only 'reactive' mode offers for scaling?
>>>
>>> Thanks.
>>> Regards.
>>>
>>>
>>>
>>> 2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:
>>>
>>>> Hello,
>>>> Thank you for your response. I have few more questions in following:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>>>
>>>> *Reactive Mode configures a job so that it always uses all resources
>>>> available in the cluster. Adding a TaskManager will scale up your job,
>>>> removing resources will scale it down. Flink will manage the parallelism of
>>>> the job, always setting it to the highest possible values.*
>>>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>>>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>>>
>>>> *Reactive Mode restarts a job on a rescaling event, restoring it from
>>>> the latest completed checkpoint. This means that there is no overhead of
>>>> creating a savepoint (which is needed for manually rescaling a job). Also,
>>>> the amount of data that is reprocessed after rescaling depends on the
>>>> checkpointing interval, and the restore time depends on the state size.*
>>>> => As I know 'rescaling' also works in non-reactive mode, with
>>>> restoring checkpoint. What is the difference of using 'reactive' here?
>>>>
>>>> *The Reactive Mode allows Flink users to implement a powerful
>>>> autoscaling mechanism, by having an external service monitor certain
>>>> metrics, such as consumer lag, aggregate CPU utilization, throughput or
>>>> latency. As soon as these metrics are above or below a certain threshold,
>>>> additional TaskManagers can be added or removed from the Flink cluster.*
>>>> => Why is this only possible in 'reactive' mode? Seems this is more
>>>> related to 'autoscaler'. Are there some specific features/API which can
>>>> control TaskManager/Parallelism only in 'reactive' mode?
>>>>
>>>> Thank you.
>>>>
>>>> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>>>>
>>>>> The reactive mode reacts to available resources. The autoscaler reacts
>>>>> to changing load and processing capacity and adjusts resources.
>>>>>
>>>>> Completely different concepts and applicability.
>>>>> Most people want the autoscaler , but this is a recent feature and is
>>>>> specific to the k8s operator at the moment.
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Fr

Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
I would look at reactive scaling as a way to increase / decrease
parallelism.

It’s not a way to automatically decide when to actually do it as you need
to create new TMs .

The autoscaler could use reactive mode to change the parallelism but you
need the autoscaler itself to decide when new resources should be added

On Fri, 1 Sep 2023 at 13:09, Dennis Jung  wrote:

> For now, the thing I've found about 'reactive' mode is that it
> automatically adjusts 'job parallelism' when TaskManager is
> increased/decreased.
>
>
> https://www.slideshare.net/FlinkForward/autoscaling-flink-with-reactive-mode
>
> Is there some other feature that only 'reactive' mode offers for scaling?
>
> Thanks.
> Regards.
>
>
>
> 2023년 9월 1일 (금) 오후 4:56, Dennis Jung 님이 작성:
>
>> Hello,
>> Thank you for your response. I have few more questions in following:
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/
>>
>> *Reactive Mode configures a job so that it always uses all resources
>> available in the cluster. Adding a TaskManager will scale up your job,
>> removing resources will scale it down. Flink will manage the parallelism of
>> the job, always setting it to the highest possible values.*
>> => Does this mean when I add/remove TaskManager in 'non-reactive' mode,
>> resource(CPU/Memory/Etc.) of the cluster is not being changed?
>>
>> *Reactive Mode restarts a job on a rescaling event, restoring it from the
>> latest completed checkpoint. This means that there is no overhead of
>> creating a savepoint (which is needed for manually rescaling a job). Also,
>> the amount of data that is reprocessed after rescaling depends on the
>> checkpointing interval, and the restore time depends on the state size.*
>> => As I know 'rescaling' also works in non-reactive mode, with restoring
>> checkpoint. What is the difference of using 'reactive' here?
>>
>> *The Reactive Mode allows Flink users to implement a powerful autoscaling
>> mechanism, by having an external service monitor certain metrics, such as
>> consumer lag, aggregate CPU utilization, throughput or latency. As soon as
>> these metrics are above or below a certain threshold, additional
>> TaskManagers can be added or removed from the Flink cluster.*
>> => Why is this only possible in 'reactive' mode? Seems this is more
>> related to 'autoscaler'. Are there some specific features/API which can
>> control TaskManager/Parallelism only in 'reactive' mode?
>>
>> Thank you.
>>
>> 2023년 9월 1일 (금) 오후 3:30, Gyula Fóra 님이 작성:
>>
>>> The reactive mode reacts to available resources. The autoscaler reacts
>>> to changing load and processing capacity and adjusts resources.
>>>
>>> Completely different concepts and applicability.
>>> Most people want the autoscaler , but this is a recent feature and is
>>> specific to the k8s operator at the moment.
>>>
>>> Gyula
>>>
>>> On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:
>>>
>>>> Hello,
>>>> Thanks for your notice.
>>>>
>>>> Than what is the purpose of using 'reactive', if this doesn't do
>>>> anything itself?
>>>> What is the difference if I use auto-scaler without 'reactive' mode?
>>>>
>>>> Regards,
>>>> Jung
>>>>
>>>>
>>>>
>>>> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>>>>
>>>>> Hi!
>>>>>
>>>>> I think what you need is probably not the reactive mode but a proper
>>>>> autoscaler. The reactive mode as you say doesn't do anything in itself, 
>>>>> you
>>>>> need to build a lot of logic around it.
>>>>>
>>>>> Check this instead:
>>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>>>>
>>>>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>>>>> based on kafka data rate / processing throughput. It also doesn't rely on
>>>>> the reactive mode.
>>>>>
>>>>> Cheers,
>>>>> Gyula
>>>>>
>>>>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung 
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> Sorry for frequent questions. This is a question about 'reactive'
>>>>>> mode.
>>>>>>
>>>>>> 1. As far as I understand, though I've setup `scheduler-mode:
>>>>>> reactive`, it will not change parallelism automatically by itself, by CPU
>>>>>> usage or Kafka consumer rate. It needs additional resource monitor 
>>>>>> features
>>>>>> (such as Horizontal Pod Autoscaler, or else). Is this correct?
>>>>>> 2. Is it possible to create a custom resource monitor provider
>>>>>> application? For example, if I want to increase/decrease parallelism by
>>>>>> Kafka consumer rate, do I need to send specific API from outside, to 
>>>>>> order
>>>>>> rescaling?
>>>>>> 3. If 2 is correct, what is the difference when using 'reactive'
>>>>>> mode? Because as far as I think, calling a specific API will rescale 
>>>>>> either
>>>>>> using 'reactive' mode or not...(or is the API just working based on this
>>>>>> mode)?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>>


Re: [Question] How to scale application based on 'reactive' mode

2023-09-01 Thread Gyula Fóra
The reactive mode reacts to available resources. The autoscaler reacts to
changing load and processing capacity and adjusts resources.

Completely different concepts and applicability.
Most people want the autoscaler , but this is a recent feature and is
specific to the k8s operator at the moment.

Gyula

On Fri, 1 Sep 2023 at 04:50, Dennis Jung  wrote:

> Hello,
> Thanks for your notice.
>
> Than what is the purpose of using 'reactive', if this doesn't do anything
> itself?
> What is the difference if I use auto-scaler without 'reactive' mode?
>
> Regards,
> Jung
>
>
>
> 2023년 8월 18일 (금) 오후 7:51, Gyula Fóra 님이 작성:
>
>> Hi!
>>
>> I think what you need is probably not the reactive mode but a proper
>> autoscaler. The reactive mode as you say doesn't do anything in itself, you
>> need to build a lot of logic around it.
>>
>> Check this instead:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>>
>> The Kubernetes Operator has a built in autoscaler that can scale jobs
>> based on kafka data rate / processing throughput. It also doesn't rely on
>> the reactive mode.
>>
>> Cheers,
>> Gyula
>>
>> On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:
>>
>>> Hello,
>>> Sorry for frequent questions. This is a question about 'reactive' mode.
>>>
>>> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
>>> it will not change parallelism automatically by itself, by CPU usage or
>>> Kafka consumer rate. It needs additional resource monitor features (such as
>>> Horizontal Pod Autoscaler, or else). Is this correct?
>>> 2. Is it possible to create a custom resource monitor provider
>>> application? For example, if I want to increase/decrease parallelism by
>>> Kafka consumer rate, do I need to send specific API from outside, to order
>>> rescaling?
>>> 3. If 2 is correct, what is the difference when using 'reactive' mode?
>>> Because as far as I think, calling a specific API will rescale either using
>>> 'reactive' mode or not...(or is the API just working based on this mode)?
>>>
>>> Thanks.
>>>
>>> Regards
>>>
>>>


Re: flink k8s operator - problem with patching seession cluster

2023-08-31 Thread Gyula Fóra
There is no effect of the replicas setting in native mode. Native session
clusters are "elastic", the number of task managers are determined on the
fly based on the job requirements.

Gyula

On Thu, Aug 31, 2023 at 11:19 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Thank you for the response.
>
> Yes currently in my PoC I'm using standalone integration.
> Does ` spec.taskManager.replicas` has any effect when using native mode?
>
> The reason I'm asking is that I need to know what is the "cacpacity" of
> particular session cluster before I will submit the job into it.
> And the way how I was doing this was this:
>
> try (KubernetesClient kubernetesClient = new
> KubernetesClientBuilder().build()) {
> MixedOperation KubernetesResourceList,
> io.fabric8.kubernetes.client.dsl.Resource>
> resources =
> kubernetesClient.resources(FlinkDeployment.class);
>
> List items =
> resources.inNamespace("default").list().getItems();
> for (FlinkDeployment item : items) {
> System.out.println("Flink Deployments: " + item);
> System.out.println("Number of TM replicas: " +
> item.getSpec().getTaskManager().getReplicas());
> }
> }
>
>
> Thanks,
> Krzysztof
>
> czw., 31 sie 2023 o 10:44 Gyula Fóra  napisał(a):
>
>> I guess your question is in the context of the standalone integration
>> because native session deployments automatically add TMs on the fly as more
>> are necessary.
>>
>> For standalone mode you should be able to configure
>> `spec.taskManager.replicas` and if I understand correctly that will not
>> shut down the running jobs.
>> If you have problems please share your FlinkDeployment yaml and the
>> operator logs in a JIRA ticket.
>>
>> In any case the native mode is probably better fit for your use-case.
>>
>> Gyula
>>
>> On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
>> krzysiek.chmielew...@gmail.com> wrote:
>>
>>> Just want to broth this up in case it was missed in the other
>>> messages/queries :)
>>>
>>> TL:DR
>>> How to add TM to Flink Session cluster via Java K8s client if Session
>>> Cluster has running jobs?
>>>
>>> Thanks,
>>> Krzysztof
>>>
>>> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <
>>> krzysiek.chmielew...@gmail.com> napisał(a):
>>>
>>>> Hi community,
>>>> I have a use case where I would like to add an extra TM) to a running
>>>> Flink session cluster that has Flink jobs deployed. Session cluster
>>>> creation, job submission and cluster patching is done using flink k8s
>>>> operator Java API. The Details of this are presented here [1]
>>>>
>>>> I would like to ask, what is a recommended path to add a TM to existing
>>>> Session Cluster that currently runs number of Flink jobs using Java API.
>>>> For simplicity lets assume that I dont want to resume jobs from a
>>>> savepoint, just redeploy them.
>>>>
>>>> When executing steps from [1] I'm facing an issue where Session jobs
>>>> are not redeployed on patched Session cluster however kubectl shows that
>>>> there is FlinkSessionJob subbmited to the k8s.
>>>>
>>>> Additionally when I'm trying to delete FlinkSessionJob from kubectl,
>>>> Flink k8s operator throws an exception described in [1]. In fact the state
>>>> of that Flink deployment requires few steps to clean it up after that
>>>> patch.
>>>>
>>>>
>>>> [1]
>>>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>>>
>>>


Re: flink k8s operator - problem with patching seession cluster

2023-08-31 Thread Gyula Fóra
I guess your question is in the context of the standalone integration
because native session deployments automatically add TMs on the fly as more
are necessary.

For standalone mode you should be able to configure
`spec.taskManager.replicas` and if I understand correctly that will not
shut down the running jobs.
If you have problems please share your FlinkDeployment yaml and the
operator logs in a JIRA ticket.

In any case the native mode is probably better fit for your use-case.

Gyula

On Thu, Aug 31, 2023 at 2:42 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Just want to broth this up in case it was missed in the other
> messages/queries :)
>
> TL:DR
> How to add TM to Flink Session cluster via Java K8s client if Session
> Cluster has running jobs?
>
> Thanks,
> Krzysztof
>
> pt., 25 sie 2023 o 23:48 Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> napisał(a):
>
>> Hi community,
>> I have a use case where I would like to add an extra TM) to a running
>> Flink session cluster that has Flink jobs deployed. Session cluster
>> creation, job submission and cluster patching is done using flink k8s
>> operator Java API. The Details of this are presented here [1]
>>
>> I would like to ask, what is a recommended path to add a TM to existing
>> Session Cluster that currently runs number of Flink jobs using Java API.
>> For simplicity lets assume that I dont want to resume jobs from a
>> savepoint, just redeploy them.
>>
>> When executing steps from [1] I'm facing an issue where Session jobs are
>> not redeployed on patched Session cluster however kubectl shows that there
>> is FlinkSessionJob subbmited to the k8s.
>>
>> Additionally when I'm trying to delete FlinkSessionJob from kubectl,
>> Flink k8s operator throws an exception described in [1]. In fact the state
>> of that Flink deployment requires few steps to clean it up after that
>> patch.
>>
>>
>> [1]
>> https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
>>
>


Re: Blue green deployment with Flink Apache Operator

2023-08-31 Thread Gyula Fóra
The main concern as we discussed in previous mailing list threads before is
the general applicability of such solution:

 - Many production jobs cannot really afford running in parallel (starting
the second job while the first one is running), due to data
consistency/duplications reasons
 - Exactly once sinks do not really support this

So I think we should start with this maybe as an independent effort /
external library and if we see that it works we could discuss it in a FLIP.

What do you think?
Gyula

On Thu, Aug 31, 2023 at 9:23 AM Nicolas Fraison <
nicolas.frai...@datadoghq.com> wrote:

> Thanks Gyula for your feedback.
>
> We were also thinking of relying on such a solution, creating a dedicated
> crd/operator to manage this BlueGreenFlinkDeployment.
> Good to hear that it could be incorporated later in the operator.
>
> Will let you know once we will have something to share with you.
>
> Nicolas
>
> On Wed, Aug 30, 2023 at 4:28 PM Gyula Fóra  wrote:
>
>> Hey!
>>
>> I don't know if anyone has implemented this or not but one way to
>> approach this problem (and this may not be the right way, just an idea :) )
>> is to add a new Custom Resource type that sits on top of the
>> FlinkDeployment / FlinkSessionJob resources and add a small controller for
>> this.
>>
>> This new custom resource, BlueGreenDeployment, would be somewhat similar
>> to how a Replicaset vs Pod works in Kubernetes. It would create a new
>> FlinkDeployment and would delete the old one once the new reached a healthy
>> running state.
>>
>> Adding a new CR allows us to not overcomplicate the existing
>> resource/controller loop but simply leverage it. If you prototype something
>> along these lines, please feel free to share and then we can discuss if we
>> want to incorporate something like this in the operator repo in the future
>> :)
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi,
>>>
>>> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
>>> support for blue green deployment will not be supported or will not happen
>>> soon.
>>>
>>> I'd like to know if some of you have built a custom mechanism on top of
>>> this operator to support the blue green deployment and if you would have
>>> any advice on implementing this?
>>>
>>> --
>>>
>>> Nicolas Fraison (he/him)
>>>
>>


Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Gyula Fóra
I agree with Yaroslav, generally speaking PVs are not necessary or even
recommended for RocksDB because the state doesn't need to be shared,
recovered later anyways.
It's usually faster and cheaper to go with instance level SSDs.

Gyula

On Wed, Aug 30, 2023 at 8:37 PM Yaroslav Tkachenko 
wrote:

> It depends on your requirements. Personally, I don't use PVs and, instead,
> mount a volume from a host with a fast instance-level SSD.
>
> On Wed, Aug 30, 2023 at 11:26 AM Tony Chen 
> wrote:
>
>> We used to have a Persistent Volume (PV), attached to the pod, for
>> storing the RocksDB data while using the GoogleCloudPlatform operator. For
>> the Apache flink-kubernetes-operator, do the pods need a PV attached to it
>> to use RocksDB? If not, do you have recommendations on memory configuration
>> for these pods?
>>
>> I will also need to go through the documentation more on memory
>> configuration:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/state_backends/
>>
>> On Wed, Aug 30, 2023 at 2:17 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> Rocksdb is supported and every other state backend as well.
>>>
>>> You can simply set this in you config like before :)
>>>
>>> Cheers
>>> Gyula
>>>
>>> On Wed, 30 Aug 2023 at 19:22, Tony Chen 
>>> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> Does the flink-kubernetes-operator support RocksDB as the state backend
>>>> for FlinkDeployment?
>>>>
>>>> We have some Flink applications that have large states, and we were
>>>> able to deal with these large states in the past with RocksDB. If there is
>>>> no support for RocksDB, are there any recommendations on how we can
>>>> decrease the size of these states?
>>>>
>>>> Thanks,
>>>> Tony
>>>>
>>>>
>>>> --
>>>>
>>>> <http://www.robinhood.com/>
>>>>
>>>> Tony Chen
>>>>
>>>> Software Engineer
>>>>
>>>> Menlo Park, CA
>>>>
>>>> Don't copy, share, or use this email without permission. If you
>>>> received it by accident, please let us know and then delete it right away.
>>>>
>>>
>>
>> --
>>
>> <http://www.robinhood.com/>
>>
>> Tony Chen
>>
>> Software Engineer
>>
>> Menlo Park, CA
>>
>> Don't copy, share, or use this email without permission. If you received
>> it by accident, please let us know and then delete it right away.
>>
>


Re: Enable RocksDB in FlinkDeployment with flink-kubernetes-operator

2023-08-30 Thread Gyula Fóra
Hi!

Rocksdb is supported and every other state backend as well.

You can simply set this in you config like before :)

Cheers
Gyula

On Wed, 30 Aug 2023 at 19:22, Tony Chen  wrote:

> Hi Flink Community,
>
> Does the flink-kubernetes-operator support RocksDB as the state backend
> for FlinkDeployment?
>
> We have some Flink applications that have large states, and we were able
> to deal with these large states in the past with RocksDB. If there is no
> support for RocksDB, are there any recommendations on how we can decrease
> the size of these states?
>
> Thanks,
> Tony
>
>
> --
>
> 
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Re: Blue green deployment with Flink Apache Operator

2023-08-30 Thread Gyula Fóra
Hey!

I don't know if anyone has implemented this or not but one way to approach
this problem (and this may not be the right way, just an idea :) ) is to
add a new Custom Resource type that sits on top of the FlinkDeployment /
FlinkSessionJob resources and add a small controller for this.

This new custom resource, BlueGreenDeployment, would be somewhat similar to
how a Replicaset vs Pod works in Kubernetes. It would create a new
FlinkDeployment and would delete the old one once the new reached a healthy
running state.

Adding a new CR allows us to not overcomplicate the existing
resource/controller loop but simply leverage it. If you prototype something
along these lines, please feel free to share and then we can discuss if we
want to incorporate something like this in the operator repo in the future
:)

Cheers,
Gyula

On Wed, Aug 30, 2023 at 1:21 PM Nicolas Fraison via user <
user@flink.apache.org> wrote:

> Hi,
>
> From https://issues.apache.org/jira/browse/FLINK-29199 it seems that
> support for blue green deployment will not be supported or will not happen
> soon.
>
> I'd like to know if some of you have built a custom mechanism on top of
> this operator to support the blue green deployment and if you would have
> any advice on implementing this?
>
> --
>
> Nicolas Fraison (he/him)
>


Re: [Question] How to scale application based on 'reactive' mode

2023-08-18 Thread Gyula Fóra
Hi!

I think what you need is probably not the reactive mode but a proper
autoscaler. The reactive mode as you say doesn't do anything in itself, you
need to build a lot of logic around it.

Check this instead:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

The Kubernetes Operator has a built in autoscaler that can scale jobs based
on kafka data rate / processing throughput. It also doesn't rely on the
reactive mode.

Cheers,
Gyula

On Fri, Aug 18, 2023 at 12:43 PM Dennis Jung  wrote:

> Hello,
> Sorry for frequent questions. This is a question about 'reactive' mode.
>
> 1. As far as I understand, though I've setup `scheduler-mode: reactive`,
> it will not change parallelism automatically by itself, by CPU usage or
> Kafka consumer rate. It needs additional resource monitor features (such as
> Horizontal Pod Autoscaler, or else). Is this correct?
> 2. Is it possible to create a custom resource monitor provider
> application? For example, if I want to increase/decrease parallelism by
> Kafka consumer rate, do I need to send specific API from outside, to order
> rescaling?
> 3. If 2 is correct, what is the difference when using 'reactive' mode?
> Because as far as I think, calling a specific API will rescale either using
> 'reactive' mode or not...(or is the API just working based on this mode)?
>
> Thanks.
>
> Regards
>
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.6.0 released

2023-08-15 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.6.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Improved rollback mechanism
 - Experimental in-place scaling support
 - General operational hardening

Release blogpost:
https://flink.apache.org/2023/08/15/apache-flink-kubernetes-operator-1.6.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12353230

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


[ANNOUNCE] Apache Flink Kubernetes Operator 1.6.0 released

2023-08-15 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.6.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Improved rollback mechanism
 - Experimental in-place scaling support
 - General operational hardening

Release blogpost:
https://flink.apache.org/2023/08/15/apache-flink-kubernetes-operator-1.6.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12353230

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: Flink Kubernetes Operator autoscaling GPU-based workload

2023-08-01 Thread Gyula Fóra
The autoscaler only works for FlinkDeployments in Native mode. You should
turn off the reactive scheduler mode as well because that's something
completely different.
After that you can check the autoscaler logs for more info.

Gyula

On Tue, Aug 1, 2023 at 10:33 AM Raihan Sunny via user 
wrote:

> Hi,
>
> I have a workload that depends on the GPU. I have only 1 GPU card. As per
> the documentation I have added the necessary configurations and can run the
> GPU workload in standalone REACTIVE mode with as many taskmanager instances
> as required.
>
> I have set the number of task slots to 1 so that a raise in parallelism
> causes a new pod to be created. I can scale up the job just fine in this
> mode, however when I add autoscaling configurations to the FlinkDeployment
> manifest, scaling up doesn't work. This is because with the autoscaling
> manifest, there seems to be resource requests and limits are being
> automatically set to the pods for the gpu. This is not the case with the
> standalone mode which is why I guess scaling up doesn't cause any issues.
>
> So, what can I do to get the autoscaler working? I'm using Flink version
> 1.17.1 with PyFlink and Flink Kubernetes Operator version 1.5.0.
>
>
> Regards,
> Sunny
>
> [image: SELISE]
>
> SELISE Group
> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
> Munich: Tal 44, 80331 München, Germany
> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
> Emirates
> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
> Bhutan
>
> Visit us: www.selisegroup.com
>
> *Important Note: This e-mail and any attachment are confidential and may
> contain trade secrets and may well also be legally privileged or otherwise
> protected from disclosure. If you have received it in error, you are on
> notice of its status. Please notify us immediately by reply e-mail and then
> delete this e-mail and any attachment from your system. If you are not the
> intended recipient please understand that you must not copy this e-mail or
> any attachment or disclose the contents to any other person. Thank you for
> your cooperation.*
>


Re: [EXTERNAL] Re: Query on flink-operator autoscale support

2023-08-01 Thread Gyula Fóra
The autoscaler scales jobs based on incoming data and processing
throughput. It's completely different from the reactive mod, if the
throughput/processing rate doesn't change it will not scale up even if you
have more resources available.

Also in native mode you cannot add pods to the cluster, Flink manages the
pod requests etc based on the resource needs.

I suggest you read the docs, try out the example that should help you get
started:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

Gyula

On Sun, Jul 30, 2023 at 6:33 AM Xiao Ma  wrote:

> Hi Gyula,
>
> Could I ask if the autoscaler will support the native session cluster on
> 1.18 flink operator? Also, please correct me if I'm wrong. The autoscale
> will work similarly to the elastic scaling in reactive mode, right? For
> example, in the K8s cluster, if one pod is added to the session cluster,
> the job running on will be rebalanced to the new one, is it correct?
>
> Thank you very much.
> Xiao Ma
>
> On Wed, Feb 1, 2023 at 10:56 AM Gyula Fóra  wrote:
>
>> As I mentioned in the previous email, standalone mode is not on the
>> Autoscaler roadmap because the scheduling/resource model is different.
>> This applies to both standalone app and session clusters.
>>
>> Thanks
>> Gyula
>>
>> On Wed, Feb 1, 2023 at 4:48 PM Swathi Chandrashekar <
>> cswa...@microsoft.com> wrote:
>>
>>> Sure, thanks Gyula.
>>> Is there a roadmap to support standalone session clusters to scale based
>>> on the jobs added/deleted and change in parallelism ?
>>>
>>> Regards,
>>> Swathi C
>>>
>>> --
>>> *From:* Gyula Fóra 
>>> *Sent:* Wednesday, February 1, 2023 8:54 PM
>>> *To:* Swathi Chandrashekar 
>>> *Cc:* user@flink.apache.org 
>>> *Subject:* [EXTERNAL] Re: Query on flink-operator autoscale support
>>>
>>> The autoscaler currently only works with Native App clusters.
>>> Native session clusters may be supported in the future but standalone is
>>> not on our roadmap due to a different resource/scheduling model used.
>>>
>>> Gyula
>>>
>>> On Wed, Feb 1, 2023 at 4:22 PM Swathi Chandrashekar <
>>> cswa...@microsoft.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm was testing OSS flink operator with flink 1.17 for autoscale
>>> feature. I was able to scale the cluster based on load in application
>>> cluster in native mode, but the same did not work in standalone mode as the
>>> operator gave the following error as below [ both for app and session mode
>>> ].
>>>
>>> Is the autoscale supported for the following :
>>>
>>>1. Session cluster in standalone
>>>2. Session cluster in native
>>>3. App cluster in standalone
>>>
>>>
>>> Exception in thread "pool-4-thread-3" java.lang.NoSuchMethodError:
>>> org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient.create(Lorg/apache/flink/configuration/Configuration;Ljava/util/concurrent/ExecutorService;)Lorg/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient;
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.createNamespacedKubeClient(StandaloneFlinkService.java:105)
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.submitClusterInternal(StandaloneFlinkService.java:110)
>>> at
>>> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.deployApplicationCluster(StandaloneFlinkService.java:69)
>>> at
>>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:180)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:58)
>>> at
>>> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
>>> at
>>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
>>> at
>>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>>> at
>>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
>>> a

Re: Questions on Restarting a Flink Application from a savepoint or checkpoint

2023-07-21 Thread Gyula Fóra
"One customization we did was to have the job-submitter pod search for the
latest checkpoint or savepoint in S3 and then submit this information with
the Flink job to the Flink cluster"

I am aware that the Google operator does not support redeploying from last
checkpoint it always uses savepoint so you had to do this customization.
However the Flink Kubernetes Operator supports latest state upgrades /
restarts which uses the latest checkpoint/savepoint whichever was taken
last. So you can simply change the spec and the operator will upgrade with
latest state available (or take a savepoint and upgrade if you want).

" In this case, we had to delete the Flink application first, remove the
corrupted checkpoint or savepoint, and then redeploy the Flink application."

If you are concerned about checkpoint corruption, you can keep doing the
same thing with the Flink Kubernetes Operator. If you notice that some
upgrade/restart failing due to corruption, then you can simply delete the
FlinkDeployment object and re-create it by setting the
initialSavepointPath. We currently do not support changing the
initialSavepointPath for an already existing FlinkDeployment. For this you
have to delete/recreate it.
If you simply suspend , remove the checkpoint and then set to running, the
deployment will fail because it won't find the checkpoint, so you will have
to delete the FlinkDeployment to clear up any state information.

"In addition to restarting from a particular savepoint, is there a way to
restart a Flink application from the latest *checkpoint* while redeploying
the Flink application?"

If you have upgradeMode set to last-state, this is what happens always.
There is no way to upgrade/restart it to not use the last
checkpoint/savepoint :)


I feel that the Flink Operator should already cover your use cases but
there may be some semantic confusion about how it works with regards to
state handling during upgrades, spec changes etc.

Cheers,
Gyula


On Fri, Jul 21, 2023 at 5:20 PM Tony Chen  wrote:

> For context, we have forked the GoogleCloudPlatform operator (
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator), and we
> have customized it a bit to fit our use cases here. One customization we
> did was to have the job-submitter pod search for the latest checkpoint or
> savepoint in S3 and then submit this information with the Flink job to the
> Flink cluster.
>
> In the past we had some incidents where the checkpoint or savepoint was
> corrupted, and when the Flink application was trying to start again, it
> wasn't able to start from the latest checkpoint or savepoint. In this case,
> we had to delete the Flink application first, remove the corrupted
> checkpoint or savepoint, and then redeploy the Flink application.
>
> We are in the process of migrating to the Apache Flink Kubernetes
> Operator, and we would like to ensure that some of the customizations we
> did with the GoogleCloudPlatform operator can be performed with the Apache
> operator. I'm guessing that with the Apache operator, instead of deleting
> the Flink application, we should put the Flink application in a
> "suspended" state first, manually clean up the corrupted checkpoint or
> savepoint in S3, and then put the Flink application in a "running" state
> again?
>
> In addition to restarting from a particular savepoint, is there a way to
> restart a Flink application from the latest *checkpoint* while
> redeploying the Flink application? I was wondering if there's a field in
> the kubernetes field where I can specify which checkpoint to start from.
> For some of our applications, we complete checkpoints more often
> than savepoints, and we would like these Flink applications to always start
> from the latest checkpoint.
>
> Thanks,
> Tony
>
> On Thu, Jul 20, 2023 at 1:18 AM Gyula Fóra  wrote:
>
>> Hey!
>>
>> Please help us understand why you need to delete and recreate the
>> FlinkDeployment objects in your ecosystem. Maybe we can help suggest some
>> alternative to make your life easier :)
>>
>> Of course every prod ecosystem is unique in its own way and larger
>> platforms generally have a layer on top of the operator to manage these
>> special requirements.
>>
>> In most cases it’s possible to contribute these changes to Flink as long
>> as they fit the scope / larger development direction of the project . This
>> would require a FLIP.
>>
>> But before going there I think it’s worth talking about this
>> delete/recreate requirement because it sounds a bit strange in the
>> Kubernetes world . We specifically designed the operator in a way so that
>> you wouldn’t have to do this if you want the latest state and so far this
>> is the first I hear this ask :)
>

Re: How to define imagePullSecrets with k8s operator to fetch image using FlinkDeploymentSpec

2023-07-21 Thread Gyula Fóra
Hi!
We don't have imagePullSecrets as part of the FlinkDeplyomentSpec at the
moment, however you can simply use the following built in Flink
configuration:

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#kubernetes-container-image-pull-secrets
kubernetes.container.image.pull-secrets: secrets

Cheers,
Gyula

On Fri, Jul 21, 2023 at 10:05 AM Rajat Ahuja 
wrote:

> Hi Flink Experts,
>
> When I need to fetch a customer Docker image (built on top of Flink 1.17
> to add connectors or SQL code) from our organization's private registry
> using *K8s operator, it fails with the error. Although image is present
> on repo, and able to pull on my local machine after pushing to
> artifactory. *
>
> *Warning  Failed 45s (x4 over 2m11s)  kubeletFailed to
> pull image
> "artifactory.us.bank-dns.com:5000/runner/flink-sql-runner-example:latest
> ":
> rpc error: code = Unknown desc = Error response from daemon: manifest for
> artifactory.us.bank-dns.com:5000/runner/flink-sql-runner-example:latest
> 
> not found: manifest unknown: The named manifest is not known to the
> registry."*
>
>
> If i use normal deployment, and pull image i use it with imagePullSecrets
> but i was not able to find the similar placeholder to define in Flink
> deployment kubernetes operator. Any help how to get pass this issue ?
>
> Thanks
>
>
>


Re: Questions on Restarting a Flink Application from a savepoint or checkpoint

2023-07-19 Thread Gyula Fóra
Hey!

Please help us understand why you need to delete and recreate the
FlinkDeployment objects in your ecosystem. Maybe we can help suggest some
alternative to make your life easier :)

Of course every prod ecosystem is unique in its own way and larger
platforms generally have a layer on top of the operator to manage these
special requirements.

In most cases it’s possible to contribute these changes to Flink as long as
they fit the scope / larger development direction of the project . This
would require a FLIP.

But before going there I think it’s worth talking about this
delete/recreate requirement because it sounds a bit strange in the
Kubernetes world . We specifically designed the operator in a way so that
you wouldn’t have to do this if you want the latest state and so far this
is the first I hear this ask :)

Cheers
Gyula

On Thu, 20 Jul 2023 at 00:07, Tony Chen  wrote:

> Hi Gyula,
>
> Got it. Our use case might be unique to our own ecosystem here at
> Robinhood, so I will have to look into creating a service that can search
> for the latest savepoint / checkpoint in S3 and provide that to the
> FlinkDeployment resource.
>
> Will the Flink Community be okay with us adding this feature to the GitHub
> repo eventually? I was going through this guide
> <https://flink.apache.org/how-to-contribute/contribute-code/>, and it
> looks like I need to get consensus first.
>
> Thanks,
> Tony
>
> On Wed, Jul 19, 2023 at 4:33 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> I don’t understand why you need to delete the deployment to restart. You
>> can suspend, use the restartNonce or simply upgrade .
>>
>> These should cover most upgrade/restart scenarios. Like with other
>> resources in Kubernetes once you delete them the status is gone, so the
>> FlinkDeployment won’t keep the last state info.
>>
>> To keep the state after deletion you would have to introduce new
>> resources or an external state store. We are not planning to support this
>> as it goes against the standard Kubernetes resource management flow.
>>
>> I think you should look into simply suspending the job for the while or
>> just use a regular upgrade to fit your needs .
>>
>> Cheers
>> Gyula
>>
>> On Wed, 19 Jul 2023 at 22:19, Tony Chen  wrote:
>>
>>> Hi Gyula,
>>>
>>> Thank you for responding so quickly. I went through the page you sent me
>>> a bit more, and I see the following (
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/job-management/#running-suspending-and-deleting-applications
>>> ):
>>>
>>> Deleting a deployment will remove all checkpoint and status information.
>>>> Future deployments will from an empty state unless manually overridden by
>>>> the user.
>>>>
>>>
>>> For our use case, we do delete the deployment and redeploy the Flink
>>> application sometimes in order to restart our Flink applications. We were
>>> wondering if it's possible for the operator to retain checkpoint and status
>>> information even after the deployment gets deleted.
>>>
>>> Thanks,
>>> Tony
>>>
>>> On Wed, Jul 19, 2023 at 3:46 PM Gyula Fóra  wrote:
>>>
>>>> Hey Tony,
>>>>
>>>> Please see:
>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>>>
>>>> The operator is made especially to handle stateful application upgrades
>>>> robustly. In general any spec change that you make that will lead to an
>>>> upgrade will be executed using the latest available / checkpoint or
>>>> savepoint. This is controlled by the `upgradeMode` setting for jobs, as
>>>> long as you have last-state or savepoint you will always get the latest
>>>> state.
>>>>
>>>> This is somewhat orthogonal to the savepoint trigger /
>>>> initialSavepointPath mechanisms. The initialSavepointPath should be used
>>>> only the first time the deployment is created because at that point the
>>>> operator is not aware of the latest state. After that all upgrades always
>>>> use the latest state unless the upgradeMode is stateless in which case no
>>>> state is used. Savepoint triggering can help you keep backups for failure
>>>> recovery but they should not be executed as part of your upgrade flow
>>>> because the operator already does this for you.
>>>>
>>>> Cheers,
>>>> Gyula
>>>&g

Re: Questions on Restarting a Flink Application from a savepoint or checkpoint

2023-07-19 Thread Gyula Fóra
Hi!

I don’t understand why you need to delete the deployment to restart. You
can suspend, use the restartNonce or simply upgrade .

These should cover most upgrade/restart scenarios. Like with other
resources in Kubernetes once you delete them the status is gone, so the
FlinkDeployment won’t keep the last state info.

To keep the state after deletion you would have to introduce new resources
or an external state store. We are not planning to support this as it goes
against the standard Kubernetes resource management flow.

I think you should look into simply suspending the job for the while or
just use a regular upgrade to fit your needs .

Cheers
Gyula

On Wed, 19 Jul 2023 at 22:19, Tony Chen  wrote:

> Hi Gyula,
>
> Thank you for responding so quickly. I went through the page you sent me a
> bit more, and I see the following (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/job-management/#running-suspending-and-deleting-applications
> ):
>
> Deleting a deployment will remove all checkpoint and status information.
>> Future deployments will from an empty state unless manually overridden by
>> the user.
>>
>
> For our use case, we do delete the deployment and redeploy the Flink
> application sometimes in order to restart our Flink applications. We were
> wondering if it's possible for the operator to retain checkpoint and status
> information even after the deployment gets deleted.
>
> Thanks,
> Tony
>
> On Wed, Jul 19, 2023 at 3:46 PM Gyula Fóra  wrote:
>
>> Hey Tony,
>>
>> Please see:
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades
>>
>> The operator is made especially to handle stateful application upgrades
>> robustly. In general any spec change that you make that will lead to an
>> upgrade will be executed using the latest available / checkpoint or
>> savepoint. This is controlled by the `upgradeMode` setting for jobs, as
>> long as you have last-state or savepoint you will always get the latest
>> state.
>>
>> This is somewhat orthogonal to the savepoint trigger /
>> initialSavepointPath mechanisms. The initialSavepointPath should be used
>> only the first time the deployment is created because at that point the
>> operator is not aware of the latest state. After that all upgrades always
>> use the latest state unless the upgradeMode is stateless in which case no
>> state is used. Savepoint triggering can help you keep backups for failure
>> recovery but they should not be executed as part of your upgrade flow
>> because the operator already does this for you.
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Jul 19, 2023 at 8:20 PM Tony Chen 
>> wrote:
>>
>>> Hi Flink Community,
>>>
>>> My name is Tony Chen, and I am a software engineer at Robinhood. I have
>>> some questions on restarting a Flink Application from a savepoint or
>>> checkpoint.
>>>
>>> We currently store our checkpoints and savepoints in S3, and we would
>>> like to use the Apache Flink Kubernetes Operator to manage our Flink
>>> applications. I know that there is a field called "initialSavepointPath" (
>>> doc
>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#manual-recovery>)
>>> that I can set in my kubernetes manifest so that whenever I want my Flink
>>> application to start from a particular savepoint, it will start from
>>> the savepoint directory in this field. However, if I delete this
>>> FlinkDeployment resource altogether after new savepoints were triggered,
>>> and then redeploy this FlinkDeployment resource, it looks like I have to
>>> manually update the "initialSavepointPath" to a newer savepoint directory
>>> so that the Flink application starts from a newer savepoint.
>>>
>>> Is there a way for us to redeploy FlinkDeployment resources so that the
>>> latest checkpoint or savepoint is used, and without having to update the
>>> "initialSavepointPath" field? I noticed in my testing that whenever I
>>> deleted the FlinkDeployment resource and redeploy, it would either start
>>> from the savepoint in initialSavepointPath or from checkpoint 1 if
>>> initialSavepointPath was not set.
>>>
>>> For example, let's say I restarted a Flink application at savepoint 10
>>> with initialSavepointPath set to s3://savepoints/savepoint-10, and then
>>> later on a savepoint 20 was completed and store

Re: Questions on Restarting a Flink Application from a savepoint or checkpoint

2023-07-19 Thread Gyula Fóra
Hey Tony,

Please see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#stateful-and-stateless-application-upgrades

The operator is made especially to handle stateful application upgrades
robustly. In general any spec change that you make that will lead to an
upgrade will be executed using the latest available / checkpoint or
savepoint. This is controlled by the `upgradeMode` setting for jobs, as
long as you have last-state or savepoint you will always get the latest
state.

This is somewhat orthogonal to the savepoint trigger / initialSavepointPath
mechanisms. The initialSavepointPath should be used only the first time the
deployment is created because at that point the operator is not aware of
the latest state. After that all upgrades always use the latest state
unless the upgradeMode is stateless in which case no state is used.
Savepoint triggering can help you keep backups for failure recovery but
they should not be executed as part of your upgrade flow because the
operator already does this for you.

Cheers,
Gyula

On Wed, Jul 19, 2023 at 8:20 PM Tony Chen  wrote:

> Hi Flink Community,
>
> My name is Tony Chen, and I am a software engineer at Robinhood. I have
> some questions on restarting a Flink Application from a savepoint or
> checkpoint.
>
> We currently store our checkpoints and savepoints in S3, and we would like
> to use the Apache Flink Kubernetes Operator to manage our Flink
> applications. I know that there is a field called "initialSavepointPath" (
> doc
> )
> that I can set in my kubernetes manifest so that whenever I want my Flink
> application to start from a particular savepoint, it will start from
> the savepoint directory in this field. However, if I delete this
> FlinkDeployment resource altogether after new savepoints were triggered,
> and then redeploy this FlinkDeployment resource, it looks like I have to
> manually update the "initialSavepointPath" to a newer savepoint directory
> so that the Flink application starts from a newer savepoint.
>
> Is there a way for us to redeploy FlinkDeployment resources so that the
> latest checkpoint or savepoint is used, and without having to update the
> "initialSavepointPath" field? I noticed in my testing that whenever I
> deleted the FlinkDeployment resource and redeploy, it would either start
> from the savepoint in initialSavepointPath or from checkpoint 1 if
> initialSavepointPath was not set.
>
> For example, let's say I restarted a Flink application at savepoint 10
> with initialSavepointPath set to s3://savepoints/savepoint-10, and then
> later on a savepoint 20 was completed and stored at
> s3://savepoints/savepoint-20. Is there a way for me to delete this
> FlinkDeployment and redeploy it without updating initialSavepointPath?
>
> Thanks,
> Tony
>
> P.S. I'm going through the source code more for Apache Flink Kubernetes
> Operator to understand how the operator starts a Flink job. Some relevant
> code:
>
>-
>
> https://github.com/apache/flink-kubernetes-operator/blob/0c341ebe13645f4e9802cfd780c5b50f59e29363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L500
>-
>
> https://github.com/apache/flink-kubernetes-operator/blob/0c341ebe13645f4e9802cfd780c5b50f59e29363/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SavepointObserver.java#L204
>
>
> --
>
> 
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Re: [Flink K8s Operator] Trigger nonce missing for manual savepoint info

2023-07-12 Thread Gyula Fóra
Maybe you have inconsistent operator / CRC versions? In any case I highly
recommend upgrading to the lates operator version to get all the bug /
security fixes and improvements.

Gyula

On Wed, 12 Jul 2023 at 10:58, Paul Lam  wrote:

> Hi,
>
> I’m using K8s operator 1.3.1 with Flink 1.15.2 on 2 K8s clusters. Weird
> enough, on one K8s cluster the flink deployments would show savepoint
> trigger nonce. while the flink deployments on the other cluster wouldn’t.
>
> The normal output is as follows:
>
> ```
> Last Savepoint:
> Format Type: CANONICAL
> Location:
> hdfs://.../savepoints/61dfcb2fd7946a0001827c55/savepoint-a81885-68d00e5130b8
> Time Stamp: 1689145852437
> Trigger Nonce: 2
> Trigger Type: MANUAL
> ``
>  And the erroneous output is like this (without trigger nonce and format
> type):
> ```
> Last Savepoint:
> Location:
> hdfs://.../savepoints/61dfcb2fd7946a0001827c55/savepoint-31c74e-3347299b84ec
> Time Stamp: 1689150184799
> Trigger Type: MANUAL
> ```
>
> Do you have any ideas about what’s causing this? Thanks!
>
> Best,
> Paul Lam
>
>


Re: why are kubernetes.namespace and kubernetes.cluster-id config fields forbidden in the flink-kubernetes-operator validator?

2023-06-14 Thread Gyula Fóra
The namespace and cluster id are automatically set based on the namespace
and name of the FlinkDeployment resource .

This is an important design choice that allows efficient management of the
applications.

Gyula

On Wed, 14 Jun 2023 at 19:31, Nathan Moderwell <
nathan.moderw...@robinhood.com> wrote:

> Saw this error when creating a job:
>
>> unable to create object: admission webhook "
>> validationwebhook.flink.apache.org" denied the request: Forbidden Flink
>> config key: kubernetes.namespace
>
>
> Looking through the source code, I see these are forbidden config keys,
> but could not find any explanation for why searching through blame and old
> PR's. Do we not need this for HA setup with FlinkDeployment CRD?
>
> https://github.com/apache/flink-kubernetes-operator/blob/d43e1ca9050e83b492b2e16b0220afdba4ffa646/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java#L64-L69
>
> Thanks,
> Nathan
>


Re: Kubernetes operator: config for taskmanager.memory.process.size ignored

2023-06-14 Thread Gyula Fóra
Again, this has absolutely nothing to do with the Kubernetes Operator, but
simply how Flink Kubernetes Memory configs work:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_tuning/#configure-memory-for-containers

You can probably play around with:  jobmanager.memory.jvm-overhead.fraction

You can set a larger memory size in the TM spec and increase the jvm
overhead fraction.

Gyula

On Wed, Jun 14, 2023 at 2:46 PM Robin Cassan 
wrote:

> Thanks Gyula for your answer! I'm wondering about your claim:
> > In Flink kubernetes the process is the pod so pod memory is always equal
> to process memory
> Why should the flink TM process use the whole container (and so, the whole
> pod) memory?
>
> Before migrating to the k8s operator, we still used Flink on kubernetes
> (without the operator) and left a little bit of margin between the process
> memory and the pod memory, which helped stability. It looks like it cannot
> be done with the k8s operator though and I wonder why the choice of
> removing this granularity in the settings
>
> Robin
>
> Le mer. 14 juin 2023 à 12:20, Gyula Fóra  a écrit :
>
>> Basically what happens is that whatever you set to the
>> spec.taskManager.resource.memory will be set in the config as process
>> memory.
>> In Flink kubernetes the process is the pod so pod memory is always equal
>> to process memory.
>>
>> So basically the spec is a config shorthand, there is no reason to
>> override it as you won't get a different behaviour at the end of the day.
>>
>> Gyula
>>
>> On Wed, Jun 14, 2023 at 11:55 AM Robin Cassan via user <
>> user@flink.apache.org> wrote:
>>
>>> Hello all!
>>>
>>> I am using the flink kubernetes operator and I would like to set the
>>> value for `taskmanager.memory.process.size`. I set the desired value in the
>>> flinkdeployment resource specs (here, I want 55gb), however it looks like
>>> the value that is effectively passed to the taskmanager is the same as the
>>> pod memory setting (which is set to 59gb).
>>>
>>> For example, this flinkdeployment configuration:
>>> ```
>>> Spec:
>>>   Flink Configuration:
>>> taskmanager.memory.process.size:
>>>  55gb
>>>   Task Manager:
>>> Resource:
>>>   Cpu: 6
>>>   Memory:  59Gb
>>> ```
>>> will create a pod with 59Gb total memory (as expected) but will also
>>> give 59Gb to the memory.process.size instead of 55Gb, as seen in this TM
>>> log: `Loading configuration property: taskmanager.memory.process.size, 59Gb`
>>>
>>> Maybe this part of the flink k8s operator code is responsible:
>>>
>>> https://github.com/apache/flink-kubernetes-operator/blob/d43e1ca9050e83b492b2e16b0220afdba4ffa646/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L393
>>>
>>> If so, I wonder what is the rationale for forcing the flink process
>>> memory to be the same as the pod memory?
>>> Is there a way to bypass that, for example by setting the desired
>>> process.memory configuration differently?
>>>
>>> Thanks!
>>>
>>


Re: Kubernetes operator: config for taskmanager.memory.process.size ignored

2023-06-14 Thread Gyula Fóra
Basically what happens is that whatever you set to the
spec.taskManager.resource.memory will be set in the config as process
memory.
In Flink kubernetes the process is the pod so pod memory is always equal to
process memory.

So basically the spec is a config shorthand, there is no reason to override
it as you won't get a different behaviour at the end of the day.

Gyula

On Wed, Jun 14, 2023 at 11:55 AM Robin Cassan via user <
user@flink.apache.org> wrote:

> Hello all!
>
> I am using the flink kubernetes operator and I would like to set the value
> for `taskmanager.memory.process.size`. I set the desired value in the
> flinkdeployment resource specs (here, I want 55gb), however it looks like
> the value that is effectively passed to the taskmanager is the same as the
> pod memory setting (which is set to 59gb).
>
> For example, this flinkdeployment configuration:
> ```
> Spec:
>   Flink Configuration:
> taskmanager.memory.process.size:  55gb
>   Task Manager:
> Resource:
>   Cpu: 6
>   Memory:  59Gb
> ```
> will create a pod with 59Gb total memory (as expected) but will also give
> 59Gb to the memory.process.size instead of 55Gb, as seen in this TM log:
> `Loading configuration property: taskmanager.memory.process.size, 59Gb`
>
> Maybe this part of the flink k8s operator code is responsible:
>
> https://github.com/apache/flink-kubernetes-operator/blob/d43e1ca9050e83b492b2e16b0220afdba4ffa646/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java#L393
>
> If so, I wonder what is the rationale for forcing the flink process memory
> to be the same as the pod memory?
> Is there a way to bypass that, for example by setting the desired
> process.memory configuration differently?
>
> Thanks!
>


Re: Fail to run flink 1.17 job with flink-operator 1.5.0 version

2023-06-12 Thread Gyula Fóra
Hi!

I think you forgot to upgrade the operator CRD (which contains the updates
enum values).

Please see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd

Cheers
Gyula

On Mon, 12 Jun 2023 at 13:38, Liting Liu (litiliu) 
wrote:

> Hi,  I was trying to submit a flink 1.17 job with the
> flink-kubernetes-operator version v1.5.0.
> But encountered the below exception:
>
>
> The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion:
> Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15",
> "v1_16"
>
>
> I think the flink-operator should have supported flink 1.17, because as
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
>  requires,
>  autoscaler only work well with flink 1.17.
>
>
>
>


Re: Fail to run flink 1.17 job with flink-operator 1.5.0 version

2023-06-12 Thread Gyula Fóra
Hi!

I think you forgot to upgrade the operator CRD (which contains the updates
enum values).

Please see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd

Cheers
Gyula

On Mon, 12 Jun 2023 at 13:38, Liting Liu (litiliu) 
wrote:

> Hi,  I was trying to submit a flink 1.17 job with the
> flink-kubernetes-operator version v1.5.0.
> But encountered the below exception:
>
>
> The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion:
> Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15",
> "v1_16"
>
>
> I think the flink-operator should have supported flink 1.17, because as
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
>  requires,
>  autoscaler only work well with flink 1.17.
>
>
>
>


Re: RocksDB segfault on state restore

2023-06-01 Thread Gyula Fóra
Hi!


In our case, no schema evolution was triggered , only the TTL was set from
the beginning as far as I remember.

I will double check

Gyula

On Fri, 2 Jun 2023 at 06:12, Hangxiang Yu  wrote:

> Hi, Gyula.
> It seems related to https://issues.apache.org/jira/browse/FLINK-23346.
> We also saw core dump while using list state after triggering state
> migration and ttl compaction filter. Have you triggered the schema
> evolution ?
> It seems a bug of the rocksdb list state together with ttl compaction
> filter.
>
> On Wed, May 17, 2023 at 7:05 PM Gyula Fóra  wrote:
>
>> Hi All!
>>
>> We are encountering an error on a larger stateful job (around 1 TB +
>> state) on restore from a rocksdb checkpoint. The taskmanagers keep crashing
>> with a segfault coming from the rocksdb native logic and seem to be related
>> to the FlinkCompactionFilter mechanism.
>>
>> The gist with the full error report:  report:
>> https://gist.github.com/gyfora/f307aa570d324d063e0ade9810f8bb25
>>
>> The core part is here:
>> V  [libjvm.so+0x79478f]  Exceptions::
>> (Thread*, char const*, int, oopDesc*)+0x15f
>> V  [libjvm.so+0x960a68]  jni_Throw+0x88
>> C  [librocksdbjni-linux64.so+0x222aa1]
>>  JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long,
>> long) const+0x121
>> C  [librocksdbjni-linux64.so+0x6486c1]
>>  rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&,
>> std::string*) const+0x81
>> C  [librocksdbjni-linux64.so+0x648bea]
>>  rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice
>> const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&,
>> std::string*, std::string*) const+0x14a
>>
>> Has anyone encountered a similar issue before?
>>
>> Thanks
>> Gyula
>>
>>
>
> --
> Best,
> Hangxiang.
>


Re: Flink Kubernetes Operator lifecycle state count metrics question

2023-05-23 Thread Gyula Fóra
Hi Andrew!

I think you are completely right, this is a bug. The per namespace metrics
do not seem to filter per namespace and show the aggregated global count
for each namespace:

I opened a ticket:
https://issues.apache.org/jira/browse/FLINK-32164

Thanks for reporting this!
Gyula

On Mon, May 22, 2023 at 10:49 PM Andrew Otto  wrote:

> Also!  I do have 2 FlinkDeployments deployed with this operator, but they
> are in different namespaces, and each of the per namespace metrics reports
> that it has 2 Deployments in them, even though there is only one according
> to kubectl.
>
> Actually...we just tried to deploy a change (enabling some checkpointing)
> that caused one of our FlinkDeployments to fail.  Now, both namespace
> STABLE_Counts each report 1.
>
> # curl -s : | grep
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="rdf_streaming_updater",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
> 1.0
>
> It looks like maybe this metric is not reporting per namespace, but a
> global count.
>
>
>
> On Mon, May 22, 2023 at 2:56 PM Andrew Otto  wrote:
>
>> Oh, FWIW, I do have operator HA enabled with 2 replicas running, but in
>> my examples there, I am curl-ing the leader flink operator pod.
>>
>>
>>
>> On Mon, May 22, 2023 at 2:47 PM Andrew Otto  wrote:
>>
>>> Hello!
>>>
>>> I'm doing some grafana+prometheus dashboarding for
>>> flink-kubernetes-operator.  Reading metrics docs
>>> , I see that I have nice per k8s
>>> namespace lifecycle current count gauge metrics in Prometheus.
>>>
>>> Using kubectl, I can see that I have one FlinkDeployment in my namespace:
>>>
>>> # kubectl -n stream-enrichment-poc get flinkdeployments
>>> NAME JOB STATUS   LIFECYCLE STATE
>>> flink-app-main   RUNNING  STABLE
>>>
>>> But, prometheus is reporting that I have 2 FlinkDeployments in the
>>> STABLE state.
>>>
>>> # curl -s :  | grep
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>>> 2.0
>>>
>>> I'm not sure why I see 2.0 reported.
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count reports only
>>> one FlinkDeployment.
>>>
>>> # curl :/metrics | grep
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count
>>> flink_k8soperator_namespace_JmDeploymentStatus_READY_Count{resourcetype="FlinkDeployment",resourcens="stream_enrichment_poc",name="flink_kubernetes_operator",host="flink_kubernetes_operator_86b888d6b6_gbrt4",namespace="flink_operator",}
>>> 1.0
>>>
>>> Is it possible that
>>> flink_k8soperator_namespace_Lifecycle_State_STABLE_Count is being
>>> reported as an incrementing counter instead of a guage?
>>>
>>> Thanks
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-17 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.5.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Autoscaler improvements
 - Operator stability, observability improvements

Release blogpost:
https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352931

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


[ANNOUNCE] Apache Flink Kubernetes Operator 1.5.0 released

2023-05-17 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.5.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:
 - Autoscaler improvements
 - Operator stability, observability improvements

Release blogpost:
https://flink.apache.org/2023/05/17/apache-flink-kubernetes-operator-1.5.0-release-announcement/

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352931

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


RocksDB segfault on state restore

2023-05-17 Thread Gyula Fóra
Hi All!

We are encountering an error on a larger stateful job (around 1 TB + state)
on restore from a rocksdb checkpoint. The taskmanagers keep crashing with a
segfault coming from the rocksdb native logic and seem to be related to the
FlinkCompactionFilter mechanism.

The gist with the full error report:  report:
https://gist.github.com/gyfora/f307aa570d324d063e0ade9810f8bb25

The core part is here:
V  [libjvm.so+0x79478f]  Exceptions::
(Thread*, char const*, int, oopDesc*)+0x15f
V  [libjvm.so+0x960a68]  jni_Throw+0x88
C  [librocksdbjni-linux64.so+0x222aa1]
 JavaListElementFilter::NextUnexpiredOffset(rocksdb::Slice const&, long,
long) const+0x121
C  [librocksdbjni-linux64.so+0x6486c1]
 rocksdb::flink::FlinkCompactionFilter::ListDecide(rocksdb::Slice const&,
std::string*) const+0x81
C  [librocksdbjni-linux64.so+0x648bea]
 rocksdb::flink::FlinkCompactionFilter::FilterV2(int, rocksdb::Slice
const&, rocksdb::CompactionFilter::ValueType, rocksdb::Slice const&,
std::string*, std::string*) const+0x14a

Has anyone encountered a similar issue before?

Thanks
Gyula


Re: [Flink K8s Operator] Automatic cleanup of terminated deployments

2023-05-14 Thread Gyula Fóra
There is no such feature currently, Kubernetes resources usually do not
delete themselves :)
The problem I see here is by deleting the resource you lose all information
about what happened, you won't know if it failed or completed etc.
What is the use-case you are thinking about?

If this is something you think would be good to add, please open a JIRA
ticket for it. But in any case this will probably merit a dev list
discussion.

Gyula

On Sun, May 14, 2023 at 11:54 AM Paul Lam  wrote:

> Hi all,
>
> Currently, if a job turns into terminated status (e.g. FINISHED or
> FAILED), the flinkdeployment remains until a manual cleanup is performed. I
> went through the docs but did not find any way to clean them up
> automatically. Am I missing something? Thanks!
>
> Best,
> Paul Lam
>


Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-08 Thread Gyula Fóra
Hey!

Sounds like a bug :) Could you please open a jira / PR (in case you fixed
this already)?

Thanks
Gyula

On Mon, 8 May 2023 at 22:20, Andrew Otto  wrote:

> Hi,
>
> I'm trying to enable HA for flink-kubernetes-operator
> 
> with Helm.  We are using namespaced RBAC via watchedNamespaces.
>
> I've followed instructions and set
> kubernetes.operator.leader-election.enabled and
> kubernetes.operator.leader-election.lease-name, and increased replicas to
> 2.  When I deploy, the second replica comes online, but errors with:
>
> Exception occurred while acquiring lock 'LeaseLock: flink-operator -
> flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
> Failure executing: GET at:
> https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. leases.coordination.k8s.io
> "flink-operator-lease" is forbidden: User
> "system:serviceaccount:flink-operator:flink-operator" cannot get resource
> "leases" in API group "coordination.k8s.io" in the namespace
> "flink-operator".
>
> Looking at the rbac.yaml helm template
> ,
> it looks like the Role and RoleBindings that grant access to the leases
> resource are created for the configured watchNamespaces, but not for the
> namespace in which the flink-kubernetes-operator is deployed.  I think that
> for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
> in its own namespace, right?
>
> Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
> betcha I'm just doing something wrong (unless I'm the first person who's
> tried to use HA + namespaced RBAC with the helm charts?).
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
>
>
>


Re: Autoscaler

2023-05-01 Thread Gyula Fóra
There is only one kind of autoscaler in the Flink Kubernetes Operator. And
the docs can be found here:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

We usually refer to it as the Job Autoscaler (as it scales individual jobs)
but the mechanism itself scales the job vertexes (operators) of the job.
Hope this helps.

Gyula

On Mon, May 1, 2023 at 11:14 AM rania duni  wrote:

> Hello!
>
> How can I enable the operator autoscaler and not the job autoscaler ? I
> don't understand if the default behavior is job scaling or operator
> scaling, I am a little confused with the documentation.
>
> Thank you!
>


Re: Can I setup standby taskmanagers while using reactive mode?

2023-04-28 Thread Gyula Fóra
You could also check out the Autoscaler logic in the Flink Kubernetes
Operator (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
)
On the current main and in the upcoming 1.5.0 release the mechanism is
pretty nice and solid :)

It works with the native integration so you can also set standby TMs with a
simple config.

Cheers,
Gyula

On Fri, Apr 28, 2023 at 7:31 AM Wei Hou  wrote:

> Thank you for all your responses! I think Gyula is right, simply do a MAX -
> some_offset is not ideal as it can make the standby TM useless.
> It is difficult for the scheduler to determine whether a pod has been lost
> or scaled down when we enable autoscaling, which affects its decision to
> utilize standby TMs. We probably need to monitor the HPA events in order to
> get this information.
> I will wait to see if there is a solution for this problem in the future.
>
>
> On Wed, Apr 26, 2023 at 7:20 AM Gyula Fóra  wrote:
>
>> I think the behaviour is going to get a little weird because this would
>> actually defeat the purpose of the standby TM.
>> MAX - some offset will decrease once you lose a TM so in this case we
>> would scale down to again have a spare (which we never actually use.)
>>
>> Gyula
>>
>> On Wed, Apr 26, 2023 at 4:02 PM Chesnay Schepler 
>> wrote:
>>
>>> Reactive mode doesn't support standby taskmanagers. As you said it
>>> always uses all available resources in the cluster.
>>>
>>> I can see it being useful though to not always scale to MAX but (MAX -
>>> some_offset).
>>>
>>> I'd suggest to file a ticket.
>>>
>>> On 26/04/2023 00:17, Wei Hou via user wrote:
>>> > Hi Flink community,
>>> >
>>> > We are trying to use Flink’s reactive mode with Kubernetes HPA for
>>> autoscaling, however since the reactive mode will always use all available
>>> resources, it causes a problem when we need standby task managers for fast
>>> failure recover: The job will always use these extra standby task managers
>>> as active task manager to process data.
>>> >
>>> > I wonder if you have any suggestion on this, should we avoid using
>>> Flink reactive mode together with standby task managers?
>>> >
>>> > Best,
>>> > Wei
>>> >
>>> >
>>>
>>>


Re: Flink Kubernetes Operator Scale Issue

2023-04-27 Thread Gyula Fóra
Hi!

It’s currently not possible to run the operator in parallel by simply
adding more replicas. However there are different things you can do to
scale both vertically and horizontally.

First of all you can run multiple operators each watching different set of
namespaces to partition the load.

The operator also supports watching CRs with a certain label selector which
would allow you to horizontally partition the load with custom CR labels if
necessary.

You can also try increasing the reconciler parallelism of the operator to
use more threads and reconcile more CRs in parallel. If you increase this
you might need to increase the heap size as well.

Let me know if this helps!

Gyula

On Thu, 27 Apr 2023 at 09:15, Talat Uyarer via user 
wrote:

> Hi All,
>
> We are using Flink Kubernetes Operator on our production. We have 3k+ jobs
> in standalone mode. But after 2.5k jobs operator getting slow. Now when we
> submit a job it takes 10+ minutes to the job runs. Does anyone use similar
> scale or more job ?
>
> Now we run as a single pod. Does operator support multi pods if i increase
> replicas ?
>
> Do you have any suggestions where should i start looking to debug ?
>
> Thanks
>


Re: Can I setup standby taskmanagers while using reactive mode?

2023-04-26 Thread Gyula Fóra
I think the behaviour is going to get a little weird because this would
actually defeat the purpose of the standby TM.
MAX - some offset will decrease once you lose a TM so in this case we would
scale down to again have a spare (which we never actually use.)

Gyula

On Wed, Apr 26, 2023 at 4:02 PM Chesnay Schepler  wrote:

> Reactive mode doesn't support standby taskmanagers. As you said it
> always uses all available resources in the cluster.
>
> I can see it being useful though to not always scale to MAX but (MAX -
> some_offset).
>
> I'd suggest to file a ticket.
>
> On 26/04/2023 00:17, Wei Hou via user wrote:
> > Hi Flink community,
> >
> > We are trying to use Flink’s reactive mode with Kubernetes HPA for
> autoscaling, however since the reactive mode will always use all available
> resources, it causes a problem when we need standby task managers for fast
> failure recover: The job will always use these extra standby task managers
> as active task manager to process data.
> >
> > I wonder if you have any suggestion on this, should we avoid using Flink
> reactive mode together with standby task managers?
> >
> > Best,
> > Wei
> >
> >
>
>


Re: [Flink operator] Flink Autoscale - Limit the max number of scale ups

2023-04-24 Thread Gyula Fóra
Hi!

Please opena JIRA ticket with the details of your log, config and operator
version and we will take a look!

Thanks
Gyula

On Mon, Apr 24, 2023 at 2:04 PM Sriram Ganesh  wrote:

> Hi,
>
> I am trying the autoscale provided by the operator. I found that Autoscale
> keeps happening even after reaching max-parallelism. I have a question.
>
> How can we set the limit for the scale-up process? Because in my cluster
> it reached the max limit. So, there won't be any more resources available.
>
> And also why do you perform scale operations after reaching
> 'pipeline.max-parallelism' ?
>
> Please clarify.
>
> Thanks,
> Sriram G
>
> --
> *Sriram G*
> *Tech*
>
>


Re: Kubernetes operator stops responding due to Connection reset by peer

2023-04-22 Thread Gyula Fóra
Hi Alexis,

We have recently added support for canary deployments which allows the
liveness probe to detect general operator problems.

https://issues.apache.org/jira/browse/FLINK-31219

It's not completely automatic and you have to deploy the canaries yourself
but I think it will be helpful :)
This will be part of the upcoming 1.5.0 release.

Cheers,
Gyula

On Fri, Apr 21, 2023 at 11:50 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> Today, we received an alert because the operator appeared to be down. Upon
> further investigation, we realized the alert was triggered because the
> endpoint for Prometheus metrics (which we enabled) stopped responding, so
> it seems the endpoint used for the liveness probe wasn't affected and the
> pod was not restarted automatically.
>
> The logs right before the problem started don't show anything odd, and
> once the problem started, the logs were spammed with warning messages
> stating "Connection reset by peer" with no further information. From what I
> can see, nothing else was logged during that time, so it looks like the
> process really had stalled.
>
> I imagine this is not easy to reproduce and, while a pod restart was
> enough to get back on track, it might be worth improving the liveness probe
> to catch these situations.
>
> Full stacktrace for reference:
>
> An exceptionCaught() event was fired, and it reached at the tail of the
> pipeline. It usually means the last handler in the pipeline did not handle
> the exception.
> java.io.IOException: Connection reset by peer at
> java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) at
> java.base/sun.nio.ch.SocketDispatcher.read(Unknown Source) at
> java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(Unknown Source) at
> java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
> java.base/sun.nio.ch.IOUtil.read(Unknown Source) at
> java.base/sun.nio.ch.SocketChannelImpl.read(Unknown Source) at
> org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledDirectByteBuf.setBytes(UnpooledDirectByteBuf.java:570)
> at
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.base/java.lang.Thread.run(Unknown Source)
>
> Regards,
> Alexis.
>
>


Re: [Kubernetes Operator] NullPointerException from KubernetesApplicationClusterEntrypoint

2023-03-31 Thread Gyula Fóra
Never seen this before but also you should not set the cluster-id in your
config as that should be controlled by the operator itself.

Gyula

On Fri, Mar 31, 2023 at 2:39 PM Pierre Bedoucha 
wrote:

> Hi,
>
>
>
> We are trying to use Flink Kubernetes Operator 1.4.0 with Flink 1.16.
>
>
>
> However, at the job-manager deployment step we get the following error:
> ```
>
> Exception in thread "main" java.lang.NullPointerException
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.shutDownAsync(ClusterEntrypoint.java:585)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:242)
>
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
>
> at
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
>
>
>
> ```
> It sems it is related to the following line:
>
> ```
>
> this.clusterId =
> checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID),
> "ClusterId must be specified!");
>
> ```
> We specified the CLUSTER_ID but it seems that the flinkConfig object is
> not handled correctly.
>
> We have the following flinkConfiguration defined in deployment.yaml:
> ```
> spec:
>
>   flinkConfiguration:
>
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
>
> execution.checkpointing.interval: 120s
>
> execution.checkpointing.min-pause: 120s
>
> execution.checkpointing.mode: AT_LEAST_ONCE
>
> execution.checkpointing.snapshot-compression: "false"
>
> execution.checkpointing.timeout: 3000s
>
> execution.checkpointing.tolerable-failed-checkpoints: "5"
>
> execution.checkpointing.unaligned: "false"
>
> fs.hdfs.hadoopconf: /opt/hadoop-conf/
>
> high-availability.storageDir: gs:///ha
>
> high-availability: kubernetes
>
> high-availability.cluster-id: 
>
> kubernetes.operator.periodic.savepoint.interval: 6h
>
> kubernetes.operator.savepoint.history.max.age: 72h
>
> kubernetes.operator.savepoint.history.max.count: "15"
>
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
>
> metrics.reporter.prom.port: "2112"
>
> metrics.reporters: prom
>
> rest.flamegraph.enabled: "false"
>
> state.backend: rocksdb
>
> state.backend.incremental: "false"
>
> state.backend.rocksdb.localdir: /rocksdb
>
> state.checkpoint-storage: filesystem
>
> state.checkpoints.dir: gs:///checkpoints
>
> state.savepoints.dir: gs:///savepoints
>
> taskmanager.memory.managed.fraction: "0"
>
> taskmanager.network.memory.buffer-debloat.enabled: "false"
>
> taskmanager.network.memory.buffer-debloat.period: "200"
>
> taskmanager.network.memory.buffers-per-channel: "2"
>
> taskmanager.network.memory.floating-buffers-per-gate: "8"
>
> taskmanager.network.memory.max-buffers-per-channel: "10"
>
> taskmanager.network.sort-shuffle.min-buffers: "512"
>
> taskmanager.numberOfTaskSlots: "1"
>
> kubernetes.taskmanager.cpu.limit-factor: "4"
>
> kubernetes.taskmanager.cpu: "0.5"
>
> kubernetes.cluster-id: 
>
> ```
> Have someone encountered the issue before?
>
> Thanks,
> PB
>


Re: Unable to Use spec.flinkVersion v1_17 with Flink Operator

2023-03-28 Thread Gyula Fóra
I think you forgot to upgrade the CRD during the upgrade process on your
cluster.

As you can see here:
https://github.com/apache/flink-kubernetes-operator/blob/release-1.4/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml#L38-L44
The newer version already contains support for 1.17.

For docs you can refer to:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd


Cheers,
Gyula

On Tue, Mar 28, 2023 at 10:29 PM Ivan Webber via user 
wrote:

> Hello Flink Users:
>
>
>
> I'm trying to upgrade to use Flink 1.17.0 with my pipeline in order to
> have support for writing to Azure Data Lake Storage. However when I change
> the `spec.flinkVersion` to v1_17 I get an error message:
>
>
>
> ```bash
>
> The FlinkDeployment "test-replay-streaming-run" is invalid:
> spec.flinkVersion: Unsupported value: "v1_17": supported values: "v1_13",
> "v1_14", "v1_15", "v1_16"
>
> ```
>
>
>
> The documentation (here
> )
> says latest version of Flink Operator (1.4) should support 1.17.0 and I
> made sure to update by running the below commands. I’m wondering if an Enum
> needs updated or if the latest stable Flink Operator doesn’t actually
> support 1.17.0 yet. Any pointers would be appreciated.
>
>
>
> ```bash
>
> helm uninstall flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
>
> helm repo rm flink-operator-repo
>
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-1.4.0/
>
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
>
> ```
>
>
>
> Thanks,
>
>
>
> Ivan Webber
>
>
>


Re: Flink K8s operator pod section of CRD

2023-02-23 Thread Gyula Fóra
Hey!
You are right, these fields could have been of the PodTemplate /
PodTemplateSpec type (probably PodTemplateSpec is actually better).

I think the reason why we used it is two fold:
 - Simple oversight :)
 - Flink itself "expects" the podtemplate in this form for the native
integration as you can see here:
https://github.com/apache/flink/blob/master/flink-kubernetes/src/test/resources/testing-pod-template.yaml

I think we could actually change Pod -> PodTemplateSpec without breaking
the api. Let me think about this and see.

Cheers,
Gyula

On Fri, Feb 24, 2023 at 1:47 AM Mason Chen  wrote:

> Hi all,
>
> Why does the FlinkDeployment CRD refer to the Pod class instead of the
> PodTemplate class from the fabric8 library? As far as I can tell, the only
> difference is that the Pod class exposes the PodStatus, which doesn't seem
> mutable. Thanks in advance!
>
> Best,
> Mason
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.4.0 released

2023-02-23 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.4.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:

   - Flink Job Autoscaler initial implementation
   - Stability improvements
   - Support for Zookeeper based HA

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352604

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,

Gyula Fora


[ANNOUNCE] Apache Flink Kubernetes Operator 1.4.0 released

2023-02-23 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.4.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:

   - Flink Job Autoscaler initial implementation
   - Stability improvements
   - Support for Zookeeper based HA

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352604

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,

Gyula Fora


Re: Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Gyula Fóra
If you are interested in helping to review this, here is the relevant
ticket and the PR I just opened:

https://issues.apache.org/jira/browse/FLINK-30786
https://github.com/apache/flink-kubernetes-operator/pull/535

Cheers,
Gyula

On Thu, Feb 23, 2023 at 2:10 PM Gyula Fóra  wrote:

> Hi!
>
> The current array merging strategy in the operator is basically an
> overwrite by position yes.
> I actually have a pending improvement to make this configurable and allow
> merging arrays by "name" attribute. This is generally more practical for
> such cases.
>
> Cheers,
> Gyula
>
> On Thu, Feb 23, 2023 at 1:37 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> I noticed that if I set environment variables in both spec.podTemplate &
>> spec.jobManager.podTemplate for the same container (flink-maincontainer),
>> the values from the latter selectively overwrite the values from the
>> former. For example, if I define something like this (omitting metadata
>> properties):
>>
>> spec:
>>   podTemplate:
>> spec:
>>   containers:
>>   - name: flink-main-container
>> env:
>>   - name: FOO
>> value: BAR
>>   - name: BAZ
>> value: BAK
>>   jobManager:
>> podTemplate:
>>   spec:
>> containers:
>> - name: flink-main-container
>>   env:
>> - name: EXTRA
>>   value: ENVVAR
>>
>> The final spec for the Job Manager Deployment will only contain EXTRA and
>> BAZ, so FOO is overwritten by EXTRA.
>>
>> Is this expected? I am already evaluating the latest release of the
>> operator (1.4.0).
>>
>> Regards,
>> Alexis.
>>
>


Re: Kubernetes operator's merging strategy for template arrays

2023-02-23 Thread Gyula Fóra
Hi!

The current array merging strategy in the operator is basically an
overwrite by position yes.
I actually have a pending improvement to make this configurable and allow
merging arrays by "name" attribute. This is generally more practical for
such cases.

Cheers,
Gyula

On Thu, Feb 23, 2023 at 1:37 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> I noticed that if I set environment variables in both spec.podTemplate &
> spec.jobManager.podTemplate for the same container (flink-maincontainer),
> the values from the latter selectively overwrite the values from the
> former. For example, if I define something like this (omitting metadata
> properties):
>
> spec:
>   podTemplate:
> spec:
>   containers:
>   - name: flink-main-container
> env:
>   - name: FOO
> value: BAR
>   - name: BAZ
> value: BAK
>   jobManager:
> podTemplate:
>   spec:
> containers:
> - name: flink-main-container
>   env:
> - name: EXTRA
>   value: ENVVAR
>
> The final spec for the Job Manager Deployment will only contain EXTRA and
> BAZ, so FOO is overwritten by EXTRA.
>
> Is this expected? I am already evaluating the latest release of the
> operator (1.4.0).
>
> Regards,
> Alexis.
>


Re: [EXTERNAL] Re: Query on flink-operator autoscale support

2023-02-01 Thread Gyula Fóra
As I mentioned in the previous email, standalone mode is not on the
Autoscaler roadmap because the scheduling/resource model is different.
This applies to both standalone app and session clusters.

Thanks
Gyula

On Wed, Feb 1, 2023 at 4:48 PM Swathi Chandrashekar 
wrote:

> Sure, thanks Gyula.
> Is there a roadmap to support standalone session clusters to scale based
> on the jobs added/deleted and change in parallelism ?
>
> Regards,
> Swathi C
>
> ------
> *From:* Gyula Fóra 
> *Sent:* Wednesday, February 1, 2023 8:54 PM
> *To:* Swathi Chandrashekar 
> *Cc:* user@flink.apache.org 
> *Subject:* [EXTERNAL] Re: Query on flink-operator autoscale support
>
> The autoscaler currently only works with Native App clusters.
> Native session clusters may be supported in the future but standalone is
> not on our roadmap due to a different resource/scheduling model used.
>
> Gyula
>
> On Wed, Feb 1, 2023 at 4:22 PM Swathi Chandrashekar 
> wrote:
>
> Hi,
>
> I'm was testing OSS flink operator with flink 1.17 for autoscale feature.
> I was able to scale the cluster based on load in application cluster in
> native mode, but the same did not work in standalone mode as the operator
> gave the following error as below [ both for app and session mode ].
>
> Is the autoscale supported for the following :
>
>1. Session cluster in standalone
>2. Session cluster in native
>3. App cluster in standalone
>
>
> Exception in thread "pool-4-thread-3" java.lang.NoSuchMethodError:
> org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient.create(Lorg/apache/flink/configuration/Configuration;Ljava/util/concurrent/ExecutorService;)Lorg/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient;
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.createNamespacedKubeClient(StandaloneFlinkService.java:105)
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.submitClusterInternal(StandaloneFlinkService.java:110)
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.deployApplicationCluster(StandaloneFlinkService.java:69)
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:180)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:58)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
>
> Regards,
> Swathi C
>
>


Re: Query on flink-operator autoscale support

2023-02-01 Thread Gyula Fóra
The autoscaler currently only works with Native App clusters.
Native session clusters may be supported in the future but standalone is
not on our roadmap due to a different resource/scheduling model used.

Gyula

On Wed, Feb 1, 2023 at 4:22 PM Swathi Chandrashekar 
wrote:

> Hi,
>
> I'm was testing OSS flink operator with flink 1.17 for autoscale feature.
> I was able to scale the cluster based on load in application cluster in
> native mode, but the same did not work in standalone mode as the operator
> gave the following error as below [ both for app and session mode ].
>
> Is the autoscale supported for the following :
>
>1. Session cluster in standalone
>2. Session cluster in native
>3. App cluster in standalone
>
>
> Exception in thread "pool-4-thread-3" java.lang.NoSuchMethodError:
> org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient.create(Lorg/apache/flink/configuration/Configuration;Ljava/util/concurrent/ExecutorService;)Lorg/apache/flink/kubernetes/operator/kubeclient/Fabric8FlinkStandaloneKubeClient;
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.createNamespacedKubeClient(StandaloneFlinkService.java:105)
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.submitClusterInternal(StandaloneFlinkService.java:110)
> at
> org.apache.flink.kubernetes.operator.service.StandaloneFlinkService.deployApplicationCluster(StandaloneFlinkService.java:69)
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:180)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:175)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:58)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:130)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:136)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:94)
> at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
> at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:93)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:130)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:110)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:81)
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
> at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
>
>
> Regards,
> Swathi C
>


Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
Thanks @Anton Ippolitov 
At this stage I would highly recommend the native mode if you have the
liberty to try that.
I think that has better production characteristics and will work out of the
box with the autoscaler. (the standalone mode won't)

Gyula

On Tue, Jan 31, 2023 at 10:41 AM Anton Ippolitov <
anton.ippoli...@datadoghq.com> wrote:

> I am using the Standalone Mode indeed, should've mentioned it right away.
> This fix looks exactly like what I need, thank you!!
>
> On Tue, Jan 31, 2023 at 9:16 AM Gyula Fóra  wrote:
>
>> There is also a pending fix for the standalone + k8s HA case :
>> https://github.com/apache/flink-kubernetes-operator/pull/518
>>
>> You could maybe try and review the fix :)
>>
>> Gyula
>>
>> On Tue, Jan 31, 2023 at 8:36 AM Yang Wang 
>> wrote:
>>
>>> I assume you are using the standalone mode. Right?
>>>
>>> For the native K8s mode, the leader address should be 
>>> *akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
>>> *when HA enabled.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Anton Ippolitov via user  于2023年1月31日周二 00:21写道:
>>>
>>>> This is actually what I'm already doing, I'm only setting 
>>>> high-availability:
>>>> kubernetes myself. The other values are either defaults or set by the
>>>> Operator:
>>>> - jobmanager.rpc.port: 6123 is the default value (docs
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>
>>>> )
>>>> -  high-availability.jobmanager.port: 6123 is set by the Operator here
>>>> <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>>>
>>>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by
>>>> the Operator here
>>>> <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>>>  (the
>>>> actual code which gets executed is here
>>>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>>>> )
>>>>
>>>>  Looking at what the Lyft Operator is doing here
>>>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>>>  I thought
>>>> this would be a common issue but since you've never seen this error before,
>>>> not sure what to do 樂
>>>>
>>>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra 
>>>> wrote:
>>>>
>>>>> We never encountered this problem before but also we don't configure
>>>>> those settings.
>>>>> Can you simply try:
>>>>>
>>>>> high-availability: kubernetes
>>>>>
>>>>> And remove the other configs? I think that can only cause problems and
>>>>> should not achieve anything :)
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>>>> user@flink.apache.org> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I've been experimenting with Kubernetes HA and the Kubernetes
>>>>>> Operator and ran into the following issue which is happening regularly on
>>>>>> TaskManagers with Flink 1.16:
>>>>>>
>>>>>> Error while retrieving the leader gateway. Retrying to connect to 
>>>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>>>>>> complete the operation. Number of retries has been exhausted.
>>>>>>
>>>>>> (The whole stacktrace is quite long, I put it in a Github Gist here
>>>>>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>>>>>> Note that I put placeholder values for the Kubernetes Service name and 
>>>>>> the

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-31 Thread Gyula Fóra
There is also a pending fix for the standalone + k8s HA case :
https://github.com/apache/flink-kubernetes-operator/pull/518

You could maybe try and review the fix :)

Gyula

On Tue, Jan 31, 2023 at 8:36 AM Yang Wang  wrote:

> I assume you are using the standalone mode. Right?
>
> For the native K8s mode, the leader address should be 
> *akka.tcp://flink@JM_POD_IP:6123/user/rpc/dispatcher_1
> *when HA enabled.
>
>
> Best,
> Yang
>
> Anton Ippolitov via user  于2023年1月31日周二 00:21写道:
>
>> This is actually what I'm already doing, I'm only setting high-availability:
>> kubernetes myself. The other values are either defaults or set by the
>> Operator:
>> - jobmanager.rpc.port: 6123 is the default value (docs
>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#hosts-and-ports>
>> )
>> -  high-availability.jobmanager.port: 6123 is set by the Operator here
>> <https://github.com/apache/flink-kubernetes-operator/blob/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/standalone/KubernetesStandaloneClusterDescriptor.java#L141-L144>
>>
>> - jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE is set by the
>> Operator here
>> <https://github.com/apache/flink-kubernetes-operator/blob/261fed2076efe385ede148152c946eb7c5f1f48d/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesJobManagerFactory.java#L80>
>>  (the
>> actual code which gets executed is here
>> <https://github.com/apache/flink/blob/1ae09ae6942eaaf5a6197c68fc12ee4e9fc1a105/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InternalServiceDecorator.java#L60-L66>
>> )
>>
>>  Looking at what the Lyft Operator is doing here
>> <https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L215>,
>>  I thought
>> this would be a common issue but since you've never seen this error before,
>> not sure what to do 樂
>>
>> On Fri, Jan 27, 2023 at 10:59 PM Gyula Fóra  wrote:
>>
>>> We never encountered this problem before but also we don't configure
>>> those settings.
>>> Can you simply try:
>>>
>>> high-availability: kubernetes
>>>
>>> And remove the other configs? I think that can only cause problems and
>>> should not achieve anything :)
>>>
>>> Gyula
>>>
>>> On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
>>> user@flink.apache.org> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I've been experimenting with Kubernetes HA and the Kubernetes Operator
>>>> and ran into the following issue which is happening regularly on
>>>> TaskManagers with Flink 1.16:
>>>>
>>>> Error while retrieving the leader gateway. Retrying to connect to 
>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
>>>> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
>>>> complete the operation. Number of retries has been exhausted.
>>>>
>>>> (The whole stacktrace is quite long, I put it in a Github Gist here
>>>> <https://gist.github.com/antonipp/41b4cb732522a91799e0f57ea96805a3>.
>>>> Note that I put placeholder values for the Kubernetes Service name and the
>>>> Namespace name)
>>>>
>>>> The job configuration has the following values which should be relevant:
>>>> high-availability: kubernetes
>>>> high-availability.jobmanager.port: 6123
>>>> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
>>>> jobmanager.rpc.port: 6123
>>>>
>>>> Looking a bit more into the logs, I can see that the Akka Actor System
>>>> is started with an external address pointing to the Kubernetes Service
>>>> defined by jobmanager.rpc.address:
>>>> Trying to start actor system, external
>>>> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address
>>>> 0.0.0.0:6123.
>>>> Actor system started at
>>>> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123
>>>>
>>>> (I believe the external address for the Akka Actor System is set to
>>>> jobmanager.rpc.address from this place
>>>> <https://github.com/apache/flink/blob/0141f13ca801d5db45435d101a9c3ef83889bbc0/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#

Re: "Error while retrieving the leader gateway" when using Kubernetes HA

2023-01-27 Thread Gyula Fóra
We never encountered this problem before but also we don't configure those
settings.
Can you simply try:

high-availability: kubernetes

And remove the other configs? I think that can only cause problems and
should not achieve anything :)

Gyula

On Fri, Jan 27, 2023 at 6:44 PM Anton Ippolitov via user <
user@flink.apache.org> wrote:

> Hi everyone,
>
> I've been experimenting with Kubernetes HA and the Kubernetes Operator and
> ran into the following issue which is happening regularly on TaskManagers
> with Flink 1.16:
>
> Error while retrieving the leader gateway. Retrying to connect to 
> akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1.
> org.apache.flink.util.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>
> (The whole stacktrace is quite long, I put it in a Github Gist here
> . Note
> that I put placeholder values for the Kubernetes Service name and the
> Namespace name)
>
> The job configuration has the following values which should be relevant:
> high-availability: kubernetes
> high-availability.jobmanager.port: 6123
> jobmanager.rpc.address: SERVICE-NAME-HERE.NAMESPACE-HERE
> jobmanager.rpc.port: 6123
>
> Looking a bit more into the logs, I can see that the Akka Actor System is
> started with an external address pointing to the Kubernetes Service defined
> by jobmanager.rpc.address:
> Trying to start actor system, external
> address SERVICE-NAME-HERE.NAMESPACE-HERE:6123, bind address 0.0.0.0:6123.
> Actor system started at akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
> :6123
>
> (I believe the external address for the Akka Actor System is set to
> jobmanager.rpc.address from this place
> 
> in the code but I might be wrong)
>
> I can also see these logs for the Dispatcher RPC endpoint:
> Starting RPC endpoint for
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
> akka://flink/user/rpc/dispatcher_1 .
> Successfully wrote leader information
> LeaderInformation{leaderSessionID='8fd2bda3-1775-4b51-bf63-8da385247a18',
> leaderAddress=akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE:6123/user/rpc/dispatcher_1}
> for leader dispatcher into the config map JOB-NAME-HERE-cluster-config-map.
>
> I confirmed that the HA ConfigMap contains an address which also uses the
> Kubernetes Service defined by jobmanager.rpc.address:
> $ kubectl get cm JOB-NAME-HERE-cluster-config-map -o json | jq -r
> '.data["org.apache.flink.k8s.leader.dispatcher"]'
>
> ce33b6d4-a55f-475c-9b6e-b21d25c8e6b5,akka.tcp://flink@SERVICE-NAME-HERE.NAMESPACE-HERE
> :6123/user/rpc/dispatcher_1
>
> When looking at the code of the Operator and Flink itself, I can see
> that jobmanager.rpc.address is set automatically by the
> InternalServiceDecorator
> 
>  and
> it points to the Kubernetes Service.
> However, the comment
> 
> above clearly says that "only the non-HA scenario relies on this Service
> for internal communication, since in the HA mode, the TaskManager(s)
> directly connects to the JobManager via IP address." According to the docs
> ,
> jobmanager.rpc.address "is ignored on setups with high-availability where
> the leader election mechanism is used to discover this automatically."
>
> This is not what I'm observing as it seems that despite enabling HA, the
> TaskManagers don't use IP addresses but still use this Kubernetes Service
> for JM communication.
>
> Moreover, I've used the Lyft Kubernetes Operator before and it has these
> interesting lines in the code:
> https://github.com/lyft/flinkk8soperator/blob/435640258b72d9c9efdbadc3579411224f510a72/pkg/controller/flink/container_utils.go#L212-L216
> It explicitly sets jobmanager.rpc.address to the host IPs.
>
> Am I misconfiguring or misunderstanding something? Is there any way to fix
> these errors?
>
> Thanks!
> Anton
>


Re: PyFlink job in kubernetes operator

2023-01-25 Thread Gyula Fóra
Did you check the Python example?
https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-python-example

Gyula

On Wed, Jan 25, 2023 at 2:54 PM Evgeniy Lyutikov 
wrote:

> Hello
>
> Is there a way to run PyFlink jobs in k8s with flink kubernetes operator?
> And if not, is it planned to add such functionality?
>
>
> * -- *“This message contains confidential
> information/commercial secret. If you are not the intended addressee of
> this message you may not copy, save, print or forward it to any third party
> and you are kindly requested to destroy this message and notify the sender
> thereof by email.
> Данное сообщение содержит конфиденциальную информацию/информацию,
> являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом
> данного сообщения, Вы не вправе копировать, сохранять, печатать или
> пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение и
> уведомить об этом отправителя электронным письмом.”
>


Job stuck in CREATED state with scheduling failures

2023-01-21 Thread Gyula Fóra
Hi Devs!

We noticed a very strange failure scenario a few times recently with the
Native Kubernetes integration.

The issue is triggered by a heartbeat timeout (a temporary network
problem). We observe the following behaviour:

===
3 pods (1 JM, 2 TMs), Flink 1.15 (Kubernetes Native Integration):

1. Temporary network problem
 - Heartbeat failure, TM1 loses JM connection and JM loses TM1 connection.
 - Both the JM and TM1 trigger the job failure on their sides and cancel
the tasks
 - JM releases TM1 slots

2. While failing/cancelling the job, the network connection recovers and
TM1 reconnects to JM:
*TM1: Resolved JobManager address, beginning registration*

3. JM tries to resubmit the job using TM1 + TM2 but the scheduler keeps
failing as it cannot seem to allocate all the resources:

*NoResourceAvailableException: Slot request bulk is not fulfillable! Could
not allocate the required slot within slot request timeout*
On TM1 we see the following logs repeating (mutliple times every few
seconds until the slot request times out after 5 minutes):
*Receive slot request ... for job ... from resource manager with leader id
...*
*Allocated slot for ...*
*Receive slot request ... for job ... from resource manager with leader id
...*
*Allocated slot for *
*Free slot TaskSlot(index:0, state:ALLOCATED, resource profile:
ResourceProfile{...}, allocationId: ..., jobId: ...).*

While all these are happening on TM1 we don't see any allocation related
INFO logs on TM2.
===

Seems like something weird happens when TM1 reconnects after the heartbeat
loss. I feel that the JM should probably shut down the TM and create a new
one. But instead it gets stuck.

Any ideas what could be happening here?

Thanks
Gyula


Re: Kubernetes JobManager and TaskManager minimum/maximum resources

2023-01-21 Thread Gyula Fóra
But of course the actual memory requirement will largely depend on the type
of job, statebackend , number of task slots etc

Production TM/JMs usually have much more resources allocated than 2gb/1cpu
as you never want to run out of it :)

Gyula

On Sat, 21 Jan 2023 at 11:17, Gyula Fóra  wrote:

> Hi!
>
> I think the examples allocate too many resources by default and we should
> reduce it in the yamls.
>
> 1gb memory and 0.5 cpu should be more than enough , we could probably get
> away with even less for example purposes.
>
> Would you have time trying this out and maybe contributing this
> improvement? :)
>
> Thanks
> Gyula
>
>
> On Fri, 20 Jan 2023 at 05:32, Lee Parayno  wrote:
>
>> For application mode FlinkDeployments (maybe even session mode) in
>> Kubernetes from the Flink Kubernetes Operator what is the absolute minimum
>> amount of CPU and RAM that is required to run the JobManager and
>> TaskManager processes?
>>
>> Some of the example deployment yaml examples have CPU set at 1 full vCPU
>> and memory at 2GB (2048 MB).  If you factor in JobManager HA, and 1 or more
>> TaskManagers (not sure what is the bounding limit for these processes), you
>> can be at 3 vCPU and 6 GB memory used just by the “Flink Infrastructure”
>> not counting the Job pods.
>>
>> Has anyone seen a need to have more resources dedicated to these
>> processes for some reason?  Has anyone run it leaner than this (like with
>> 0.5 vCPU and less than 1GB memory) in production?
>>
>> Comparing this to Google Cloud Platform and the Dataflow Runner, AFAIK
>> the only resources utilized (that customers pay for) are the Job instances.
>>
>> Lee Parayno
>> Sent from my iPhone
>
>


Re: Kubernetes JobManager and TaskManager minimum/maximum resources

2023-01-21 Thread Gyula Fóra
Hi!

I think the examples allocate too many resources by default and we should
reduce it in the yamls.

1gb memory and 0.5 cpu should be more than enough , we could probably get
away with even less for example purposes.

Would you have time trying this out and maybe contributing this
improvement? :)

Thanks
Gyula


On Fri, 20 Jan 2023 at 05:32, Lee Parayno  wrote:

> For application mode FlinkDeployments (maybe even session mode) in
> Kubernetes from the Flink Kubernetes Operator what is the absolute minimum
> amount of CPU and RAM that is required to run the JobManager and
> TaskManager processes?
>
> Some of the example deployment yaml examples have CPU set at 1 full vCPU
> and memory at 2GB (2048 MB).  If you factor in JobManager HA, and 1 or more
> TaskManagers (not sure what is the bounding limit for these processes), you
> can be at 3 vCPU and 6 GB memory used just by the “Flink Infrastructure”
> not counting the Job pods.
>
> Has anyone seen a need to have more resources dedicated to these processes
> for some reason?  Has anyone run it leaner than this (like with 0.5 vCPU
> and less than 1GB memory) in production?
>
> Comparing this to Google Cloud Platform and the Dataflow Runner, AFAIK the
> only resources utilized (that customers pay for) are the Job instances.
>
> Lee Parayno
> Sent from my iPhone


Re: DuplicateJobSubmissionException on restart after taskmanagers crash

2023-01-21 Thread Gyula Fóra
Hi Javier,

I will try to look into this as I have not personally seen this problem
while using the operator .

It would be great if you could reach out to me on slack or email directly
so we can discuss the issue and get to the bottom of it.

Cheer
Gyula

On Fri, 20 Jan 2023 at 23:53, Javier Vegas  wrote:

> My issue is described in https://issues.apache.org/jira/browse/FLINK-21928
> where it says was fixed in 1.14, but I am still seeing the problem.
> Although there it says:
>
> "Additionally, it is still required that the user cleans up the
> corresponding HA entries for the running jobs registry because these
> entries won't be reliably cleaned up when encountering the situation
> described by FLINK-21928
> ."
>
> so I guess I need to do some manual cleanup of my S3 HA data before
> restarting
>
> El vie, 20 ene 2023 a las 4:58, Javier Vegas ()
> escribió:
>
>>
>> I have a Flink app (Flink 1.16.0, deployed to Kubernetes via operator
>> 1.3.1 and using Kubernetes HighAvailaibilty with storage in S3) that
>> depends on multiple Thrift services for data queries. When one of those
>> services is down (or throws exceptions) the Flink job managers end up
>> crashing and only the task managers remain up. Once the dependencies are
>> fixed, when I try to restart the Flink app I end up with a
>> "DuplicateJobSubmissionException: Job has already been submitted" (see
>> below for detailed log) and the task managers never start. The only
>> solution I have found is to delete the deployment from Kubernetes and then
>> deploy again as a new job.
>>
>> 1) Is there a better way to handle failures on dependencies than letting
>> task managers crash and keep job managers up, and restart after
>> dependencies are fixed?
>> 1) If not, is there a way to handle the DuplicateJobSubmissionException
>> so the Flink app can be restarted without having to uninstall it first?
>>
>> Thanks,
>>
>> Javier Vegas
>>
>>
>> org.apache.flink.util.FlinkException: Failed to execute job
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>> Caused by:
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>> already been submitted.
>> at
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:449)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source)
>> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>> at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>> ... 5 more
>> Exception thrown in main on startup
>>
>>
>>
>


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Gyula Fóra
To clarify this logic is inherited from the Flink Native Kubernetes
integration itself. The operator specific labels we use are already fully
qualified.
I agree that this could be improved in Flink by a better label.

Cheers,
Gyula

On Thu, Jan 19, 2023 at 11:00 PM Mason Chen  wrote:

> @Andrew I was also confused by this earlier and FYI this line where it is
> referenced
> https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
>
> On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
> wrote:
>
>> On a side note, we should probably use a qualified label name instead of
>> the pretty common app here. WDYT Gyula?
>>
>> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The app label itself is used by Flink internally for a different purpose
>>> so it’s overriden. This is completely expected.
>>>
>>> I think it would be better to use some other label :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:
>>>
>>>> Hello!
>>>>
>>>> I'm seeing an unexpected label value assignment happening, and I'm not
>>>> sure how it's happening.  It is possible it is in my own helm charts and
>>>> templates somewhere, but I'm not seeing it, so I'm beginning to think this
>>>> is happening in the FlinkDeployment CRD in the operator code somewhere.
>>>>
>>>> I'm using FlinkDeployment podTemplate
>>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
>>>> to add an 'app' label:
>>>>
>>>>  podTemplate:
>>>> apiVersion: v1
>>>> kind: Pod
>>>> metadata:
>>>>   labels:
>>>> app: flink-app
>>>> release: flink-example
>>>> ...
>>>>
>>>> I also have this app label set in the FlinkDeployment labels:
>>>>
>>>> kind: FlinkDeployment
>>>> metadata:
>>>>   name: flink-app-flink-example
>>>>   labels:
>>>> app: flink-app
>>>> chart: flink-app-0.1.1
>>>> release: flink-example
>>>>
>>>> Since I've set app: flink-app in the podTemplate, I would expect all
>>>> pods to get this label.  The FlinkDeployment resource has this label
>>>> value as expected.  However, I see that in the pods, as well as the
>>>> Deployment that are created by FlinkDeployment:
>>>>
>>>> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
>>>> ...
>>>> Name:   flink-app-flink-example
>>>> Namespace:  flink-app0
>>>> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
>>>> Labels: app=flink-app-flink-example
>>>> component=jobmanager
>>>> ...
>>>>
>>>> Pod Template:
>>>>   Labels:   app=flink-app-flink-example
>>>> component=jobmanager
>>>> release=flink-example
>>>> ...
>>>>
>>>>
>>>> *$ kubectl -n flink-app0 describe pod
>>>> flink-app-flink-example-d974cb595-788ch*
>>>> ...
>>>> Labels:   app=flink-app-flink-example
>>>>   component=jobmanager
>>>>   pod-template-hash=d974cb595
>>>>   release=flink-example
>>>> ...
>>>>
>>>>
>>>> I'd expect the app label to be 'flink-app' for at least the Deployment
>>>> PodTemplate and the Pod, if not the Deployment itself too.
>>>>
>>>> Something is overriding the app label in podTemplate, and I don't think
>>>> it's my chart or installation.  I looked in flink-kubernetes-operator code
>>>> and I didn't find where this was happening either.  I am not setting e.g.
>>>> kubernetes.jobmanager.labels
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
>>>> .
>>>>
>>>> Is this expected?
>>>>
>>>> Thank you!
>>>>
>>>> -Andrew Otto
>>>>  Wikimedia Foundation
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Gyula Fóra
Hi!

The app label itself is used by Flink internally for a different purpose so
it’s overriden. This is completely expected.

I think it would be better to use some other label :)

Cheers,
Gyula

On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:

> Hello!
>
> I'm seeing an unexpected label value assignment happening, and I'm not
> sure how it's happening.  It is possible it is in my own helm charts and
> templates somewhere, but I'm not seeing it, so I'm beginning to think this
> is happening in the FlinkDeployment CRD in the operator code somewhere.
>
> I'm using FlinkDeployment podTemplate
> 
> to add an 'app' label:
>
>  podTemplate:
> apiVersion: v1
> kind: Pod
> metadata:
>   labels:
> app: flink-app
> release: flink-example
> ...
>
> I also have this app label set in the FlinkDeployment labels:
>
> kind: FlinkDeployment
> metadata:
>   name: flink-app-flink-example
>   labels:
> app: flink-app
> chart: flink-app-0.1.1
> release: flink-example
>
> Since I've set app: flink-app in the podTemplate, I would expect all pods
> to get this label.  The FlinkDeployment resource has this label value as
> expected.  However, I see that in the pods, as well as the Deployment
> that are created by FlinkDeployment:
>
> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
> ...
> Name:   flink-app-flink-example
> Namespace:  flink-app0
> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
> Labels: app=flink-app-flink-example
> component=jobmanager
> ...
>
> Pod Template:
>   Labels:   app=flink-app-flink-example
> component=jobmanager
> release=flink-example
> ...
>
>
> *$ kubectl -n flink-app0 describe pod
> flink-app-flink-example-d974cb595-788ch*
> ...
> Labels:   app=flink-app-flink-example
>   component=jobmanager
>   pod-template-hash=d974cb595
>   release=flink-example
> ...
>
>
> I'd expect the app label to be 'flink-app' for at least the Deployment
> PodTemplate and the Pod, if not the Deployment itself too.
>
> Something is overriding the app label in podTemplate, and I don't think
> it's my chart or installation.  I looked in flink-kubernetes-operator code
> and I didn't find where this was happening either.  I am not setting e.g.
> kubernetes.jobmanager.labels
> 
> .
>
> Is this expected?
>
> Thank you!
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
>
>
>


Re: Regarding Changes in Flink Operator 1.3.1

2023-01-18 Thread Gyula Fóra
Please see the release announcements:
https://flink.apache.org/news/2022/10/07/release-kubernetes-operator-1.2.0.html
https://flink.apache.org/news/2022/12/14/release-kubernetes-operator-1.3.0.html
https://flink.apache.org/news/2023/01/10/release-kubernetes-operator-1.3.1.html

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/

Gyula

On Thu, Jan 19, 2023 at 8:17 AM Sumit Aich  wrote:

> also are the changes in operator version 1.3.1 backward compatible ?
>
> On Thu, Jan 19, 2023 at 12:38 PM Sumit Aich  wrote:
>
>> Hi Team,
>>
>> Can you please share what has changed in Flink Kubernetes Operator
>> version 1.3.1 from the 1.1.0 version.
>>
>> Thanks,
>> Sumit
>>
>


  1   2   3   4   >