Flink docker on k8s job submission timeout

2021-11-10 Thread dhanesh arole
Hello all,

We are trying to run a Flink job in standalone mode using the official
docker image on k8s. As per this documentation
<https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#advanced-customization>
we have created a custom docker image that extends the official image,
performs some pre-start actions, and finally runs `exec
/docker-entrypoint.sh standalone-job "$1"` to start the job manager. We have
ensured that flink-conf.yaml is present at the expected path,
i.e. $FLINK_HOME/conf/flink-conf.yaml, and have set
JOB_MANAGER_RPC_ADDRESS to the pod IP.

We submit our job for execution in the application's main thread using
`StreamExecutionEnvironment#executeAsync`. But while submitting the job we
consistently get an AskTimeoutException from Dispatcher#submitJob (see
logs below).
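
For reference, the submission in our main method looks roughly like this
(simplified sketch; the job name and pipeline are placeholders):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... build the pipeline on env ...
// executeAsync() returns a JobClient once the job has been submitted to the dispatcher
JobClient client = env.executeAsync("my-job");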

Based on some previous answers on the mailing list and in issues, we tried
increasing "web.timeout" and "akka.ask.timeout", but neither of them helped.
It seems like the timeout value used for this particular future is
hardcoded somewhere in the code. It would be great if someone could provide
some pointers on what we are missing or what else we should check.
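
For completeness, this is roughly what we set in flink-conf.yaml while
experimenting (values are illustrative, not a recommendation):

web.timeout: 600000
akka.ask.timeout: 600 s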

Error logs:

Caused by: java.util.concurrent.TimeoutException: Invocation of public abstract java.util.concurrent.CompletableFuture org.apache.flink.runtime.dispatcher.DispatcherGateway.submitJob(org.apache.flink.runtime.jobgraph.JobGraph,org.apache.flink.api.common.time.Time) timed out.
	at org.apache.flink.runtime.rpc.akka.$Proxy31.submitJob(Unknown Source) ~[?:1.13.2]
	at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:183) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?
	...

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/rpc/dispatcher_1#2019478781]] after [6 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328) ~[flink.jar:?]
	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279) ~[flink.jar:?]
	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283) ~[flink.jar:?]


-
Dhanesh Arole


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread dhanesh arole
Hi,

The questions that @matth...@ververica.com asked are
very valid and might provide more leads. But if you haven't already, it's
worth trying jemalloc / tcmalloc. We had similar problems with
slow growth in TM memory resulting in pods getting OOMed by k8s. After
switching to jemalloc, the memory footprint improved dramatically.
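
For anyone who wants to try this, a minimal sketch of one way to switch the
JVM allocator to jemalloc in a custom Docker image (the package name and
library path are assumptions and depend on the base image):

# Dockerfile fragment - Debian-based base image assumed
RUN apt-get update && apt-get install -y libjemalloc2 && rm -rf /var/lib/apt/lists/*
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2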


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )



On Thu, Apr 22, 2021 at 1:39 PM Matthias Pohl 
wrote:

> Hi,
> I have a few questions about your case:
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
> * Do you observe the same memory increase for other TaskManager nodes?
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>
> Best,
> Matthias
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>>
>> On 04/16/2021 16:07,马阳阳 
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with big state size, one task manager
>> process was killed by the yarn node manager. The following log is from the
>> yarn node manager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching for a solution to this problem, I found that there is an option
>> for this that worked for bounded shuffle. So is there a way to get rid of
>> this in streaming mode?
>>
>> PS:
>> memory related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: Flink Savepoint fault tolerance

2021-04-21 Thread dhanesh arole
Hi Arvid,

Thanks for taking the time to answer this. Yeah, we are also using savepoints
as the only restore mechanism if job parallelism needs to be changed or some
job graph properties need to be updated. Otherwise, during rolling
deployments of task manager pods or job manager pods, we rely solely on
previously completed checkpoints.
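
For context, retaining checkpoints across deployments can be enabled with
something roughly like this (simplified sketch; the interval and cleanup
mode are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60s
// keep the last completed checkpoint around even when the job is cancelled,
// so a redeployed job can be resumed from it
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);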


On Wed, 21 Apr 2021 at 15:05 Arvid Heise  wrote:

> Hi Dhanesh,
>
> We recommend to use savepoints only for migrations, investigations, A/B
> testing, and time travel and rely completely on checkpoints for fault
> tolerance. Are you using it differently?
>
> Currently, we are triggering savepoints using REST apis. And query the
>> status of savepoint by the returned handle. In case there is a network
>> issue because of which we couldn't receive response then in that case how
>> to find out if the savepoint in the previous request was triggered or not?
>> Is there a way to add "idempotency-key" to each API request so that we can
>> safely retry triggering savepoint? By doing this, we want to avoid multiple
>> triggers of consecutive savepoints during job upgrades.
>>
> I think you'd have to use your logging system and have a metric/trigger on
> the respective line. I don't think there is any REST API for that.
>
> Our workflow for capturing savepoint looks like this - call POST
>> /savepoint endpoint. Use the returned trigger handle to periodically poll
>> the status of savepoint. Once the savepoint is completed then restore the
>> job from that savepoint. We are running our flink clusters in k8s. Since
>> pod IPs can get restarted / migrated quite often in k8s, it's possible that
>> the JM pod that was used to capture the savepoint happens to be recycled
>> before completion of savepoint. In that case, we can't query the status of
>> triggered savepoint from the previously returned handle. As neither the
>> newly created JM pod or any other standby JMs have information about this
>> savepoint. I couldn't find any config that makes Flink persist state of
>> ongoing savepoints to an external store which will allow users to query the
>> status of savepoint via any available JM instance in HA setup.
>>
> Not an expert on K8s but couldn't you expose the JM as a K8s service. That
> should follow the migration automatically.
>
> If one of the TMs crashes during ongoing checkpoint then I believe that
>> checkpoint is marked as failed and on the next checkpoint interval Flink
>> triggers a new checkpoint by looking at the previously completed checkpoint
>> counter. The next checkpoint attempt might get acknowledged by all
>> operators and marked as completed. Is that correct? In case of savepoints
>> this is not possible. So how does flink resume the savepoint capturing
>> process in case of job restarts or TM failures?
>>
> Savepoints have to be triggered anew. Savepoints are meant as a purely
> manual feature. Again, you could automate it, if you look at the logs.
>
> Best,
>
> Arvid
>
>
> On Fri, Apr 16, 2021 at 12:33 PM dhanesh arole 
> wrote:
>
>> Hello all,
>>
>> I had 2 questions regarding savepoint fault tolerance.
>>
>> Job manager restart:
>>
>>- Currently, we are triggering savepoints using REST apis. And query
>>the status of savepoint by the returned handle. In case there is a network
>>issue because of which we couldn't receive response then in that case how
>>to find out if the savepoint in the previous request was triggered or not?
>>Is there a way to add "idempotency-key" to each API request so that we can
>>safely retry triggering savepoint? By doing this, we want to avoid 
>> multiple
>>triggers of consecutive savepoints during job upgrades.
>>- Our workflow for capturing savepoint looks like this - call POST
>>/savepoint endpoint. Use the returned trigger handle to periodically poll
>>the status of savepoint. Once the savepoint is completed then restore the
>>job from that savepoint. We are running our flink clusters in k8s. Since
>>pod IPs can get restarted / migrated quite often in k8s, it's possible 
>> that
>>the JM pod that was used to capture the savepoint happens to be recycled
>>before completion of savepoint. In that case, we can't query the status of
>>triggered savepoint from the previously returned handle. As neither the
>>newly created JM pod or any other standby JMs have information about this
>>savepoint. I couldn't find any config that makes Flink persist state of
>>ongoing savepoints to an external store which will allow users to query 
>> the
>>stat

Flink Savepoint fault tolerance

2021-04-16 Thread dhanesh arole
Hello all,

I had 2 questions regarding savepoint fault tolerance.

Job manager restart:

   - Currently, we trigger savepoints using the REST API and query the
   status of the savepoint via the returned handle. If there is a network
   issue because of which we don't receive the response, how can we find out
   whether the savepoint in the previous request was actually triggered?
   Is there a way to add an "idempotency-key" to each API request so that we
   can safely retry triggering a savepoint? By doing this, we want to avoid
   triggering multiple consecutive savepoints during job upgrades.
   - Our workflow for capturing a savepoint looks like this: call the POST
   /savepoint endpoint, then use the returned trigger handle to periodically
   poll the status of the savepoint. Once the savepoint is completed, restore
   the job from that savepoint (a rough sketch of this trigger-and-poll flow
   follows this list). We are running our flink clusters in k8s. Since pods
   can get restarted / migrated quite often in k8s, it's possible that the JM
   pod that was used to capture the savepoint gets recycled before the
   savepoint completes. In that case, we can't query the status of the
   triggered savepoint from the previously returned handle, as neither the
   newly created JM pod nor any other standby JM has information about this
   savepoint. I couldn't find any config that makes Flink persist the state
   of ongoing savepoints to an external store, which would allow users to
   query the status of a savepoint via any available JM instance in an HA
   setup.
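
The sketch below illustrates the trigger-and-poll flow mentioned above
against the JobManager REST API (the JM address, job id, target directory,
and the naive JSON handling are placeholders rather than our actual tooling):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SavepointTrigger {
    public static void main(String[] args) throws Exception {
        String jm = "http://flink-jobmanager:8081"; // placeholder JM REST address
        String jobId = args[0];                     // job id of the running job
        HttpClient http = HttpClient.newHttpClient();

        // 1) Trigger the savepoint: POST /jobs/<jobId>/savepoints
        HttpRequest trigger = HttpRequest.newBuilder()
                .uri(URI.create(jm + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": false}"))
                .build();
        String triggerBody = http.send(trigger, HttpResponse.BodyHandlers.ofString()).body();
        // the response JSON contains a "request-id"; real code would use a JSON parser
        String triggerId = triggerBody.replaceAll(".*\"request-id\"\\s*:\\s*\"([^\"]+)\".*", "$1");

        // 2) Poll the status: GET /jobs/<jobId>/savepoints/<triggerId>
        HttpRequest poll = HttpRequest.newBuilder()
                .uri(URI.create(jm + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();
        String status;
        do {
            Thread.sleep(5_000);
            status = http.send(poll, HttpResponse.BodyHandlers.ofString()).body();
        } while (status.contains("IN_PROGRESS"));
        // once completed, the response contains the savepoint path to restore from
        System.out.println(status);
    }
}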


Task manager restart:

   - If one of the TMs crashes during an ongoing checkpoint, then I believe
   that checkpoint is marked as failed, and at the next checkpoint interval
   Flink triggers a new checkpoint by looking at the previously completed
   checkpoint counter. The next checkpoint attempt might get acknowledged by
   all operators and marked as completed. Is that correct? In the case of
   savepoints this is not possible. So how does Flink resume the savepoint
   capturing process in case of job restarts or TM failures?
   - I am sure this must already be handled, but I just wanted to confirm and
   get help finding the relevant code references so I can dig deeper and
   understand it in depth from an educational point of view.


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )


Re: Task manager local state data after crash / recovery

2021-04-09 Thread dhanesh arole
Thanks a lot for answering it in detail. This makes sense and cleared up a
lot of doubts.

On Fri, 9 Apr 2021 at 13:02 Till Rohrmann  wrote:

> Hi Dhanesh,
>
> The way local state works in Flink currently is the following: The user
> configures a `taskmanager.state.local.root-dirs` or the tmp directory is
> used where Flink creates a "localState" directory. This is the base
> directory for all local state. Within this directory a TaskManager creates
> for every allocation a sub directory using the `AllocationID`. Inside this
> directory, Flink then stores the local state artefacts.
>
> When Flink frees an allocation, then the corresponding directory is
> deleted. In case that the process is being killed via a SIGTERM signal,
> Flink also registers a shut down hook which tries to delete all directories
> for the known `AllocationIDs`. If the shut down hooks do not run (e.g.
> killed via SIGKILL), then Flink leaves some residual state.
>
> Now the problem is what happens if the TaskManager process is restarted on
> the same machine. In this case, Flink will simply use the same local state
> directory but it ignores existing allocation id sub directories. The reason
> is that Flink does not know whether these allocation id sub directories are
> not used by another Flink process running on the same machine. In order to
> make this decision Flink would have to know that it is the owner of these
> sub directories. This could work if each TaskManager process is started
> with a unique ID and if this ID is reused across restart attempts. This is
> currently not for every deployment the case.
>
> Long story short, it is currently expected that Flink can leave some
> residual state in case of a hard process stop. Cleaning this state up is at
> the moment unfortunately the responsibility of the user.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole 
> wrote:
>
>> Hey all,
>>
>> We are running a stateful stream processing job on k8s using per-job
>> standalone deployment entrypoint. Flink version: 1.12.1
>>
>> *Problem*: We have observed that whenever a task manager is either
>> gracefully shut down or killed ( due to OOM, k8s worker node drain out etc
>> ) it doesn't clean up the rocksdb state directories from the local disk.
>> But when the task manager restarts and it receives new task allocation from
>> the resource manager it rebuilds its local state for those tasks from the
>> previous completed checkpoint. Over the period of time after multiple
>> restarts, the task manager's local disk ends up accumulating lots of such
>> orphan rocksdb directories.
>>
>> *Questions*: This isn't causing any functional issues to us, but it adds
>> up lots of repeated ops overhead of cleaning these disks periodically. As a
>> workaround, we are thinking of cleaning the local rocksdb directories
>> except for the *taskmanager.state.local.root-dirs *before starting the
>> task manager java process. Since, during every task manager restart keyed
>> state backends for allocated tasks are anyway restored we feel it is the
>> safest option atm and will solve our problem of ever growing disk on task
>> manager pods. Is it safe to do so or are there any other consequences of
>> it? Is there any config or restart policy that takes care of cleaning up
>> such stale rocksdb directories during the statebackend restore process?.
>>
>> A sort of similar clean up is required when local task recovery is
>> enabled. Whenever the task manager is not shut down gracefully the old
>> localState doesn't get cleaned up on the next restart. This also causes
>> lots of disk space wastage. It's easier to delete rocksdb working
>> directories from previou run, but not so straightforward for the localState
>> as one has to figure out which one of them are actually stale allocation
>> IDs and clean only those one. Or check the latest completed checkpoint and
>> delete all localStates directories for older checkpoints and
>> allocation-ids. Is there any other solution to this problem? Also would
>> like to learn from other users how are you handling these operational tasks
>> currently?
>>
>> configurations:
>>
>> state.backend.local-recovery: true
>> taskmanager.state.local.root-dirs: /data/flink/
>>
>> RocksDb backend DB storage path:  /data/flink ( set programmatically )
>>
>>
>> -
>> Dhanesh Arole
>>
> --
- Dhanesh ( sent from my mobile device. Pardon me for any typos )


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-07 Thread dhanesh arole
Hi Till,

You are right. To give you more context about our setup, we are running 1
task slot per task manager, with the total number of task manager replicas
equal to the job parallelism. The issue is actually exacerbated during
rolling deployments of task managers, as each TM goes offline and comes back
online again after some time. So as every TM pod bounces, task allocation
changes, and the job finally stabilises once all TMs are restarted. Maybe a
proper blue-green setup would allow us to make the best use of local recovery
during restarts of TMs. But during intermittent failures of one of the TMs,
local recovery works as expected on the other healthy TM instances ( i.e.
they do not download state from remote storage ).

On Wed, 7 Apr 2021 at 10:35 Till Rohrmann  wrote:

> Hi Dhanesh,
>
> if some of the previously used TMs are still available, then Flink should
> try to redeploy tasks onto them also in case of a global failover. Only
> those tasks which have been executed on the lost TaskManager will need new
> slots and have to download the state from the remote storage.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 5:35 PM dhanesh arole 
> wrote:
>
>> Hi Sonam,
>>
>> We have a similar setup. What I have observed is, when the task manager
>> pod gets killed and restarts again ( i.e. the entire task manager process
>> restarts ) then local recovery doesn't happen. Task manager restore process
>> actually downloads the latest completed checkpoint from the remote state
>> handle even when the older localState data is available. This happens
>> because with every run allocation-ids for tasks running on task manager
>> change as task manager restart causes global job failure and restart.
>>
>> Local recovery - i.e task restore process using locally stored checkpoint
>> data kicks in when the task manager process is alive but due to some other
>> reason ( like timeout from sink or external dependency ) one of the tasks
>> fails and the flink job gets restarted by the job manager.
>>
>> Please CMIIW
>>
>>
>> -
>> Dhanesh Arole
>>
>> On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Sonam,
>>>
>>> The easiest way to see whether local state has been used for recovery is
>>> the recovery time. Apart from that you can also look for "Found registered
>>> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
>>> is logged on debug. This indicates that the local state is available.
>>> However, it does not say whether it is actually used. E.g. when doing a
>>> rescaling operation we change the assignment of key group ranges which
>>> prevents local state from being used. However in case of a recovery the
>>> above-mentioned log message should indicate that we use local state
>>> recovery.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
>>> wrote:
>>>
>>>> Hi Sonam,
>>>>
>>>> Pulling in Till (cc'ed), I believe he would likely be able to help you
>>>> here.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal 
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> We are experimenting with task local recovery and I wanted to know
>>>>> whether there is a way to validate that some tasks of the job recovered
>>>>> from the local state rather than the remote state.
>>>>>
>>>>> We've currently set this up to have 2 Task Managers with 2 slots each,
>>>>> and we run a job with parallelism 4. To simulate failure, we kill one of
>>>>> the Task Manager pods (we run on Kubernetes). I want to see if the local
>>>>> state of the other Task Manager was used or not. I do understand that the
>>>>> state for the killed Task Manager will need to be fetched from the
>>>>> checkpoint.
>>>>>
>>>>> Also, do you have any suggestions on how to test such failure
>>>>> scenarios in a better way?
>>>>>
>>>>> Thanks,
>>>>> Sonam
>>>>>
>>>> --
- Dhanesh ( sent from my mobile device. Pardon me for any typos )


Re: Flink Taskmanager failure recovery and large state

2021-04-07 Thread dhanesh arole
Hi Yaroslav,

We faced similar issues in our large stateful stream processing job. I had
asked a question
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-StateBuilder-unexpected-exception-td42397.html>
about it on the user mailing list a few days back. Based on the reply to my
question, we figured out that this happens when a task manager has just come
back online and is trying to rebuild / restore its state, but meanwhile
another task manager gets restarted or killed. In this situation the job
manager cancels the job, and as a result all task managers start cancelling
the tasks they are currently running. As part of the cancellation flow, the
channel buffer through which the Flink TM writes to disk gets closed. But
state rebuilding is already happening concurrently using that channel buffer,
which causes the ClosedChannelException.

As a solution to this problem, we increased *akka.ask.timeout* to 10m. This
gives task managers enough room to wait for RPC responses from other task
managers during restarts. As a result, a TM becomes more lenient before
marking another TM as failed and cancelling the job in the first place.
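
For reference, the change was a single entry in flink-conf.yaml (the value is
simply what worked for us, not a general recommendation):

akka.ask.timeout: 10 min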

-
Dhanesh Arole



On Tue, Apr 6, 2021 at 7:55 PM Robert Metzger  wrote:

> Hey Yaroslav,
>
> GCS is a somewhat popular filesystem that should work fine with Flink.
>
> It seems that the initial scale of a bucket is 5000 read requests per
> second (https://cloud.google.com/storage/docs/request-rate), your job
> should be at roughly the same rate (depending on how fast your job restarts
> in the restart loop).
>
> You could try to tweak the GCS configuration parameters, such as
> increasing "fs.gs.http.read-timeout" and "fs.gs.http.max.retry". (see
> https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
> for all available options)
>
>
> The "ExecutionGraphException: The execution attempt
> 6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it
> should not cause the restarts.
>
>
> On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma  wrote:
>
>> Hi, Yaroslav
>>
>> AFAIK Flink does not retry if the download checkpoint from the storage
>> fails. On the other hand the FileSystem already has this retry mechanism
>> already. So I think there is no need for flink to retry.
>> I am not very sure but from the log it seems that the gfs's retry is
>> interrupted by some reason. So I think we could get more insight if we
>> could find the first fail cause.
>>
>> Best,
>> Guowei
>>
>>
>> On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
>> yaroslav.tkache...@shopify.com> wrote:
>>
>>> Hi Guowei,
>>>
>>> I thought Flink can support any HDFS-compatible object store like the
>>> majority of Big Data frameworks. So we just added
>>> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
>>> dependencies to the classpath, after that using "gs" prefix seems to be
>>> possible:
>>>
>>> state.checkpoints.dir: gs:///flink-checkpoints
>>> state.savepoints.dir: gs:///flink-savepoints
>>>
>>> And yes, I noticed that retries logging too, but I'm not sure if it's
>>> implemented on the Flink side or the GCS connector side? Probably need to
>>> dive deeper into the source code. And if it's implemented on the GCS
>>> connector side, will Flink wait for all the retries? That's why I asked
>>> about the potential timeout on the Flink side.
>>>
>>> The JM log doesn't have much besides from what I already posted. It's
>>> hard for me to share the whole log, but the RocksDB initialization part can
>>> be relevant:
>>>
>>> 16:03:41.987 [cluster-io-thread-3] INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
>>> configure application-defined state backend:
>>> RocksDBStateBackend{checkpointStreamBackend=File State Backend
>>> (checkpoints: 'gs:///flink-checkpoints', savepoints:
>>> 'gs:///flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>>> 1048576), localRocksDbDirectories=[/rocksdb],
>>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>>> writeBatchSize=2097152}
>>> 16:03:41.988 [cluster-io-thread-3] INFO
>>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>>> predefined options: FLASH_SSD_OPTIMIZED.
>>> 16:03:41.988 [cluster-io-thread-3] INFO
>>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>>> application-defined options factory:
>>> DefaultConfigurableOptionsFactory{configure

Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread dhanesh arole
Hi Sonam,

We have a similar setup. What I have observed is that when the task manager
pod gets killed and restarts ( i.e. the entire task manager process
restarts ), local recovery doesn't happen. The task manager restore process
actually downloads the latest completed checkpoint from the remote state
handle even when the older localState data is available. This happens
because the allocation IDs for tasks running on the task manager change with
every run, since a task manager restart causes a global job failure and
restart.

Local recovery - i.e. the task restore process using locally stored
checkpoint data - kicks in when the task manager process is alive but, due to
some other reason ( like a timeout from a sink or an external dependency ),
one of the tasks fails and the flink job gets restarted by the job manager.

Please CMIIW


-
Dhanesh Arole

On Tue, Apr 6, 2021 at 11:35 AM Till Rohrmann  wrote:

> Hi Sonam,
>
> The easiest way to see whether local state has been used for recovery is
> the recovery time. Apart from that you can also look for "Found registered
> local state for checkpoint {} in subtask ({} - {} - {}" in the logs which
> is logged on debug. This indicates that the local state is available.
> However, it does not say whether it is actually used. E.g. when doing a
> rescaling operation we change the assignment of key group ranges which
> prevents local state from being used. However in case of a recovery the
> above-mentioned log message should indicate that we use local state
> recovery.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 11:31 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Sonam,
>>
>> Pulling in Till (cc'ed), I believe he would likely be able to help you
>> here.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal 
>> wrote:
>>
>>> Hello,
>>>
>>> We are experimenting with task local recovery and I wanted to know
>>> whether there is a way to validate that some tasks of the job recovered
>>> from the local state rather than the remote state.
>>>
>>> We've currently set this up to have 2 Task Managers with 2 slots each,
>>> and we run a job with parallelism 4. To simulate failure, we kill one of
>>> the Task Manager pods (we run on Kubernetes). I want to see if the local
>>> state of the other Task Manager was used or not. I do understand that the
>>> state for the killed Task Manager will need to be fetched from the
>>> checkpoint.
>>>
>>> Also, do you have any suggestions on how to test such failure scenarios
>>> in a better way?
>>>
>>> Thanks,
>>> Sonam
>>>
>>


Task manager local state data after crash / recovery

2021-04-06 Thread dhanesh arole
Hey all,

We are running a stateful stream processing job on k8s using per-job
standalone deployment entrypoint. Flink version: 1.12.1

*Problem*: We have observed that whenever a task manager is either
gracefully shut down or killed ( due to OOM, k8s worker node drain-out, etc.
), it doesn't clean up the rocksdb state directories from the local disk.
But when the task manager restarts and receives new task allocations from
the resource manager, it rebuilds its local state for those tasks from the
previously completed checkpoint. Over time, after multiple restarts, the task
manager's local disk ends up accumulating lots of such orphan rocksdb
directories.

*Questions*: This isn't causing any functional issues for us, but it adds a
lot of repeated ops overhead of cleaning these disks periodically. As a
workaround, we are thinking of cleaning the local rocksdb directories
except for the *taskmanager.state.local.root-dirs* before starting the task
manager java process. Since keyed state backends for allocated tasks are
restored anyway during every task manager restart, we feel this is the safest
option at the moment and will solve our problem of an ever-growing disk on
task manager pods. Is it safe to do so, or are there any other consequences?
Is there any config or restart policy that takes care of cleaning up such
stale rocksdb directories during the state backend restore process?

A sort of similar clean-up is required when local task recovery is enabled.
Whenever the task manager is not shut down gracefully, the old localState
doesn't get cleaned up on the next restart. This also causes a lot of disk
space wastage. It's easy to delete rocksdb working directories from the
previous run, but not so straightforward for the localState, as one has to
figure out which of them are actually stale allocation IDs and clean only
those. Or check the latest completed checkpoint and delete all localState
directories for older checkpoints and allocation IDs. Is there any other
solution to this problem? I would also like to learn from other users how you
are handling these operational tasks currently.

configurations:

state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/

RocksDb backend DB storage path:  /data/flink ( set programmatically )
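
For completeness, the programmatic part looks roughly like this (simplified
sketch inside the job's setup code, which declares the checked IOException;
the checkpoint URI is a placeholder):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// incremental checkpoints enabled; RocksDB working directory on the attached volume
RocksDBStateBackend backend = new RocksDBStateBackend("s3://my-bucket/checkpoints", true);
backend.setDbStoragePath("/data/flink");
env.setStateBackend(backend);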


-
Dhanesh Arole


Re: RocksDB StateBuilder unexpected exception

2021-03-23 Thread dhanesh arole
Hi Matthias,

Thanks for taking the time to help us with this.

You are right, there were lots of task cancellations, as this exception
causes the job to get restarted, which triggers the cancellations.

-
Dhanesh Arole


On Tue, Mar 23, 2021 at 9:27 AM Matthias Pohl 
wrote:

> Hi Danesh,
> thanks for reaching out to the Flink community. Checking the code, it
> looks like the OutputStream is added to a CloseableRegistry before writing
> to it [1].
>
> My suspicion is - based on the exception cause - that the
> CloseableRegistry got triggered while restoring the state. I tried to track
> down the source of the CloseableRegistry. It looks like it's handed down
> from the StreamTask [2].
>
> The StreamTask closes the CloseableRegistry either when cancelling is
> triggered or in the class' finalize method. Have you checked the logs to
> see whether there was some task cancellation logged?
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
> [2]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269
>
> On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole 
> wrote:
>
>> Hello Hivemind,
>>
>> We are running a stateful streaming job. Each task manager instance hosts
>> around ~100GB of data. During restart of task managers we encountered
>> following errors, because of which the job is not able to restart.
>> Initially we thought it might be due to failing status checks of attached
>> EBS volumes or burst balance exhaustion but AWS console is not indicating
>> any issue with EBS volumes. Is there anything that else that we need to
>> look at which can potentially cause this exception? Also it's quite unclear
>> what exactly is the cause of the exception, any help on that would be much
>> appreciated.
>>
>> Flink version: 1.12.2_scala_2.11
>> Environment: Kubernetes on AWS
>> Volume Type: EBS, gp2 300GiB
>>
>> *ERROR
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder []
>> - Caught unexpected exception.*
>> *java.nio.channels.ClosedChannelException: null*
>> * at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
>> ~[?:?]*
>> * at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]*
>> * at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]*
>> * at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]*
>> * at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> ~[?:?]*
>> * at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> ~[?:?]*
>> * at java.lang.Thread.run(Thread.java:830) [?:?]*
>> *2021-03-19 15:26:10,385 WARN
>>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] -
>> Exception while restoring keyed state backend for
>> KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from
>> alternative (1/1), will retry while more alternatives are available.*
>> *org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected exception.*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]*
>> * at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.l

RocksDB StateBuilder unexpected exception

2021-03-19 Thread dhanesh arole
) ~[?:?]
 at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
 at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
 at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
 at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
 at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
 at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]


-
Dhanesh Arole


Re: Flink grpc-netty-shaded NoClassDefFoundError

2019-10-22 Thread dhanesh arole
Just to give you more context:

We are using `com.google.cloud.bigtable` as well in our job dependencies.
Could it be due to a shading plugin issue with `bigtable-hbase-2.x`?


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )



On Tue, Oct 22, 2019 at 2:06 PM dhanesh arole 
wrote:

> Hello all,
>
> We are running on Flink 1.6.2 and have a couple of streaming jobs running
> on it. We are intermittently seeing *java.lang.NoClassDefFoundError* with the
> below stack trace [1]. The Flink job was working fine, but due to these
> recent errors, task managers are now frequently crashing and causing the job
> to restart. We haven't changed any dependency version / Flink version, so we
> are not sure if it's related to a version mismatch. This is only happening
> with classes from the *io.grpc.netty.shaded* package. Even the classes that
> the FlinkClassLoader is not able to find are actually present in the fat jar
> built using shadowJar (validated by looking at *jar tf flink-job.jar*).
>
> Has anyone faced such issue before? Any pointers on how to debug this
> would be really helpful.
>
> I am attaching output of './gradlew dep' as well for reference, so as to
> validate if we don't have some wrong or flaky dependencies.
>
> [1] Stack trace: Task manager Java.lang.NoClassDefFoundError
>
> "java.lang.NoClassDefFoundError:
> io/grpc/netty/shaded/io/netty/channel/AbstractChannel$AbstractUnsafe$8
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.deregister(AbstractChannel.java:817)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.fireChannelInactiveAndDeregister(AbstractChannel.java:777)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:760)
> at
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:615)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.closeOnRead(AbstractNioByteChannel.java:105)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> at
> io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException:
> io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$8
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 13 more
> "
>


Flink cluster on k8s with rocksdb state backend

2019-10-17 Thread dhanesh arole
Hello all,

I am trying to provision a Flink cluster on k8s. Some of the jobs in our
existing cluster use the RocksDB state backend. I wanted to take a look at a
Flink helm chart or deployment manifests that provision task managers with
dynamic PVs, and see how they manage them. We are running on a kops-managed
k8s cluster on AWS (not EKS). Also, some pointers on expected pain points,
surprises, and monitoring strategies would be really helpful.
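
A minimal sketch of the kind of manifest fragment we have in mind - a
StatefulSet for task managers with one dynamically provisioned volume per pod
(names, image, storage class and sizes are placeholders):

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-taskmanager
spec:
  serviceName: flink-taskmanager
  replicas: 3
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink   # placeholder image/tag
          args: ["taskmanager"]
          volumeMounts:
            - name: rocksdb-data
              mountPath: /data/flink
  volumeClaimTemplates:
    - metadata:
        name: rocksdb-data
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: gp2   # AWS EBS storage class assumed
        resources:
          requests:
            storage: 100Gi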


Thanks & Regards
-
Dhanesh