Re: [External] Regarding java.lang.IllegalStateException

2024-04-26 Thread Maxim Senin via user
My guess is it’s a known major issue. We need a workaround.

https://issues.apache.org/jira/browse/FLINK-32212

/Maxim

From: prashant parbhane 
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org 
Subject: [External] Regarding java.lang.IllegalStateException
Hello,

We have been facing this weird issue where we are getting the exception below and 
the job is getting restarted with new task managers. We are using Flink 1.17.
The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1041)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:624)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Unknown Source)

Thanks,
Prashant



COGILITY SOFTWARE CORPORATION LEGAL DISCLAIMER: The information in this email 
is confidential and is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be taken in 
reliance on it, is prohibited and may be unlawful.


Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
I was talking about the Flink Kubernetes operator and HA, not the individual
Flink jobs. But based on your answer it’s probably not the cause.

Gyula

On Fri, 26 Apr 2024 at 21:15, Maxim Senin  wrote:

> Hi, Gyula. Thanks for the tips.
>
> All jobs are deployed in a single namespace, “flink”.
>
> Which replicas? The JM replicas are already 1; I tried with TM replicas
> set to 1, but the same exception happens. We have only 1 instance of the
> operator (replicas=1) in this environment.
>
> The only workarounds I discovered are either
> a) disable autoscaling for the failing job (the autoscaler scales the job to
> zero to “gracefully” stop it and then never starts it again), or
> b) some jobs that keep restarting can be fixed by disabling HA for that job
>
> And `Cannot rescale the given pointwise partitioner.` is also still a
> mystery.
>
> *Thanks,*
>
> *Maxim*
>
>
>
> *From: *Gyula Fóra 
> *Date: *Friday, April 26, 2024 at 1:10 AM
> *To: *Maxim Senin 
> *Cc: *Maxim Senin via user 
> *Subject: *Re: [External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi Maxim!
>
>
>
> Regarding the status update error, it could be related to a problem that
> we discovered recently with Flink Operator HA, where during a
> namespace change both leader and follower instances would start processing.
>
> It has been fixed in the current master by updating the JOSDK version to
> the one containing the fix.
>
>
>
> For details you can check:
>
> https://github.com/operator-framework/java-operator-sdk/issues/2341
>
>
> https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d
>
>
>
> To resolve the issue (if it's caused by this), you could either
> cherry-pick the fix internally to the operator or reduce the replicas to 1
> if you are using HA.
>
>
>
> Cheers,
>
> Gyula
>
>
>
>
>
>
>
> On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user <
> user@flink.apache.org> wrote:
>
> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of the
> deployments. This includes taking a savepoint (`takeSavepointOnUpgrade:
> true`, `upgradeMode: savepoint`) and “gracefully” shutting down the
> JobManager by “scaling it to zero” (by setting replicas = 0 in the new
> generated config).
>
> However, the deployment never comes back up, apparently, due to exception:
>
>
> 2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher *[ERROR]*
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] 

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
Here’s more detail on the UnsupportedOperationException. The job starts, the 
operator collects some stats, and then the job dies, apparently during a rescaling 
operation:

[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Event  | Info| 
JOBSTATUSCHANGED | Job status changed from CREATED to RUNNING
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction 
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] >>> Status | Info| 
STABLE  | The resource deployment is considered to be stable and won't 
be rolled back
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully 
reconciled, nothing to do...
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction 
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until 
2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Creating config map 
autoscaler-f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction 
jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 
192.000mb (201326592 bytes), min value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] The derived from fraction 
jvm overhead memory (1.000gb (1073741840 bytes)) is greater than its max value 
1024.000mb (1073741824 bytes), max value will be used instead
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Stabilizing until 
2024-04-26 19:22:43
[INFO ][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Resource fully 
reconciled, nothing to do...
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
f5179d479dc1921693ffeb3797345458 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown 
Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or 
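The hint in that exception ("add an explicit shuffle()") points at breaking the pointwise FORWARD/RESCALE connection between the two operators so that their parallelisms can change independently. A rough DataStream sketch of that kind of change, assuming a trivial job (the operators and names below are placeholders, not the actual pipeline):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitShuffleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map(String::toUpperCase).name("upstream")
                // Replace the pointwise FORWARD/RESCALE connection with an
                // explicit redistribution so the two operators can be
                // rescaled independently.
                .rebalance() // .shuffle() would also work
                .map(s -> s + "!").name("downstream")
                .print();

        env.execute("explicit-shuffle-sketch");
    }
}

The trade-off is that an explicit shuffle()/rebalance() adds a network exchange that a forward connection avoids.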

Re: Regarding java.lang.IllegalStateException

2024-04-26 Thread Maxim Senin via user
We are also seeing something similar:

2024-04-26 16:30:44,401 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Power 
Consumption:power_consumption -> Ingest Power Consumption -> PopSysFields -> 
WindowingWatermarkPreCheck (1/1) 
(cb8c425b6463b1ade9b8359c0514386b_28bc590bb7896e1df191c306d7cb6d23_0_11) 
switched from DEPLOYING to FAILED on 
f-a9ad4438-cddf-512f-94c6-c5f921f66078-taskmanager-1-1 @ 10.111.164.30 
(dataPort=42621).
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist-1.19.0.jar:1.19.0]
  at java.lang.Thread.run(Unknown Source) ~[?:?]
2024-04-26 16:30:44,402 INFO  
org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler [] - Restarting 
job.
java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-522999dc4412a76d33728a97225de573]
new:[p-1b01ac6374a137939ffa18432714b7c9af30dc3f-819e2ecffa2f22c5e0f4d88ef5789421]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1047)
 ~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
~[flink-dist-1.19.0.jar:1.19.0]
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
~[flink-dist-1.19.0.jar:1.19.0]
  at java.lang.Thread.run(Unknown Source) ~[?:?]

Flink 1.19, operator 1.9.0


. . . . . . . . . . . . . . . . . . . . .
  Maxim Senin
   Senior Backend Engineer
 COGILITY

From: prashant parbhane 
Date: Tuesday, April 23, 2024 at 11:09 PM
To: user@flink.apache.org 
Subject: Regarding java.lang.IllegalStateException
Hello,

We have been facing this weird issue where we are getting the exception below and 
the job is getting restarted with new task managers. We are using Flink 1.17.
The same job works fine with a lower number of task managers (<10).

java.lang.IllegalStateException: The library registration references a 
different set of library BLOBs than previous registrations for this job:
old:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-e06122dd51cccba866932318dc031d68]
new:[p-ecf88f3c5c35842d7bca235cfd7592c53f9fdbe0-7651f69109c915ac830aa42ce2ab67f0]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
at 

Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Maxim Senin via user
Hi, Gyula. Thanks for the tips.

All jobs are deployed in a single namespace, “flink”.

Which replicas? The JM replicas are already 1; I tried with TM replicas set to 
1, but the same exception happens. We have only 1 instance of the operator 
(replicas=1) in this environment.

The only workarounds I discovered are either
a) disable autoscaling for the failing job (the autoscaler scales the job to zero 
to “gracefully” stop it and then never starts it again), or
b) some jobs that keep restarting can be fixed by disabling HA for that job

And `Cannot rescale the given pointwise partitioner.` is also still a mystery.
Thanks,
Maxim

From: Gyula Fóra 
Date: Friday, April 26, 2024 at 1:10 AM
To: Maxim Senin 
Cc: Maxim Senin via user 
Subject: Re: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi Maxim!

Regarding the status update error, it could be related to a problem that we 
discovered recently with Flink Operator HA, where during a namespace 
change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to the 
one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick the 
fix internally to the operator or reduce the replicas to 1 if you are using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
<user@flink.apache.org> wrote:
I have also seen this exception:

o.a.f.k.o.o.JobStatusObserver  
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Job 
d0ac9da5959d8cc9a82645eeef6751a5 failed with error: 
java.util.concurrent.CompletionException: 
java.util.concurrent.CompletionException: 
java.lang.UnsupportedOperationException: Cannot rescale the given pointwise 
partitioner.
Did you change the partitioner to forward or rescale?
It may also help to add an explicit shuffle().
at 
org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
Source)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)
at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)

I can’t find any information on how to interpret this. Please advise..

Cheers,
Maxim
From: Maxim Senin via user <user@flink.apache.org>
Date: Thursday, April 25, 2024 at 12:01 PM
To: Maxim Senin via user <user@flink.apache.org>
Subject: [External] Exception during autoscaling operation - Flink 
1.18/Operator 1.8.0
Hi.

I already asked before but never got an answer. My observation is that the 
operator, after collecting some stats, is trying to restart one of the 
deployments. This includes taking a savepoint (`takeSavepointOnUpgrade: true`, 
`upgradeMode: savepoint`) and “gracefully” shutting down the JobManager by 
“scaling it to zero” (by setting replicas = 0 in the new generated config).

However, the deployment never comes back up, apparently, due to exception:

2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher 
[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] Error during error status 
handling.
org.apache.flink.kubernetes.operator.exception.StatusConflictException: Status 
have been modified externally in version 50607043 Previous: 
{"jobStatus":{"jobName":"autoscaling 
test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
at 
org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)
 

Re: Strange Problem (0 AvailableTask)

2024-04-26 Thread Hemi Grs
Hi Biao,

Thanks for your reply; fortunately the problem is solved.
All I did was change the bind-host to 0.0.0.0 (previously it was set to
the server's IP).

I don't know if it's best practice or not, but everything is working fine
now.

Right now I am using Flink standalone (I have the binaries on CentOS and
just run it using the ./start-cluster command), but I want to learn how
to set it up as a cluster. Are there any tutorials you can recommend?

Thanks

On Fri, Apr 26, 2024 at 3:33 PM Biao Geng  wrote:

> Hi Hemi,
> How do you start your flink cluster? Are you using standalone cluster or
> using k8s/yarn as resource providers?
> Also, it would be very helpful if you can share the full jobmanager log.
>
> Best,
> Biao Geng
>
> Hemi Grs wrote on Thu, Apr 18, 2024 at 15:43:
>
>> Hello,
>>
>> I have several versions of Flink (1.17.0, 1.18.0, 1.18.1 and 1.19.0) on
>> my server.
>> I am still trying it out (on & off), and I was running a job to sync a
>> table from MySQL to Elasticsearch, and it was running fine without any
>> problems (I was using the 1.18.1 version).
>> But after a few weeks, I forgot about it, and when I checked the dashboard no
>> job was running. The strange part is that there are 0 available tasks (I
>> configured it to have 10 tasks).
>>
>> I tried restarting the service but it still shows 0 available tasks. I
>> even tried using different versions, and all of them (except 1.17.0) have no
>> available tasks. So now I am back to using 1.17.0.
>>
>> When I checked the log it has this message:
>> -
>> Tokens update task not started because either no tokens obtained or none
>> of the tokens specified its renewal date
>> -
>>
>> Is it because of that? and what is the solution?
>>
>> Appreciate for all the help.
>>
>> Thanks
>>
>


Re: Flink SQL Client does not start job with savepoint

2024-04-26 Thread Biao Geng
Hi Lee,

A quick question: what version of Flink are you using when testing
execution.state-recovery.path?
It looks like this config is only supported in Flink 1.20, which is not
released yet.
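
If I remember correctly, on Flink 1.17-1.19 the corresponding option is
execution.savepoint.path (in the SQL client: SET 'execution.savepoint.path' = '...'),
so that may be worth trying first. As a rough, untested Java/Table API sketch of the
same idea, reusing the savepoint path from the earlier message with simplified table
schemas (the option name and the schemas are assumptions; please double-check them
against the docs for your version):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SavepointRestoreSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Pre-1.20 option name (assumption); the path is the one quoted in the thread.
        tEnv.getConfig().getConfiguration().setString(
                "execution.savepoint.path",
                "file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc");

        // Simplified stand-ins for the tables from the original message.
        tEnv.executeSql(
                "CREATE TABLE Orders (order_number BIGINT, price DECIMAL(32,2), "
                        + "order_time TIMESTAMP(3)) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "CREATE TABLE OrdersBlackhole (order_number BIGINT, price DECIMAL(32,2), "
                        + "order_time TIMESTAMP(3)) WITH ('connector' = 'blackhole')");

        // This submission should then be restored from the savepoint set above.
        tEnv.executeSql("INSERT INTO OrdersBlackhole SELECT * FROM Orders");
    }
}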


Best,
Biao Geng


Lee, Keith wrote on Fri, Apr 26, 2024 at 04:51:

> Apologies, I have included the jobmanager log for
> 6969725a69ecc967aac2ce3eedcc274a instead of
> 7881d53d28751f9bbbd3581976d9fe3d; however, they looked exactly the same.
>
> Can include if necessary.
>
> Thanks
> Keith
>
>
>
> *From: *"Lee, Keith" 
> *Date: *Thursday, 25 April 2024 at 21:41
> *To: *"user@flink.apache.org" 
> *Subject: *Flink SQL Client does not start job with savepoint
>
>
>
> Hi,
>
> Referring to
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
>
>
> I’ve followed the instructions; however, I do not see evidence of the job
> being started from the savepoint. See the SQL statement excerpt below:
>
>
>
> Flink SQL> STOP JOB '14de8cc898d56653b96872fc0ba03c91' WITH SAVEPOINT;
>
> +--+
>
> |   savepoint path |
>
> +--+
>
> | file:/tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc |
>
> +--+
>
>
> …
>
> Flink SQL> CREATE TABLE Orders (order_number BIGINT,price
> DECIMAL(32,2),buyer ROW,order_time
> TIMESTAMP(3)) WITH ('connector' = 'datagen');
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> CREATE TABLE OrdersBlackhole (order_number BIGINT,price
> DECIMAL(32,2),buyer ROW,order_time
> TIMESTAMP(3)) WITH ('connector' = 'blackhole');
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] SQL update statement has been successfully submitted to the
> cluster:*
>
> Job ID: 6969725a69ecc967aac2ce3eedcc274a
>
>
>
>
>
> Flink SQL> STOP JOB '6969725a69ecc967aac2ce3eedcc274a';
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> SET 'execution.state-recovery.path' = '
> file:///tmp/flink-savepoints/savepoint-14de8c-f744e50d3ecc';
>
> *[INFO] Execute statement succeed.*
>
>
>
> Flink SQL> INSERT INTO OrdersBlackhole SELECT * FROM Orders;
>
> *[INFO] Submitting SQL update statement to the cluster...*
>
> *[INFO] SQL update statement has been successfully submitted to the
> cluster:*
>
> Job ID: 7881d53d28751f9bbbd3581976d9fe3d
>
>
>
>
> I have attempted with and without the prefix file:// and file:/.
> Additionally, I’ve also attempted the following in config.yml
>
> state.savepoints.dir: file:///tmp/flink-savepoints/
>
> state.checkpoints.dir: file:///tmp/flink-checkpoints/
>
>
> Am I missing something? The jobmanager log did not indicate a start from
> savepoint.
>
>
> Received JobGraph submission
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Submitting job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> JobMasterServiceLeadershipRunner for job 6969725a69ecc967aac2ce3eedcc274a
> was granted leadership with leader id ----.
> Creating new JobMasterServiceProcess.
>
> Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
> pekko://flink/user/rpc/jobmanager_4 .
>
> Initializing job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Using restart back off time strategy NoRestartBackoffTimeStrategy for
> insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Created execution graph 9905f321e9958b6c36b71e0601a85a59 for job
> 6969725a69ecc967aac2ce3eedcc274a.
>
> Running initialization on master for job
> insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Successfully ran initialization on master in 0 ms.
>
> Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
>
> State backend is set to heap memory
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@78e93599
>
> State backend loader loads the state backend as HashMapStateBackend
>
> Using job/cluster config to configure application-defined checkpoint
> storage:
> org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@acb26a25
>
> No checkpoint found during restore.
>
> Using failover strategy
> org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy@7db68f8f
> for insert-into_default_catalog.default_database.OrdersBlackhole
> (6969725a69ecc967aac2ce3eedcc274a).
>
> Starting execution of job
> 'insert-into_default_catalog.default_database.OrdersBlackhole'
> (6969725a69ecc967aac2ce3eedcc274a) under job master id
> .
>
> Starting 

Re: Strange Problem (0 AvailableTask)

2024-04-26 Thread Biao Geng
Hi Hemi,
How do you start your flink cluster? Are you using standalone cluster or
using k8s/yarn as resource providers?
Also, it would be very helpful if you can share the full jobmanager log.

Best,
Biao Geng

Hemi Grs wrote on Thu, Apr 18, 2024 at 15:43:

> Hello,
>
> I have several versions of Flink (1.17.0, 1.18.0, 1.18.1 and 1.19.0) on my
> server.
> I am still trying it out (on & off), and I was running a job to sync a
> table from MySQL to Elasticsearch, and it was running fine without any
> problems (I was using the 1.18.1 version).
> But after a few weeks, I forgot about it, and when I checked the dashboard no
> job was running. The strange part is that there are 0 available tasks (I
> configured it to have 10 tasks).
>
> I tried restarting the service but it still shows 0 available tasks. I even
> tried different versions, and all of them (except 1.17.0) have no
> available tasks. So now I am back to using 1.17.0.
>
> When I checked the log it has this message:
> -
> Tokens update task not started because either no tokens obtained or none
> of the tokens specified its renewal date
> -
>
> Is it because of that? and what is the solution?
>
> Appreciate for all the help.
>
> Thanks
>


Re: [External] Exception during autoscaling operation - Flink 1.18/Operator 1.8.0

2024-04-26 Thread Gyula Fóra
Hi Maxim!

Regarding the status update error, it could be related to a problem that we
discovered recently with Flink Operator HA, where during a
namespace change both leader and follower instances would start processing.
It has been fixed in the current master by updating the JOSDK version to
the one containing the fix.

For details you can check:
https://github.com/operator-framework/java-operator-sdk/issues/2341
https://github.com/apache/flink-kubernetes-operator/commit/29a4aa5adf101920cbe1a3a9a178ff16f52c746d

To resolve the issue (if it's caused by this), you could either cherry-pick
the fix internally to the operator or reduce the replicas to 1 if you are
using HA.

Cheers,
Gyula



On Fri, Apr 26, 2024 at 3:48 AM Maxim Senin via user 
wrote:

> I have also seen this exception:
>
> o.a.f.k.o.o.JobStatusObserver  
> *[ERROR][flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6]
> Job d0ac9da5959d8cc9a82645eeef6751a5 failed with error:
> java.util.concurrent.CompletionException:
> java.util.concurrent.CompletionException:
> java.lang.UnsupportedOperationException: Cannot rescale the given pointwise
> partitioner.*
>
> *Did you change the partitioner to forward or rescale?*
>
> *It may also help to add an explicit shuffle().*
>
> *at
> org.apache.flink.util.concurrent.FutureUtils.lambda$switchExecutor$23(FutureUtils.java:1305)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source)*
>
> *at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown
> Source)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)*
>
> *at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)*
>
> *at
> scala.PartialFunction.applyOrElse(PartialFunction.scala:127)*
>
> *at
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)*
>
> *at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)*
>
> I can’t find any information on how to interpret this. Please advise..
>
> Cheers,
> Maxim
>
> *From: *Maxim Senin via user 
> *Date: *Thursday, April 25, 2024 at 12:01 PM
> *To: *Maxim Senin via user 
> *Subject: *[External] Exception during autoscaling operation - Flink
> 1.18/Operator 1.8.0
>
> Hi.
>
> I already asked before but never got an answer. My observation is that the
> operator, after collecting some stats, is trying to restart one of the
> deployments. This includes taking a savepoint (`takeSavepointOnUpgrade:
> true`, `upgradeMode: savepoint`) and “gracefully” shutting down the
> JobManager by “scaling it to zero” (by setting replicas = 0 in the new
> generated config).
>
> However, the deployment never comes back up, apparently, due to exception:
>
>
> 2024-04-25 17:20:52,920 mi.j.o.p.e.ReconciliationDispatcher *[ERROR]*
> [flink/f-d7681d0f-c093-5d8a-b5f5-2b66b4547bf6] *Error* during error
> status handling.
>
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 50607043 Previous:
> {"jobStatus":{"jobName":"autoscaling
> test:attack-surface","jobId":"be93ad9b152c1f11696e971e6a638b63","state":"FINISHEDINFO\\n\"},\"mode\":\"native\"},\"resource_metadata\":…
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.replaceStatus(StatusRecorder.java:161)*
>
> *at
> org.apache.flink.kubernetes.operator.utils.StatusRecorder.patchAndCacheStatus(StatusRecorder.java:97)*
>
> *at
> org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils.toErrorStatusUpdateControl(ReconciliationUtils.java:438)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:209)*
>
> *at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.updateErrorStatus(FlinkDeploymentController.java:57)*
>
> *at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleErrorStatusHandler(ReconciliationDispatcher.java:194)*
>
> *at
> 

Re: Async code inside Flink Sink

2024-04-26 Thread Biao Geng
Hi Jacob,

For your first question, I think it is fine to use Java CompletableFuture
for your case. If we create lots of threads, of course it would consume
more CPU and influence the processing of records. But in your case, the
close op may not be very heavy. One thing that comes to mind is that when an
exception happens during the close, if necessary, you should find a way to
propagate the exception to Flink's main thread. Maybe you can take a look at
the Kafka connector's code for more insight into using Flink's
mailboxExecutor.

For your second question, it seems to me that your deletion logic is your
own customized action when writing to the sink. You can use Java's
ScheduledExecutorService for that. Flink has its timer and async
mechanisms, but they are mainly designed for processing records, so they
are probably not what you are looking for.
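
To make the first point concrete, here is a rough sketch (not taken from any real
connector) of a Sink V2 SinkWriter that fires the cache-file deletion with
CompletableFuture.runAsync() and re-throws any async failure on the next
write()/flush()/close() call, so the failure surfaces on Flink's own thread. The
class name, writeToDb, and the cache path are placeholders for your own logic; a
periodic variant could keep a ScheduledExecutorService in the writer and shut it
down in close().

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.api.connector.sink2.SinkWriter;

public class CacheCleaningWriter implements SinkWriter<String> {

    // Remembers the first async failure so it is not silently swallowed.
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    @Override
    public void write(String element, Context context) throws IOException, InterruptedException {
        rethrowAsyncError();
        Path cacheFile = writeToDb(element); // placeholder for the real sink work
        // Fire-and-forget deletion on the common pool; record failures for later.
        CompletableFuture
                .runAsync(() -> deleteQuietly(cacheFile))
                .exceptionally(t -> {
                    asyncError.compareAndSet(null, t);
                    return null;
                });
    }

    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        rethrowAsyncError();
    }

    @Override
    public void close() throws Exception {
        rethrowAsyncError();
    }

    // Re-throws a recorded async failure on Flink's main (mailbox) thread.
    private void rethrowAsyncError() throws IOException {
        Throwable t = asyncError.get();
        if (t != null) {
            throw new IOException("Async cache cleanup failed", t);
        }
    }

    private Path writeToDb(String element) {
        // Placeholder: write the element to the DB and return the cache file it came from.
        return Paths.get("/tmp/cache/" + element);
    }

    private void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}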

Best,
Biao Geng



Jacob Rollings wrote on Thu, Apr 18, 2024 at 11:06:

> Hello,
>
> I have a use case where I need to delete a cache file after a
> successful sink operation (writing to a DB). My Flink pipeline is built using
> Java. I am contemplating using Java CompletableFuture.runAsync() to perform
> the file deletion. I am wondering what issues this might cause in
> terms of thread management and processing of the next event.
>
> Also related to the same use case: in another Flink pipeline, I might need
> to do this cache file deletion in a timed fashion. For example, every five
> minutes I have to check for cache files that are older than the currently
> open cache file that is serving data into the Sink function. All old
> cache files that are in closed status need to be deleted in a timely manner.
>
> All this deletion has to happen asynchronously, without blocking the flow
> of events from the Flink source to the sink.
> Also, based on the requirements, I cannot make the whole Sink operation async;
> I have to make only the file-based cache deletion async inside the Sink
> function.
>
> Does Flink support timers or async blocks?
>
> Any input will be highly helpful.
>
> Thanks,
> JB
>