Flink 1.18 support for flink stateful functions

2024-04-12 Thread Deshpande, Omkar via user
Hello,

Is there a plan to add support for Flink 1.18 in Flink Stateful Functions?
Also, Stateful Functions releases generally seem to lag behind the Flink release 
cycle. Is the Stateful Functions project going to be actively maintained?

Thanks,
Omkar


High Availability on Kubernetes

2021-10-25 Thread Deshpande, Omkar
Hello,

We are running Flink on Kubernetes (standalone) in application cluster mode. The 
job manager is deployed as a Kubernetes Deployment.
We only deploy one instance/replica of the job manager, so the leader election 
service is not required.
And we have set the Flink task execution retries to infinite.

Do we still need an HA setup? We have tested our application without configuring 
HA, and it seems to restore from checkpoints after failures.
Does the Flink job manager keep the information that it would otherwise store in 
the HA system in memory?
If it does, is the only reason to configure HA to achieve resiliency in case of 
pod evictions (caused by node failures, scheduling, etc.)?
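
For reference, a minimal sketch of how this could look in flink-conf.yaml, 
assuming Flink 1.12+ with the built-in Kubernetes HA services (the bucket, 
cluster id, and restart values are placeholders; the fixed-delay block is just 
one way to express "infinite" retries):

    # Sketch: built-in Kubernetes HA services (Flink 1.12+)
    high-availability: kubernetes
    high-availability.storageDir: s3://my-bucket/flink/ha
    kubernetes.cluster-id: my-application-cluster

    # "Infinite" restarts approximated with a very large fixed-delay attempt count
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647
    restart-strategy.fixed-delay.delay: 10 s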

Thanks,
Omkar


Re: rpc invocation exceeds the maximum akka framesize

2021-09-28 Thread Deshpande, Omkar
Is the akka framesize a function of broadcast size?

From: Deshpande, Omkar
Sent: Monday, September 27, 2021 6:31 PM
To: user@flink.apache.org
Cc: Benenson, Mikhail; Hwang, Nick; Canchi, Srivathsan
Subject: rpc invocation exceeds the maximum akka framesize

Hello,

We run a lot of Flink applications. Some of them sometimes run into this error 
on the Job Manager:
The rpc invocation size exceeds the maximum akka framesize

After we increase the framesize, the application starts working again.
What factors determine the required akka framesize? We sometimes see applications 
run without this issue for months and then run into this error.
How can we determine the right framesize before running into this error?

Thanks,
Omkar


rpc invocation exceeds the maximum akka framesize

2021-09-27 Thread Deshpande, Omkar
Hello,

We run a lot of Flink applications. Some of them sometimes run into this error 
on the Job Manager:
The rpc invocation size exceeds the maximum akka framesize

After we increase the framesize, the application starts working again.
What factors determine the required akka framesize? We sometimes see applications 
run without this issue for months and then run into this error.
How can we determine the right framesize before running into this error?
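
For reference, the limit is the akka.framesize option in flink-conf.yaml (the 
default is 10485760b); a minimal sketch of raising it, with the value chosen 
purely for illustration:

    # Sketch: raise the maximum size of a single RPC message (default: 10485760b)
    akka.framesize: 52428800b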

Thanks,
Omkar


Rocksdb - org.apache.flink.util.SerializedThrowable : bad entry in block

2021-01-26 Thread Deshpande, Omkar
Hello,

I am using Flink 1.9 with Beam 2.26 and the RocksDB state backend. I am getting 
this exception:

org.apache.flink.util.SerializedThrowable: Caught exception while processing 
timer.
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:978)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:952)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.SerializedThrowable: 
org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error 
reading state.
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: 
java.lang.RuntimeException: Error reading state.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at 
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at 
org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at 
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at 
org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
... 7 common frames omitted
Caused by: org.apache.flink.util.SerializedThrowable: Error reading state.
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:494)
at 
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
Caused by: org.apache.flink.util.SerializedThrowable: Error while retrieving 
data from RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at 
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.read(FlinkStateInternals.java:471)
at 
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn.onEventBufferExpiry(SessionEnrichDoFn.java:92)
at 
com.intuit.data.platform.process.sessionization.transform.SessionEnrichDoFn$OnTimerInvoker$tsbufferexpiry$dHMtYnVmZmVyX2V4cGlyeQ.invokeOnTimer(Unknown Source)
at 
org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:235)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.onTimer(SimpleDoFnRunner.java:226)
at 
org.apache.beam.runners.core.StatefulDoFnRunner.onTimer(StatefulDoFnRunner.java:237)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.onTimer(DoFnRunnerWithMetricsUpdate.java:79)
at 
org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.onTimer(SimplePushbackSideInputDoFnRunner.java:119)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimer(DoFnOperator.java:1010)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:995)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:990)

Flink taskmanager id

2021-01-07 Thread Deshpande, Omkar
Hello,

I use Flink on Kubernetes, and the TaskManagers get assigned random UUIDs. Is 
there a way to explicitly configure them to use hostnames instead?
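
For reference, a minimal sketch, assuming the taskmanager.resource-id option 
available in recent Flink releases; the value is a placeholder that would need 
to be templated per pod (e.g. from the pod name in a StatefulSet):

    # Sketch: pin the TaskManager resource ID instead of a random UUID
    taskmanager.resource-id: taskmanager-0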


Omkar


Re: flink checkpoint timeout

2020-09-15 Thread Deshpande, Omkar
I have followed 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_migration.html 
and I am now using taskmanager.memory.flink.size instead of taskmanager.heap.size.
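
For reference, a minimal sketch of the corresponding flink-conf.yaml change (the 
size is a placeholder, not our actual value):

    # Flink 1.9 and earlier (legacy option)
    # taskmanager.heap.size: 4096m

    # Flink 1.10 memory model
    taskmanager.memory.flink.size: 4096m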

From: Deshpande, Omkar 
Sent: Monday, September 14, 2020 6:23 PM
To: user@flink.apache.org 
Subject: flink checkpoint timeout

Hello,

I recently upgraded from Flink 1.9 to 1.10. The checkpointing succeeds the first 
couple of times and then starts failing because of timeouts. The checkpoint time 
grows with every checkpoint and starts exceeding 10 minutes. I do not see any 
exceptions in the logs. I have enabled debug logging at the "org.apache.flink" 
level. How do I investigate this? The garbage collection seems fine. There is no 
backpressure. This used to work as-is with Flink 1.9 without any issues.

Any pointers on how to investigate the long time taken to complete a checkpoint?

Omkar


flink checkpoint timeout

2020-09-14 Thread Deshpande, Omkar
Hello,

I recently upgraded from Flink 1.9 to 1.10. The checkpointing succeeds the first 
couple of times and then starts failing because of timeouts. The checkpoint time 
grows with every checkpoint and starts exceeding 10 minutes. I do not see any 
exceptions in the logs. I have enabled debug logging at the "org.apache.flink" 
level. How do I investigate this? The garbage collection seems fine. There is no 
backpressure. This used to work as-is with Flink 1.9 without any issues.

Any pointers on how to investigate the long time taken to complete a checkpoint?
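
For reference, a minimal sketch (plain Flink DataStream API; the values are 
placeholders) of the checkpoint settings worth pinning down while investigating 
growing checkpoint times:

    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000L);            // trigger a checkpoint every 60 s
            CheckpointConfig cfg = env.getCheckpointConfig();
            cfg.setCheckpointTimeout(20 * 60 * 1000L);   // raise the timeout while investigating
            cfg.setMinPauseBetweenCheckpoints(30_000L);  // avoid back-to-back checkpoints
            cfg.setMaxConcurrentCheckpoints(1);          // only one checkpoint in flight at a time
        }
    }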

Omkar


Same kafka partition being consumed by multiple task managers.

2020-08-20 Thread Deshpande, Omkar
Hello,

I am running a streaming Beam app with the Flink runner (Java).

  *   Beam 2.19
  *   Flink 1.9

Checkpoints and savepoints are configured to go to S3, and HA is enabled using 
ZooKeeper.

I was running the app with 3 task managers. I took a savepoint and started the 
app with 6 task managers. My input topic has 12 partitions. With 3 pods, the 
partitions were distributed evenly. After restarting with the increased number 
of task managers, some partitions are being consumed by 2 task managers.
partition   task manager
0   4
1   4
2   1
3   1
4   3
5   3
6   4, 0
7   4, 0
8   1, 2
9   1, 2
10  3, 5
11  3, 5
It looks like there were originally 3 task managers [1, 3, 4] and they correctly 
distributed the partitions between them.
Then 3 new task managers were added [0, 2, 5] and the partitions were not 
properly re-distributed.

Where could this metadata be coming from?

Omkar


allowNonRestoredState: metadata file in checkpoint dir missing

2020-07-31 Thread Deshpande, Omkar
Hello,

When deleting an operator, we run our application with 
--allowNonRestoredState=true, as described in the documentation. When running 
with this flag, we have observed that the _metadata file is not generated in the 
checkpoint directory. So, if the application fails, we don't have the ability to 
start from the checkpoint. And since the application has failed, we can't take a 
savepoint either.


Is the _metadata file not being created in this case the expected behavior?

How do we achieve resilience while using --allowNonRestoredState?
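
For reference, a minimal sketch of the plain Flink CLI form of such a restart 
(the savepoint path, main class, and jar name are placeholders):

    # Sketch: resume from a savepoint, skipping state that has no matching operator
    flink run \
      -s s3://my-bucket/savepoints/savepoint-abc123 \
      --allowNonRestoredState \
      -c com.example.MyPipeline \
      my-pipeline-bundled.jar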


We are using Beam with the Flink runner (Java).

  *   Beam 2.19
  *   Flink 1.9

Omkar


Re: Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Deshpande, Omkar
I have observed that the state gets drained irrespective of the value of 
"drain".

I am using the Maven dependency org.apache.beam:beam-runners-flink-1.9:2.19.0.

And I am running a Kafka word-count app with a fixed window of 1 hour. When I 
stop the app with the stop endpoint 
(https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-stop) 
before the hour elapses, the records get drained. I have tried both {"drain":true} 
and {"drain":false} in the body of the POST request; the drain behavior remains 
the same.


From: Kostas Kloudas
Sent: Tuesday, June 9, 2020 4:48 AM
To: Deshpande, Omkar
Cc: user@flink.apache.org; Hwang, Nick; Benenson, Mikhail; LeVeck, Matt; Kathula, Sandeep
Subject: Re: Stopping flink application with /jobs/:jobid/savepoints or 
/jobs/:jobid/stop

Hi Omkar,

For the first part of the question, where you set "drain" to false and the 
state still gets drained, this can be an issue on our side. Just to clarify: no 
matter what the value of "drain" is, Flink always takes a savepoint. Drain 
simply means that we also send MAX_WATERMARK before taking the savepoint. Is 
this what you observe? I.e., that you have an infinite input stream and, even 
if you set drain to false, you still see the MAX_WATERMARK?

For the second part of the question, cancel-with-savepoint is a deprecated 
command, but it has not been removed, for backwards compatibility. So you can 
still do a cancel-with-savepoint in the way you described. The difference 
between the deprecated cancel-with-savepoint and the recommended 
stop-with-savepoint is that stop-with-savepoint guarantees that, if you are 
using an exactly-once sink, the side-effects are committed to the sink before 
the job exits. This was not the case for cancel-with-savepoint. For more 
details, you can have a look at [1].

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Tue, Jun 9, 2020 at 4:40 AM Deshpande, Omkar wrote:
>
> Hello,
>
> When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the state
> gets drained, even if I pass {"drain":false} in the body of the POST request.
> Is the value of the drain flag true by default? Why is it not honored when I
> pass {"drain":false}?
>
> And I can also stop a job using the /jobs/:jobid/savepoints endpoint with
> {"cancel-job":"true"} in the body. In this case the state is not drained. What
> is the difference between these two endpoints? Is there a reason to use one
> over the other?
>
> If I want to stop a job with a savepoint but without draining the state, which
> endpoint should be used?
>
> Omkar


Stopping flink application with /jobs/:jobid/savepoints or /jobs/:jobid/stop

2020-06-09 Thread Deshpande, Omkar
Hello,

When I try to stop the job with the /jobs/:jobid/stop REST endpoint, the state 
gets drained, even if I pass {"drain":false} in the body of the POST request. Is 
the value of the drain flag true by default? Why is it not honored when I pass 
{"drain":false}?

And I can also stop a job using the /jobs/:jobid/savepoints endpoint with 
{"cancel-job":"true"} in the body. In this case the state is not drained. What 
is the difference between these two endpoints? Is there a reason to use one 
over the other?

If I want to stop a job with a savepoint but without draining the state, which 
endpoint should be used?
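
For reference, a minimal sketch of the two calls side by side (host, port, job 
id, and target directory are placeholders; the JSON field names follow the 
Flink 1.9 REST API):

    # Stop with savepoint ("drain": true additionally emits MAX_WATERMARK, per the reply above)
    curl -X POST http://jobmanager:8081/jobs/<jobid>/stop \
      -H 'Content-Type: application/json' \
      -d '{"targetDirectory": "s3://my-bucket/savepoints", "drain": false}'

    # Trigger a savepoint and cancel the job (the older, deprecated path)
    curl -X POST http://jobmanager:8081/jobs/<jobid>/savepoints \
      -H 'Content-Type: application/json' \
      -d '{"target-directory": "s3://my-bucket/savepoints", "cancel-job": true}'

Note that the two bodies use different field-name conventions: camelCase for 
/stop and hyphenated for /savepoints.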

Omkar