Re: Re: Checkpoint Error
Hi Yun,

Thanks for the response. I checked the mounts and only the JMs and TMs are mounted with this EFS. Not sure how to debug this.

Thanks

On Sun, Mar 7, 2021 at 8:29 PM Yun Gao wrote:
> Hi Navneeth,
>
> It seems from the stack that the exception is caused by underlying EFS
> problems? Have you checked if there are errors reported for EFS, or if
> there might be duplicate mounting for the same EFS and others have ever
> deleted the directory?
>
> Best,
> Yun
>
> --Original Mail --
> *Sender:* Navneeth Krishnan
> *Send Date:* Sun Mar 7 15:44:59 2021
> *Recipients:* user
> *Subject:* Re: Checkpoint Error
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We are running our streaming job on Flink 1.7.2 and we are noticing the
>>> below error. Not sure what's causing it; any pointers would help. We have
>>> 10 TMs checkpointing to AWS EFS.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>>     ... 7 more
>>> Caused by: java.io.IOException: Stale file handle
>>>     at java.io.FileOutputStream.close0(Native Method)
>>>     at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>>>     at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>>>     at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>>>     at java.io.FileOutputStream.close(FileOutputStream.java:354)
>>>     at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>>>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>>>     ... 12 more
>>>
>>> Thanks
Re: Checkpoint Error
Hi All,

Any suggestions?

Thanks

On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan wrote:
> Hi All,
>
> We are running our streaming job on flink 1.7.2 and we are noticing the
> below error. Not sure what's causing it, any pointers would help. We have
> 10 TM's checkpointing to AWS EFS.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>     ... 7 more
> Caused by: java.io.IOException: Stale file handle
>     at java.io.FileOutputStream.close0(Native Method)
>     at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>     at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>     at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>     at java.io.FileOutputStream.close(FileOutputStream.java:354)
>     at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>     ... 12 more
>
> Thanks
Checkpoint Error
Hi All,

We are running our streaming job on flink 1.7.2 and we are noticing the below error. Not sure what's causing it, any pointers would help. We have 10 TM's checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.io.IOException: Could not flush and close the file system output stream to file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d in order to obtain the stream state handle
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.io.IOException: Stale file handle
    at java.io.FileOutputStream.close0(Native Method)
    at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
    at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
    at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
    at java.io.FileOutputStream.close(FileOutputStream.java:354)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
    at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
    ... 12 more

Thanks
Re: CICD
Thanks, Vikash, for the response. Yes, that's very much feasible, but we are planning to move to the job/application cluster model, wherein the artifacts are bundled inside the container. When there is a new container image we might have to do the following:

- Take a savepoint
- Upgrade the JM and TM container images and provide the savepoint path during startup

I would like to know if this is the standard way or if there are better options. We currently use Terraform for managing the infrastructure, and it would be greatly helpful if someone has already done this.

Thanks

On Sun, Jan 3, 2021 at 4:17 PM Vikash Dat wrote:
> Could you not use the JM web address to utilize the REST API? You can
> start/stop/savepoint/restore + upload new jars via the REST API. While I
> did not run on ECS (I ran on EMR), I was able to use the REST API to do
> deployments.
>
> On Sun, Jan 3, 2021 at 19:09 Navneeth Krishnan wrote:
>
>> Hi All,
>>
>> Currently we are using Flink in session cluster mode and we manually
>> deploy the jobs, i.e. through the web UI. We use AWS ECS for running the
>> Docker containers with two service definitions, one for the JM and the
>> other for the TMs. How is everyone managing the CI/CD process? Is there a
>> better way to run a job in job cluster mode and use Jenkins to perform
>> CI/CD?
>>
>> Any pointers on how this is being done would really help and is greatly
>> appreciated.
>>
>> Thanks,
>> Navneeth
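[Editor's note] For reference, the savepoint-then-redeploy flow described above can be driven through Flink's monitoring REST API (`POST /jobs/:jobid/savepoints` to trigger a savepoint, and `POST /jars/:jarid/run?savepointPath=...` to restore). The sketch below only builds the request URLs to show the shape of the calls; the JobManager address, job ID, and jar ID are hypothetical placeholders, and error handling, jar upload, and polling the savepoint trigger for completion are omitted:

```java
// Sketch of the REST endpoints involved in a savepoint-based redeploy.
// Host, jobId, and jarId are placeholders; only the URL shapes are taken
// from the Flink REST API. This builds URLs, it does not issue requests.
public class SavepointDeploySketch {
    // POST here with a JSON body like {"target-directory": "...",
    // "cancel-job": true} triggers a savepoint (optionally cancelling the job).
    static String triggerSavepointUrl(String jm, String jobId) {
        return jm + "/jobs/" + jobId + "/savepoints";
    }

    // POST here starts a previously uploaded jar from the given savepoint.
    static String runFromSavepointUrl(String jm, String jarId, String savepointPath) {
        return jm + "/jars/" + jarId + "/run?savepointPath=" + savepointPath;
    }

    public static void main(String[] args) {
        String jm = "http://jobmanager:8081"; // placeholder JM address
        System.out.println(triggerSavepointUrl(jm, "a300d1b0fd059f3f83ce35a8042e89c8"));
        System.out.println(runFromSavepointUrl(jm, "myjob.jar", "/mnt/savepoints/sp-1"));
    }
}
```

A CI pipeline (e.g. Jenkins) could issue these two calls around the container-image upgrade step.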
CICD
Hi All,

Currently we are using Flink in session cluster mode and we manually deploy the jobs, i.e. through the web UI. We use AWS ECS for running the Docker containers with two service definitions, one for the JM and the other for the TMs. How is everyone managing the CI/CD process? Is there a better way to run a job in job cluster mode and use Jenkins to perform CI/CD?

Any pointers on how this is being done would really help and is greatly appreciated.

Thanks,
Navneeth
Tumbling Time Window
Hello All,

First of all, Happy New Year!! Thanks for the excellent community support.

I have a job which requires a 2-second tumbling time window per key: for each user we wait 2 seconds to collect enough data and then proceed to further processing. My question is, should I use the regular DSL windowing or write a custom process function which does the windowing? I have heard that the DSL window has more overhead than a custom window function. What do you suggest, and can someone provide an example of a custom window function per key?

Also, given that the window is very short (2 secs), would there be more overhead in firing so many timers, one for each key?

Thanks!

Regards,
Navneeth
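[Editor's note] As a rough illustration of the custom approach asked about above: the bookkeeping that a Flink `KeyedProcessFunction` with one registered timer per key performs can be sketched in plain Java, without Flink, as a per-key buffer plus a flush deadline. All class and method names below are made up for illustration; this is not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of per-key 2-second buffering, mimicking what a
// KeyedProcessFunction with one timer per key would do.
public class PerKeyWindowSketch {
    static final long WINDOW_MS = 2_000;

    static class Buffer { long deadline; List<String> events = new ArrayList<>(); }

    final Map<String, Buffer> buffers = new HashMap<>();

    // Buffer an event; the first event for a key "registers the timer"
    // by setting a flush deadline WINDOW_MS later.
    void onEvent(String key, String event, long ts) {
        Buffer b = buffers.computeIfAbsent(key, k -> {
            Buffer nb = new Buffer();
            nb.deadline = ts + WINDOW_MS;
            return nb;
        });
        b.events.add(event);
    }

    // "Timer firing": flush and remove all keys whose deadline has passed.
    Map<String, List<String>> onTime(long now) {
        Map<String, List<String>> fired = new HashMap<>();
        Iterator<Map.Entry<String, Buffer>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Buffer> e = it.next();
            if (e.getValue().deadline <= now) {
                fired.put(e.getKey(), e.getValue().events);
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        PerKeyWindowSketch s = new PerKeyWindowSketch();
        s.onEvent("user1", "e1", 1_000);
        s.onEvent("user1", "e2", 2_500);
        System.out.println(s.onTime(3_000)); // user1's buffered events flush
    }
}
```

The overhead question in the message maps to the size of `buffers` here: one pending deadline per active key, which is essentially what one timer per key costs.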
Re: Caching
Thanks, Dongwon. It was extremely helpful. I didn't quite understand how async I/O can be used here; it would be great if you could share some info on it. Also, how are you propagating any changes to values?

Regards,
Navneeth

On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim wrote:
> Oops, I forgot to mention that when doing bulk inserts into Redis, you'd
> better open a pipeline with the 'transaction' property set to False [1].
> Otherwise, API calls from your Flink job will time out.
>
> [1] https://github.com/andymccurdy/redis-py#pipelines
>
> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim wrote:
>
>> Hi Navneeth,
>>
>> I reported a similar issue to yours before [1], but I took the
>> broadcasting approach at first.
>>
>> As you already anticipated, broadcasting is going to use more memory than
>> your current approach based on a static object on each TM.
>>
>> The broadcasted data will be treated as operator state and will be
>> periodically checkpointed, with serialization overhead and garbage
>> collections. These are not negligible at all if you're not carefully
>> choosing a serialization strategy, as explained in [2]. Even with the
>> proper one, I've experienced mild back pressure whenever
>> - a checkpoint is in progress (AFAIK, incremental checkpointing has
>>   nothing to do with operator state)
>> - the cache is being broadcasted
>>
>> For that reason, I decided to populate the data in Redis, but that also
>> calls for design decisions:
>> - which Java client to use? Jedis [3]? Lettuce [4]?
>> - how to invoke API calls inside Flink? Synchronously or asynchronously?
>>
>> Currently I'm very satisfied with Lettuce plus Flink's async I/O [5],
>> with a very small memory footprint and without worrying about
>> serialization overhead and garbage collections. Lettuce supports
>> asynchronous communication, so it works perfectly with Flink's async I/O.
>> I bet you'll be very disappointed with invoking Jedis synchronously
>> inside a ProcessFunction.
>>
>> Best,
>>
>> Dongwon
>>
>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>> [2] https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://github.com/redis/jedis
>> [4] https://lettuce.io/
>> [5] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>
>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We have a flink streaming job processing around 200k events per second.
>>> The job requires a lot of less frequently changing data (sort of static
>>> but there will be some changes over time, say 5% change once per day or
>>> so). There are about 12 caches with some containing approximately 20k
>>> entries whereas a few with about 2 million entries.
>>>
>>> In the current implementation we are using in-memory lazy loading static
>>> cache to populate the data and the initialization happens in open
>>> function. The reason to choose this approach is because we have
>>> allocated around 4GB extra memory per TM for these caches and if a TM
>>> has 6 slots the cache can be shared.
>>>
>>> Now the issue we have with this approach is everytime when a container
>>> is restarted or a new job is deployed it has to populate the cache
>>> again. Sometimes this lazy loading takes a while and it causes back
>>> pressure as well. We were thinking to move this logic to the broadcast
>>> stream but since the data has to be stored per slot it would increase
>>> the memory consumption by a lot.
>>>
>>> Another option that we were thinking of is to replace the current near
>>> far cache that uses rest api to load the data to redis based near far
>>> cache. This will definitely reduce the overall loading time but still
>>> not the perfect solution.
>>>
>>> Are there any recommendations on how this can be achieved effectively?
>>> Also how is everyone overcoming this problem?
>>>
>>> Thanks,
>>> Navneeth
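[Editor's note] To make the async I/O idea in the reply above concrete without depending on Flink or a Redis client: the pattern is to issue each lookup as a non-blocking future and attach the remaining processing as a callback, so one task can keep many lookups in flight instead of blocking per record. A hedged, self-contained sketch of that pattern using plain `CompletableFuture`; the in-memory map stands in for Redis, and in real code the lookup would be an asynchronous client call such as a Lettuce async GET:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Sketch of the async-lookup pattern behind Flink's async I/O: fire the
// remote call as a future and compose the enrichment onto it, instead of
// blocking the processing thread on each record.
public class AsyncLookupSketch {
    static CompletableFuture<String> enrich(
            String event,
            Function<String, CompletableFuture<String>> lookup) {
        // Non-blocking: the enrichment runs when the lookup completes.
        return lookup.apply(event).thenApply(value -> event + "->" + value);
    }

    public static void main(String[] args) throws Exception {
        // A plain map standing in for the remote store (e.g. Redis).
        Map<String, String> fakeStore = Map.of("user1", "profileA");
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Function<String, CompletableFuture<String>> lookup =
                key -> CompletableFuture.supplyAsync(
                        () -> fakeStore.getOrDefault(key, "missing"), pool);
        System.out.println(enrich("user1", lookup).get());
        pool.shutdown();
    }
}
```

Change propagation (the question in the reply) is then a property of the store, not of the job: updating the value in Redis makes the next lookup see it, at the cost of a network hop per uncached access.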
Caching
Hi All,

We have a Flink streaming job processing around 200k events per second. The job requires a lot of less frequently changing data (sort of static, but there will be some changes over time, say a 5% change once per day or so). There are about 12 caches, some containing approximately 20k entries and a few with about 2 million entries.

In the current implementation we are using an in-memory, lazily loaded static cache to populate the data, and the initialization happens in the open function. The reason for choosing this approach is that we have allocated around 4 GB of extra memory per TM for these caches, and if a TM has 6 slots the cache can be shared.

Now the issue we have with this approach is that every time a container is restarted or a new job is deployed, it has to populate the cache again. Sometimes this lazy loading takes a while and it causes back pressure as well. We were thinking of moving this logic to a broadcast stream, but since the data has to be stored per slot it would increase the memory consumption by a lot.

Another option we were considering is to replace the current near/far cache, which uses a REST API to load the data, with a Redis-based near/far cache. This would definitely reduce the overall loading time, but it is still not a perfect solution.

Are there any recommendations on how this can be achieved effectively? Also, how is everyone else overcoming this problem?

Thanks,
Navneeth
Re: Job Restart Failure
Hi All,

Any feedback on how this can be resolved? This is causing downtime in production.

Thanks

On Tue, Oct 20, 2020 at 4:39 PM Navneeth Krishnan wrote:
> Hi All,
>
> I'm facing an issue in our Flink application. This happens on versions
> 1.4.0 and 1.7.2; we have both versions and we are seeing this problem on
> both. We are running Flink on ECS with checkpointing enabled to EFS. When
> the pipeline restarts due to a node failure or any other reason, it just
> keeps restarting, with this same error message, until the retry attempts
> are exhausted. When I checked the EFS volume I do see the file is still
> available, but for some reason Flink is unable to recover the job. Any
> pointers will help. Thanks.
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(14/18) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:245)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>     ... 5 more
> Caused by: java.io.FileNotFoundException: /mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method)
>     at java.io.FileInputStream.open(FileInputStream.java:195)
>     at java.io.FileInputStream.<init>(FileInputStream.java:138)
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>     at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:286)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:62)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>     ... 7 more
>
> *EFS:*
>
> [image: image.png]
>
> Thanks
Job Restart Failure
Hi All,

I'm facing an issue in our Flink application. This happens on versions 1.4.0 and 1.7.2; we have both versions and we are seeing this problem on both. We are running Flink on ECS with checkpointing enabled to EFS. When the pipeline restarts due to a node failure or any other reason, it just keeps restarting, with this same error message, until the retry attempts are exhausted. When I checked the EFS volume I do see the file is still available, but for some reason Flink is unable to recover the job. Any pointers will help. Thanks.

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(14/18) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:245)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
    ... 5 more
Caused by: java.io.FileNotFoundException: /mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
    at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:286)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:62)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more

*EFS:*

[image: image.png]

Thanks
Adaptive load balancing
Hi All,

We are currently using Flink in production and use keyBy for a CPU-intensive computation. There is a cache lookup for a set of keys, and since keyBy cannot guarantee the data is sent to a single node, we are basically replicating the cache on all nodes. This is causing memory problems for us, and we would like to explore some options to mitigate the current limitations.

Is there a way to group a set of keys and send them to a set of nodes so that we don't have to replicate the cache data on all nodes? Has anyone tried implementing hashing with adaptive load balancing, so that if a node is busy processing, the data can be routed effectively to other nodes which are free?

Any suggestions are greatly appreciated.

Thanks
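[Editor's note] One way to frame the grouping question above, independent of Flink APIs (in Flink, such logic could sit behind a custom partitioner, but the class below is a made-up, self-contained sketch): keep an explicit routing table that pins named key groups to a chosen worker, so keys sharing a cache land together, with plain hash partitioning as the fallback for unpinned keys. All names are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: route keys to workers via an explicit pinning table so that
// keys sharing a cache can land on the same worker; unpinned keys fall
// back to hash partitioning. This is a simplification, not Flink API.
public class PinnedRouterSketch {
    final int numWorkers;
    final Map<String, Integer> pinned = new HashMap<>(); // key group -> worker

    PinnedRouterSketch(int numWorkers) { this.numWorkers = numWorkers; }

    void pin(String keyGroup, int worker) { pinned.put(keyGroup, worker); }

    // Returns the worker index for a key; keyGroupOf() is a stand-in for
    // whatever maps a key to its cache group (e.g. a tenant or region id).
    int route(String key) {
        Integer w = pinned.get(keyGroupOf(key));
        if (w != null) return w;
        return Math.floorMod(key.hashCode(), numWorkers); // hash fallback
    }

    static String keyGroupOf(String key) {
        int i = key.indexOf(':');
        return i < 0 ? key : key.substring(0, i);
    }

    public static void main(String[] args) {
        PinnedRouterSketch r = new PinnedRouterSketch(4);
        r.pin("tenantA", 2);
        System.out.println(r.route("tenantA:user1")); // 2
    }
}
```

The "adaptive" part in the message would amount to updating the pinning table from load measurements, which is easy in this sketch but hard to reconcile with keyed state in a real Flink job, since moving a key also means moving its state.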
Flink Migration
Hi All,

We are currently on a very old version of Flink, 1.4.0, and it has worked pretty well, but lately we have been facing checkpoint timeout issues. We would like to minimize any changes to the current pipelines and go ahead with the migration. With that said, our first pick was to migrate to 1.5.6 and later move to a newer version. Do you think a more recent version like 1.6 or 1.7 might work? We did try 1.8, but it requires some changes in the pipelines.

When we tried 1.5.6 with docker-compose we were unable to get the task manager attached to the jobmanager. Are there some specific configurations required for newer versions?

Logs:

2020-08-28 07:36:30.834 [main] INFO org.apache.flink.runtime.util.LeaderRetrievalUtils - TaskManager will try to connect for 1 milliseconds before falling back to heuristics
2020-08-28 07:36:30.853 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Retrieved new target address jobmanager/172.21.0.8:6123.
2020-08-28 07:36:31.279 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address jobmanager/172.21.0.8:6123
2020-08-28 07:36:31.280 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.281 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.281 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.282 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:31.283 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.284 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:31.684 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address jobmanager/172.21.0.8:6123
2020-08-28 07:36:31.686 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.687 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.688 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.688 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:31.689 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:31.690 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:32.490 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address jobmanager/172.21.0.8:6123
2020-08-28 07:36:32.491 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:32.493 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:32.494 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:32.495 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:32.496 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/172.21.0.9': Connection refused (Connection refused)
2020-08-28 07:36:32.497 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Failed to connect from address '/127.0.0.1': Invalid argument (connect failed)
2020-08-28 07:36:34.099 [main] INFO org.apache.flink.runtime.net.ConnectionUtils - Trying to connect to address jobmanager/172.21.0.8:6123
2020-08-28 07:36:34.100 [main] INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner - TaskManager will use hostname/address 'e6f9104cdc61' (172.21.0.9) for communication.

Flink conf:

jobmanager.rpc.address: jobmanager
rest.address: jobmanager

Thanks
Tumbling window per key
Hi All, I was looking at the documentation for windows and got a little confused. As per my understanding, a tumbling window per key will create non-overlapping windows based on when the data for that key arrived. For example, consider a tumbling window of 30 seconds:

user1 - 10:01:01
user2 - 10:01:02
user1 - 10:01:05
user2 - 10:01:06
user2 - 10:01:08

Result:
user1 (10:01:01 & 10:01:05)
user2 (10:01:02 & 10:01:06)
user2 (10:01:08)

Is this the right understanding? But as per the below image in the docs, it looks like the window is not per key; please correct me if I'm wrong. [image: image.png] Thanks
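For what it's worth, the window boundaries produced by Flink's tumbling window assigners are aligned to the epoch, not to the first element seen for each key, so every key shares the same 30-second buckets. The sketch below is a minimal, self-contained Python model of that assignment logic for the example above (the `window_start` formula mirrors Flink's `TimeWindow.getWindowStartWithOffset` for non-negative timestamps and zero offset; it is an illustration, not the Flink API):

```python
# Self-contained model of how Flink's tumbling windows assign elements.
# Windows are aligned to the epoch, so all keys share the same window
# boundaries; a window is NOT anchored to the first element of a key.

def window_start(timestamp_ms: int, size_ms: int, offset_ms: int = 0) -> int:
    """Start of the tumbling window containing timestamp_ms."""
    return timestamp_ms - (timestamp_ms - offset_ms) % size_ms

# The events from the email, as seconds after 10:00:00, with a 30 s window.
events = [("user1", 61), ("user2", 62), ("user1", 65),
          ("user2", 66), ("user2", 68)]

windows = {}
for key, ts in events:
    start = window_start(ts * 1000, 30_000) // 1000
    windows.setdefault((key, start), []).append(ts)

# All five events fall into the shared window [10:01:00, 10:01:30).
for (key, start), members in sorted(windows.items()):
    print(key, f"[{start}, {start + 30})", members)
```

Running this shows one `[60, 90)` window per key containing all of that key's events, matching the picture in the docs rather than the per-key-anchored reading.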
Re: Task Assignment
Hi Marta, Thanks for your response. What I'm looking for is something like data localization. If I have one TM which is processing a set of keys, I want to ensure all keys of the same type go to the same TM rather than using hashing to find the downstream slot. I could use a common key to do this, but I need to parallelize as much as possible since the number of incoming messages is too large to narrow down to a single key and process it. Thanks On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira wrote: > Hi, Navneeth. > > If you *key* your stream using stream.keyBy(…), this will logically split > your input and all the records with the same key will be processed in the > same operator instance. This is the default behavior in Flink for keyed > streams and is transparently handled. > > You can read more about it in the documentation [1]. > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state > > On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> Is there a way for an upstream operator to know how the downstream >> operator tasks are assigned? Basically I want to group my messages to be >> processed on slots in the same node based on some key. >> >> Thanks >> >
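There is no API for an upstream operator to pick the downstream slot: with keyBy, Flink hashes the key into a key group and maps key groups onto parallel subtasks, so placement is deterministic per key but not controllable per node. Below is a rough self-contained Python model of that routing, using `zlib.crc32` as a stand-in for Flink's murmur hash of `key.hashCode()`; the constants and names are illustrative, not Flink's implementation:

```python
import zlib

MAX_PARALLELISM = 128  # Flink's default maximum parallelism

def key_group(key: str) -> int:
    # Stand-in for Flink's murmur hash of the key's hashCode(); any
    # stable hash demonstrates the deterministic-routing property.
    return zlib.crc32(key.encode()) % MAX_PARALLELISM

def subtask_index(key: str, parallelism: int) -> int:
    # Models the key-group -> operator-subtask mapping: every record
    # with the same key lands on the same downstream subtask.
    return key_group(key) * parallelism // MAX_PARALLELISM

# The same key always routes to the same downstream subtask; different
# keys spread across subtasks, but you cannot choose which node.
for key in ("user1", "user2", "user3"):
    print(key, "-> subtask", subtask_index(key, 8))
```

The practical consequence is the one in the thread: you can guarantee co-location of a *key*, but not steer a group of keys to a chosen TM without encoding that grouping into the key itself.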
Task Assignment
Hi All, Is there a way for an upstream operator to know how the downstream operator tasks are assigned? Basically I want to group my messages to be processed on slots in the same node based on some key. Thanks
Re: Flink
Thanks a lot Timo. I will take a look at it. But does flink automatically scale up and down at this point with native integration? Thanks On Tue, Apr 14, 2020 at 11:27 PM Timo Walther wrote: > Hi Navneeth, > > it might also be worth looking into Ververica Platform for this. The > community edition was published recently and is free of charge. It provides > first-class K8s support [1]. > > There is also a tutorial on how to deploy it on EKS [2] (not the most > recent one, though). > > Regards, > Timo > > [1] > > https://www.ververica.com/blog/announcing-ververica-platform-community-edition?utm_campaign=Ververica%20Platform%20-%20Community%20Edition&utm_content=123140986&utm_medium=social&utm_source=twitter&hss_channel=tw-2581958070 > [2] > > https://www.ververica.com/blog/how-to-get-started-with-data-artisans-platform-on-aws-eks > > > > On 15.04.20 03:57, Navneeth Krishnan wrote: > > Hi All, > > > > I'm very new to EKS and trying to deploy a flink job in cluster mode. > > Is there any good documentation on the steps to deploy on EKS? > > > > From my understanding, with flink 1.10, running it on EKS will > > automatically scale up and down with the kubernetes integration based on the > > load. Is this correct? Do I have to enable some configs to support > > this feature? > > > > How do I use the lyft k8s operator when deploying on EKS? > > > > Thanks a lot, appreciate all the help. > > > > > > > > > >
Flink
Hi All, I'm very new to EKS and trying to deploy a flink job in cluster mode. Is there any good documentation on the steps to deploy on EKS? From my understanding, with flink 1.10, running it on EKS will automatically scale up and down with the kubernetes integration based on the load. Is this correct? Do I have to enable some configs to support this feature? How do I use the lyft k8s operator when deploying on EKS? Thanks a lot, appreciate all the help.
Re: Using s3 for checkpointing
Thanks David. It worked after adding the jar inside a folder. On Sat, Feb 1, 2020 at 2:37 AM David Magalhães wrote: > Did you put each inside a different folder with their name? Like > /opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.9.1.jar ? > > check > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html > > On Sat, Feb 1, 2020, 07:42 Navneeth Krishnan > wrote: > >> Hi Arvid, >> >> Thanks for the response. >> >> I have both the jars under /opt/flink/plugins but I'm still getting the >> same error message. Also can someone please provide some pointers on how >> entropy works. How should I setup the directory structure? >> >> In the link that you have provided there is a aws-credentials.jar file, >> not sure where to get the jar since I don't see it in maven repo. >> >> *Plugins:* >> >> flink-s3-fs-hadoop-1.9.1.jar >> >> flink-s3-fs-presto-1.9.1.jar >> >> >> *flink-conf:* >> >> high-availability.storageDir: s3p://.../recovery >> >> >> >> *Error message:* >> >> Shutting StandaloneSessionClusterEntrypoint down with application status >> FAILED. Diagnostics java.io.IOException: Could not create FileSystem for >> highly available storage (high-availability.storageDir) >> >> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: >> Could not find a file system implementation for scheme 's3p'. The scheme is >> not directly supported by Flink and no Hadoop file system to support this >> scheme could be loaded. >> >> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: >> Hadoop is not in the classpath/dependencies. >> >> >> Thanks >> >> >> >> On Thu, Jan 30, 2020 at 2:37 AM Arvid Heise wrote: >> >>> Hi Navneeth, >>> >>> did you follow the plugin folder structure? [1] >>> >>> There is another plugin called flink-s3-fs-presto that you can use. >>> If you want to use both plugins, use s3a:// for s3-fs-hadoop (output) >>> and s3p:// for s3-fs-presto (checkpointing). 
>>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html#isolation-and-plugin-structure >>> >>> On Thu, Jan 30, 2020 at 10:26 AM Navneeth Krishnan < >>> reachnavnee...@gmail.com> wrote: >>> >>>> Hi All, >>>> >>>> I'm trying to migrate from NFS to S3 for checkpointing and I'm facing >>>> few issues. I have flink running in docker with flink-s3-fs-hadoop jar >>>> copied to plugins folder. Even after having the jar I'm getting the >>>> following error: Caused by: >>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is >>>> not in the classpath/dependencies. Am I missing something? >>>> >>>> In the documentation it says "Presto is the recommended file system >>>> for checkpointing to S3". How can I enable this? Is there a specific >>>> configuration that I need to do for this? >>>> >>>> Also, I couldn't figure out how the entropy injection works. Should I >>>> create the bucket with checkpoints folder and flink will automatically >>>> inject an entropy and create a per job checkpoint folder or should I create >>>> it? >>>> >>>> bucket/checkpoints/_entropy_/dashboard-job/ >>>> >>>> s3.entropy.key: _entropy_ >>>> s3.entropy.length: 4 (default) >>>> >>>> Thanks >>>> >>>
Re: Using s3 for checkpointing
Hi Arvid, Thanks for the response. I have both the jars under /opt/flink/plugins but I'm still getting the same error message. Also can someone please provide some pointers on how entropy works. How should I setup the directory structure? In the link that you have provided there is a aws-credentials.jar file, not sure where to get the jar since I don't see it in maven repo. *Plugins:* flink-s3-fs-hadoop-1.9.1.jar flink-s3-fs-presto-1.9.1.jar *flink-conf:* high-availability.storageDir: s3p://.../recovery *Error message:* Shutting StandaloneSessionClusterEntrypoint down with application status FAILED. Diagnostics java.io.IOException: Could not create FileSystem for highly available storage (high-availability.storageDir) Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Thanks On Thu, Jan 30, 2020 at 2:37 AM Arvid Heise wrote: > Hi Navneeth, > > did you follow the plugin folder structure? [1] > > There is another plugin called flink-s3-fs-presto that you can use. > If you want to use both plugins, use s3a:// for s3-fs-hadoop (output) and > s3p:// for s3-fs-presto (checkpointing). > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html#isolation-and-plugin-structure > > On Thu, Jan 30, 2020 at 10:26 AM Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> I'm trying to migrate from NFS to S3 for checkpointing and I'm facing few >> issues. I have flink running in docker with flink-s3-fs-hadoop jar >> copied to plugins folder. Even after having the jar I'm getting the >> following error: Caused by: >> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is >> not in the classpath/dependencies. 
Am I missing something? >> >> In the documentation it says "Presto is the recommended file system for >> checkpointing to S3". How can I enable this? Is there a specific >> configuration that I need to do for this? >> >> Also, I couldn't figure out how the entropy injection works. Should I >> create the bucket with checkpoints folder and flink will automatically >> inject an entropy and create a per job checkpoint folder or should I create >> it? >> >> bucket/checkpoints/_entropy_/dashboard-job/ >> >> s3.entropy.key: _entropy_ >> s3.entropy.length: 4 (default) >> >> Thanks >> >
Using s3 for checkpointing
Hi All, I'm trying to migrate from NFS to S3 for checkpointing and I'm facing a few issues. I have flink running in docker with the flink-s3-fs-hadoop jar copied to the plugins folder. Even after adding the jar I'm getting the following error: Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies. Am I missing something? In the documentation it says "Presto is the recommended file system for checkpointing to S3". How can I enable this? Is there a specific configuration that I need to do for this? Also, I couldn't figure out how the entropy injection works. Should I just create the bucket with a checkpoints folder, and will flink automatically inject the entropy and create a per-job checkpoint folder, or should I create it myself?

bucket/checkpoints/_entropy_/dashboard-job/

s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)

Thanks
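On the entropy question, my understanding is that you keep the literal `_entropy_` marker in the configured checkpoint path and Flink substitutes it with `s3.entropy.length` random characters when writing checkpoint files; you do not pre-create those folders. A small self-contained Python model of that substitution (illustrative only, not Flink's implementation):

```python
import random
import string

def inject_entropy(path: str, key: str = "_entropy_", length: int = 4) -> str:
    # Model of S3 entropy injection: the marker configured via
    # s3.entropy.key is replaced with `length` random alphanumeric
    # characters at write time, spreading objects across S3 key
    # prefixes. (For non-checkpoint files Flink simply strips the
    # marker; this sketch omits that case.)
    entropy = "".join(
        random.choices(string.ascii_lowercase + string.digits, k=length))
    return path.replace(key, entropy, 1)

p = inject_entropy("s3://bucket/checkpoints/_entropy_/dashboard-job/chk-1")
print(p)
```

So with the configuration quoted above, a path like `s3://bucket/checkpoints/_entropy_/dashboard-job/` becomes e.g. `s3://bucket/checkpoints/k3x9/dashboard-job/` per checkpoint file, with no manual folder setup needed.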
Re: Using redis cache in flink
Hi Yun, Thanks for the update. I can definitely use a redis cluster, but what I don't understand is that if I use a custom operator, then a redis cache will be instantiated per operator instance. What I would like to ideally have is one redis cache instance per TM JVM. Since there isn't any way to share data between task slots today in flink, I would like to use this approach to basically share common data. What I'm not sure about is how I can ensure that just one cache instance per TM JVM is created? Regards On Wed, Jan 8, 2020 at 12:46 AM Yun Tang wrote: > Hi Navneeth > > If you need the redis cache to be fault tolerant, I am afraid you have to > choose redis cluster since Flink might deploy task on another node which is > different from previous node after job failover. > > If you don't care about the fault tolerance, you could implement a > customized operator which launches redis. > > By the way, there exists a way to combine objects on heap in memory with > checkpoint mechanism to ensure fault tolerance, you could refer to [1] and > [2]. The basic idea is to cac > > [1] > https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java#L147 > [2] > https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.java#L89 > > > -- > *From:* Navneeth Krishnan > *Sent:* Wednesday, January 8, 2020 15:36 > *To:* Yun Tang > *Cc:* user > *Subject:* Re: Using redis cache in flink > > Hi Yun, > > Thanks, the way I want to use redis is like a cache not as state backend. > I would still have rocksdb state backend for other states. The reason to > use cache instead of managed state is because I’d get around 10k msgs per > task slot and I don’t have to get the state from rocksdb for each lookup. 
> In memory cache would be fine but to rebuild the state I want to use redis. > > Regards > > On Tue, Jan 7, 2020 at 11:21 PM Yun Tang wrote: > > Hi Navneeth > > If you wrap redis as a state backend, you cannot easily share data across > slots as Flink construct state backend per operator with local thread only. > > If you use a redis cluster as a externalized service to store your data, > you can share data across slots easily. However, compared with the reduced > cost of serialization, the introduce of network communicate cannot be > ignored. There exists trade-off here, and we cannot ensure there would be a > performance gain. Actually, I prefer the time used in CPU serialization is > much less than the time consumed through the network. > > Best > Yun Tang > -- > *From:* Navneeth Krishnan > *Sent:* Wednesday, January 8, 2020 12:33 > *To:* user > *Subject:* Using redis cache in flink > > Hi All, > > I want to use redis as near far cache to store data which are common > across slots i.e. share data across slots. This data is required for > processing every single message and it's better to store in a in memory > cache backed by redis rather than rocksdb since it has to be serialized for > every single get call. Do you guys think this is good solution or is there > any other better solution? Also, Is there any reference on how I can create > a centralized near far cache since the job and operators are distributed by > the job manager. > > Thanks > >
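On the "one cache instance per TM JVM" question above: since all task slots of a TaskManager run in the same JVM, the usual trick is a static field with lazy, lock-guarded initialization, so every operator instance in that JVM shares one object. Here is a sketch of the pattern in Python (in Java this would be a static field plus double-checked synchronized init; the plain dict below is a hypothetical stand-in for a real Redis client):

```python
import threading

class SharedCache:
    """One instance per process (per TM JVM, in the Flink analogy).

    Every operator instance running in the same process that calls
    SharedCache.get() receives the same object, so the cached data is
    stored once per JVM rather than once per slot.
    """
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get(cls):
        if cls._instance is None:            # fast path, no lock
            with cls._lock:                  # slow path, guarded
                if cls._instance is None:    # double-check under the lock
                    cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.data = {}  # stand-in for a Redis client / local cache

# Two "slots" (operator instances) obtain the same cache object.
slot_a = SharedCache.get()
slot_b = SharedCache.get()
slot_a.data["common"] = "value"
print(slot_b.data["common"], slot_a is slot_b)
```

The caveat from the thread still applies: the object is per-JVM, so after a failover onto a different TaskManager the cache starts empty unless it is rebuilt from an external store such as Redis.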
Re: Using redis cache in flink
Hi Yun, Thanks, the way I want to use redis is like a cache not as state backend. I would still have rocksdb state backend for other states. The reason to use cache instead of managed state is because I’d get around 10k msgs per task slot and I don’t have to get the state from rocksdb for each lookup. In memory cache would be fine but to rebuild the state I want to use redis. Regards On Tue, Jan 7, 2020 at 11:21 PM Yun Tang wrote: > Hi Navneeth > > If you wrap redis as a state backend, you cannot easily share data across > slots as Flink construct state backend per operator with local thread only. > > If you use a redis cluster as a externalized service to store your data, > you can share data across slots easily. However, compared with the reduced > cost of serialization, the introduce of network communicate cannot be > ignored. There exists trade-off here, and we cannot ensure there would be a > performance gain. Actually, I prefer the time used in CPU serialization is > much less than the time consumed through the network. > > Best > Yun Tang > -- > *From:* Navneeth Krishnan > *Sent:* Wednesday, January 8, 2020 12:33 > *To:* user > *Subject:* Using redis cache in flink > > Hi All, > > I want to use redis as near far cache to store data which are common > across slots i.e. share data across slots. This data is required for > processing every single message and it's better to store in a in memory > cache backed by redis rather than rocksdb since it has to be serialized for > every single get call. Do you guys think this is good solution or is there > any other better solution? Also, Is there any reference on how I can create > a centralized near far cache since the job and operators are distributed by > the job manager. > > Thanks >
Using redis cache in flink
Hi All, I want to use redis as a near/far cache to store data which is common across slots, i.e. to share data across slots. This data is required for processing every single message, and it's better to store it in an in-memory cache backed by redis rather than rocksdb, since it has to be serialized for every single get call. Do you guys think this is a good solution, or is there any better solution? Also, is there any reference on how I can create a centralized near/far cache, since the job and operators are distributed by the job manager? Thanks
Re: Checkpoints issue and job failing
Thanks Vino & Piotr, sure, will upgrade the flink version and monitor it to see if the problem still exist. Thanks On Mon, Jan 6, 2020 at 12:39 AM Piotr Nowojski wrote: > Hi, > > From the top of my head I don’t remember anything particular, however > release 1.4.0 came with quite a lot of deep change which had it’s fair > share number of bugs, that were subsequently fixed in later releases. > > Because 1.4.x tree is no longer supported I would strongly recommend to > first upgrade to a more recent Flink version. If that’s not possible, I > would at least upgrade to the latest release from 1.4.x tree (1.4.2). > > Piotrek > > On 6 Jan 2020, at 07:25, vino yang wrote: > > Hi Navneeth, > > Since the file still exists, this exception is very strange. > > I want to ask, does it happen by accident or frequently? > > Another concern is that since the 1.4 version is very far away, all > maintenance and response are not as timely as the recent versions. I > personally recommend upgrading as soon as possible. > > I can ping @Piotr Nowojski and see if it is > possible to explain the cause of this problem. > > Best, > Vino > > Navneeth Krishnan 于2020年1月4日周六 上午1:03写道: > >> Thanks Congxian & Vino. >> >> Yes, the file do exist and I don't see any problem in accessing it. >> >> Regarding flink 1.9, we haven't migrated yet but we are planning to do. >> Since we have to test it might take sometime. >> >> Thanks >> >> On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu >> wrote: >> >>> Hi >>> >>> Do you have ever check that this problem exists on Flink 1.9? >>> >>> Best, >>> Congxian >>> >>> >>> vino yang 于2020年1月3日周五 下午3:54写道: >>> >>>> Hi Navneeth, >>>> >>>> Did you check if the path contains in the exception is really can not >>>> be found? >>>> >>>> Best, >>>> Vino >>>> >>>> Navneeth Krishnan 于2020年1月3日周五 上午8:23写道: >>>> >>>>> Hi All, >>>>> >>>>> We are running into checkpoint timeout issue more frequently in >>>>> production and we also see the below exception. 
We are running flink 1.4.0 >>>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome >>>>> this? >>>>> >>>>> >>>>> >>>>> java.lang.IllegalStateException: Could not initialize operator state >>>>> backend. >>>>> at >>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302) >>>>> at >>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) >>>>> at java.lang.Thread.run(Thread.java:748) >>>>> Caused by: java.io.FileNotFoundException: >>>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01 >>>>> (No such file or directory) >>>>> at java.io.FileInputStream.open0(Native Method) >>>>> at java.io.FileInputStream.open(FileInputStream.java:195) >>>>> at java.io.FileInputStream.(FileInputStream.java:138) >>>>> at >>>>> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) >>>>> >>>>> >>>>> Thanks >>>>> >>>>> >
Re: Checkpoints issue and job failing
Thanks Congxian & Vino. Yes, the file does exist and I don't see any problem in accessing it. Regarding flink 1.9, we haven't migrated yet but we are planning to do so. Since we have to test it might take some time. Thanks On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu wrote: > Hi > > Have you ever checked whether this problem exists on Flink 1.9? > > Best, > Congxian > > > vino yang wrote on Fri, Jan 3, 2020 at 3:54 PM: > >> Hi Navneeth, >> >> Did you check whether the path contained in the exception really cannot be >> found? >> >> Best, >> Vino >> >> Navneeth Krishnan wrote on Fri, Jan 3, 2020 at 8:23 AM: >> >>> Hi All, >>> >>> We are running into a checkpoint timeout issue more frequently in >>> production and we also see the below exception. We are running flink 1.4.0 >>> and the checkpoints are saved on NFS. Can someone suggest how to overcome >>> this? >>> >>> [image: image.png] >>> >>> java.lang.IllegalStateException: Could not initialize operator state >>> backend. >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302) >>> at >>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: java.io.FileNotFoundException: >>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01 >>> (No such file or directory) >>> at java.io.FileInputStream.open0(Native Method) >>> at java.io.FileInputStream.open(FileInputStream.java:195) >>> at java.io.FileInputStream.(FileInputStream.java:138) >>> at >>> 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) >>> >>> >>> Thanks >>> >>>
Checkpoints issue and job failing
Hi All, We are running into a checkpoint timeout issue more frequently in production and we also see the below exception. We are running flink 1.4.0 and the checkpoints are saved on NFS. Can someone suggest how to overcome this?

[image: image.png]

java.lang.IllegalStateException: Could not initialize operator state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.(FileInputStream.java:138)
    at org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)

Thanks
Re: Flink vs Kafka streams
Thanks Congxian. Yes, it's been very hard to manage the cluster and that's why we are trying to evaluate alternate choices. If anyone has found better methods to deploy and scale, it would be great to know so that we can adopt the same as well. Thanks On Fri, Nov 8, 2019 at 1:56 AM Congxian Qiu wrote: > Hi > > From your description, it seems the big problem is scaling in and out, and > there may be a big downtime for triggering a savepoint and restoring from the > savepoint. > > Previously, we have proposed a feature named stop-with-checkpoint[1], same > as stop-with-savepoint but triggering a checkpoint instead of a > savepoint; if you use incremental checkpoints, this can improve the speed > a lot. Currently, as this feature was not merged, you can try to restore > from the retained checkpoint of the previous job[2] > > For scaling in and out, if the restore time costs too much, you can > measure the time of restore; if it spends too much time on downloading states, > you can try the multi-thread download feature[3]. > > [1] https://issues.apache.org/jira/browse/FLINK-12619 > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint > [3] https://issues.apache.org/jira/browse/FLINK-10461 > Best, > Congxian > > > Navneeth Krishnan wrote on Fri, Nov 8, 2019 at 3:38 PM: > >> Hello All, >> >> I have a streaming job running in production which is processing over 2 >> billion events per day and it does some heavy processing on each event. We >> have been facing some challenges in managing flink in production like >> scaling in and out, restarting the job with savepoint etc. Flink provides a >> lot of features which seemed like an obvious choice at that time, but now with >> all the operational overhead we are wondering whether we should still use flink for >> our stream processing requirements or choose kafka streams. >> >> We currently deploy flink on ECR. 
Bringing up a new cluster for another >> stream job is too expensive but on the flip side running it on the same >> cluster becomes difficult since there are no ways to say this job has to be >> run on a dedicated server versus this can run on a shared instance. Also >> savepoint point, cancel and submit a new job results in some downtime. The >> most critical part being there is no shared state among all tasks sort of a >> global state. We sort of achieve this today using an external redis cache >> but that incurs cost as well. >> >> If we are moving to kafka streams, it makes our deployment life much >> easier, each new stream job will be a microservice that can scale >> independently. With global state it's much easier to share state without >> using external cache. But the disadvantage is we have to rely on the >> partitions for parallelism. Although this might initially sound easier, >> when we need to scale much higher this will become a bottleneck. >> >> Do you guys have any suggestions on this? We need to decide which way to >> move forward and any suggestions would be of much greater help. >> >> Thanks >> >
Flink vs Kafka streams
Hello All, I have a streaming job running in production which is processing over 2 billion events per day and it does some heavy processing on each event. We have been facing some challenges in managing flink in production like scaling in and out, restarting the job with a savepoint, etc. Flink provides a lot of features which seemed like an obvious choice at that time, but now with all the operational overhead we are wondering whether we should still use flink for our stream processing requirements or choose kafka streams. We currently deploy flink on ECR. Bringing up a new cluster for another stream job is too expensive, but on the flip side running it on the same cluster becomes difficult since there is no way to say this job has to run on a dedicated server versus this one can run on a shared instance. Also, taking a savepoint, cancelling, and submitting a new job results in some downtime. The most critical part is that there is no shared state among all tasks, i.e. a global state. We sort of achieve this today using an external redis cache, but that incurs cost as well. If we move to kafka streams, it makes our deployment life much easier; each new stream job will be a microservice that can scale independently. With global state it's much easier to share state without using an external cache. But the disadvantage is we have to rely on the partitions for parallelism. Although this might initially sound easier, when we need to scale much higher this will become a bottleneck. Do you guys have any suggestions on this? We need to decide which way to move forward and any suggestions would be of much greater help. Thanks
Re: ProcessFunction Timer
I can use filtering to do it, but I preferred the process function because I don't have to do a keyBy again for the windowing or use reinterpret. The problem I'm having is this: if I use the process function and register a timer, and more input records arrive before the timer fires, how can I avoid creating more timers and instead use one timer to collect the data and forward it? I was thinking about using a local variable, but it wouldn't work across keys. The other approach is to have a value state to indicate whether the timer is registered or not, but I'm wondering: is this the only way, or is there a better approach? Thanks On Fri, Oct 18, 2019 at 6:19 AM Andrey Zagrebin wrote: > Hi Navneeth, > > You could also apply filtering on the incoming records before windowing. > This might save you some development effort but I do not know full details > of your requirement, i.e. whether filtering is sufficient. > In general, you can use timers as you suggested as the windowing itself > works in a similar way. > > Best, > Andrey > > On Thu, Oct 17, 2019 at 11:10 PM Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> I'm currently using a tumbling window of 5 seconds using >> TumblingTimeWindow but due to change in requirements I would not have to >> window every incoming data. With that said I'm planning to use process >> function to achieve this selective windowing. >> >> I looked at the example provided in the documentation and I'm not clear >> on how I can implement the windowing. >> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html >> >> Basically what I want is keep collecting data until it reaches 5 seconds >> from the time the first data came in for the key and then forward it to the >> next operator. I will be using ListState to add the entries and then >> register a timer when the list is empty. When the timer runs then collect >> all entries and forward it, also remove entries from the list. 
Do you guys >> think this will suffice or anything else has to be done? >> >> Also I will have about 1M keys, then would there be any performance >> impact in creating these many timers? I believe the timers are >> automatically removed after they are fired or should I do anything extra to >> remove these timers? >> >> Thanks >> >
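The "register a timer only when the buffer is empty" pattern discussed above can be modeled in a few lines. This is a plain-Python sketch of the per-key timer guard, where dicts stand in for Flink's keyed ListState/ValueState and timer service (it is an illustration of the logic, not the Flink API):

```python
# Minimal model of "one pending timer per key": a ListState buffers
# elements and a ValueState flag ensures a single timer is registered
# until it fires. Plain dicts stand in for keyed state; the `timers`
# list stands in for the timer service.

WINDOW_MS = 5_000

buffer = {}        # key -> buffered elements   (ListState)
timer_set = {}     # key -> bool                (ValueState guard)
timers = []        # (fire_time, key)           (timer service)

def process_element(key, value, now_ms):
    buffer.setdefault(key, []).append(value)
    if not timer_set.get(key):                 # arm only the first time
        timers.append((now_ms + WINDOW_MS, key))
        timer_set[key] = True

def on_timer(key):
    out = buffer.pop(key, [])                  # emit and clear the buffer
    timer_set[key] = False                     # next element re-arms it
    return out

process_element("user1", "a", 0)
process_element("user1", "b", 1_000)           # no second timer registered
print(len(timers), on_timer("user1"))
```

One more point worth noting: Flink deduplicates timers registered for the exact same timestamp on the same key, so rounding the fire time to a window boundary is an alternative to the explicit flag, but the flag makes the intent explicit and survives in state.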
ProcessFunction Timer
Hi All, I'm currently using a tumbling window of 5 seconds using TumblingTimeWindow, but due to a change in requirements I would not have to window every incoming record. With that said, I'm planning to use a process function to achieve this selective windowing. I looked at the example provided in the documentation and I'm not clear on how I can implement the windowing. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html Basically what I want is to keep collecting data until it reaches 5 seconds from the time the first record came in for the key and then forward it to the next operator. I will be using ListState to add the entries and then register a timer when the list is empty. When the timer fires, I collect all entries and forward them, and also remove the entries from the list. Do you guys think this will suffice, or does anything else have to be done? Also, I will have about 1M keys; would there be any performance impact in creating this many timers? I believe the timers are automatically removed after they are fired, or should I do anything extra to remove these timers? Thanks
Re: Broadcast state
Ya, there will not be a problem of duplicates. But what I'm trying to achieve is this: there is a large static state which needs to be present just once per node rather than stored per slot; that would be ideal. The reason being that the state is quite large, around 100GB of mostly static data, and it is not needed at the per-slot level. It can be at the per-instance level where each slot can read from this shared memory. Thanks On Wed, Oct 9, 2019 at 12:13 AM Congxian Qiu wrote: > Hi, > > After using Redis, why do you need to care about eliminating duplicated data? > If you specify the same key, then Redis will do the deduplication. > > Best, > Congxian > > > Fabian Hueske wrote on Wed, Oct 2, 2019 at 5:30 PM: > >> Hi, >> >> State is always associated with a single task in Flink. >> The state of a task cannot be accessed by other tasks of the same >> operator or tasks of other operators. >> This is true for every type of state, including broadcast state. >> >> Best, Fabian >> >> >> Am Di., 1. Okt. 2019 um 08:22 Uhr schrieb Navneeth Krishnan < >> reachnavnee...@gmail.com>: >> >>> Hi, >>> >>> I can use redis but I’m still having a hard time figuring out how I can >>> eliminate duplicate data. Today without broadcast state in 1.4 I’m using a >>> cache to lazy load the data. I thought the broadcast state would be similar >>> to that of kafka streams where I have read access to the state across the >>> pipeline. That would indeed solve a lot of problems. Is there some way I can >>> do the same with flink? >>> >>> Thanks! >>> >>> On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu >>> wrote: >>> >>>> Hi, >>>> >>>> Could you use some cache system such as HBase or Redis to store this >>>> data, and query from the cache if needed? >>>> >>>> Best, >>>> Congxian >>>> >>>> >>>> Navneeth Krishnan wrote on Tue, Oct 1, 2019 at 10:15 AM: >>>> >>>>> Thanks Oytun. The problem with doing that is the same data will >>>>> have to be stored multiple times wasting memory. 
In my case there will >>>>> around million entries which needs to be used by at least two operators >>>>> for >>>>> now. >>>>> >>>>> Thanks >>>>> >>>>> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez wrote: >>>>> >>>>>> This is how we currently use broadcast state. Our states are >>>>>> re-usable (code-wise), every operator that wants to consume basically >>>>>> keeps >>>>>> the same descriptor state locally by processBroadcastElement'ing into a >>>>>> local state. >>>>>> >>>>>> I am open to suggestions. I see this as a hard drawback of dataflow >>>>>> programming or Flink framework? >>>>>> >>>>>> >>>>>> >>>>>> --- >>>>>> Oytun Tez >>>>>> >>>>>> *M O T A W O R D* >>>>>> The World's Fastest Human Translation Platform. >>>>>> oy...@motaword.com — www.motaword.com >>>>>> >>>>>> >>>>>> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez wrote: >>>>>> >>>>>>> You can re-use the broadcasted state (along with its descriptor) >>>>>>> that comes into your KeyedBroadcastProcessFunction, in another operator >>>>>>> downstream. that's basically duplicating the broadcasted state whichever >>>>>>> operator you want to use, every time. >>>>>>> >>>>>>> >>>>>>> >>>>>>> --- >>>>>>> Oytun Tez >>>>>>> >>>>>>> *M O T A W O R D* >>>>>>> The World's Fastest Human Translation Platform. >>>>>>> oy...@motaword.com — www.motaword.com >>>>>>> >>>>>>> >>>>>>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan < >>>>>>> reachnavnee...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> >>>>>>>> Is it possible to access a broadcast state across the pipeline? For >>>>>>>> example, say I have a KeyedBroadcastProcessFunction which adds the >>>>>>>> incoming >>>>>>>> data to state and I have downstream operator where I need the same >>>>>>>> state as >>>>>>>> well, would I be able to just read the broadcast state with a readonly >>>>>>>> view. I know this is possible in kafka streams. >>>>>>>> >>>>>>>> Thanks >>>>>>>> >>>>>>>
Re: Broadcast state
Hi, I can use redis but I’m still having hard time figuring out how I can eliminate duplicate data. Today without broadcast state in 1.4 I’m using cache to lazy load the data. I thought the broadcast state will be similar to that of kafka streams where I have read access to the state across the pipeline. That will indeed solve a lot of problems. Is there some way I can do the same with flink? Thanks! On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu wrote: > Hi, > > Could you use some cache system such as HBase or Reids to storage this > data, and query from the cache if needed? > > Best, > Congxian > > > Navneeth Krishnan 于2019年10月1日周二 上午10:15写道: > >> Thanks Oytun. The problem with doing that is the same data will be have >> to be stored multiple times wasting memory. In my case there will around >> million entries which needs to be used by at least two operators for now. >> >> Thanks >> >> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez wrote: >> >>> This is how we currently use broadcast state. Our states are re-usable >>> (code-wise), every operator that wants to consume basically keeps the same >>> descriptor state locally by processBroadcastElement'ing into a local state. >>> >>> I am open to suggestions. I see this as a hard drawback of dataflow >>> programming or Flink framework? >>> >>> >>> >>> --- >>> Oytun Tez >>> >>> *M O T A W O R D* >>> The World's Fastest Human Translation Platform. >>> oy...@motaword.com — www.motaword.com >>> >>> >>> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez wrote: >>> >>>> You can re-use the broadcasted state (along with its descriptor) that >>>> comes into your KeyedBroadcastProcessFunction, in another operator >>>> downstream. that's basically duplicating the broadcasted state whichever >>>> operator you want to use, every time. >>>> >>>> >>>> >>>> --- >>>> Oytun Tez >>>> >>>> *M O T A W O R D* >>>> The World's Fastest Human Translation Platform. 
>>>> oy...@motaword.com — www.motaword.com >>>> >>>> >>>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan < >>>> reachnavnee...@gmail.com> wrote: >>>> >>>>> Hi All, >>>>> >>>>> Is it possible to access a broadcast state across the pipeline? For >>>>> example, say I have a KeyedBroadcastProcessFunction which adds the >>>>> incoming >>>>> data to state and I have downstream operator where I need the same state >>>>> as >>>>> well, would I be able to just read the broadcast state with a readonly >>>>> view. I know this is possible in kafka streams. >>>>> >>>>> Thanks >>>>> >>>>
Re: Broadcast state
Thanks Oytun. The problem with doing that is that the same data will have to be stored multiple times, wasting memory. In my case there will be around a million entries which need to be used by at least two operators for now. Thanks On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez wrote: > This is how we currently use broadcast state. Our states are re-usable > (code-wise), every operator that wants to consume basically keeps the same > descriptor state locally by processBroadcastElement'ing into a local state. > > I am open to suggestions. I see this as a hard drawback of dataflow > programming or Flink framework? > > > > --- > Oytun Tez > > *M O T A W O R D* > The World's Fastest Human Translation Platform. > oy...@motaword.com — www.motaword.com > > > On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez wrote: > >> You can re-use the broadcasted state (along with its descriptor) that >> comes into your KeyedBroadcastProcessFunction, in another operator >> downstream. that's basically duplicating the broadcasted state whichever >> operator you want to use, every time. >> >> >> >> --- >> Oytun Tez >> >> *M O T A W O R D* >> The World's Fastest Human Translation Platform. >> oy...@motaword.com — www.motaword.com >> >> >> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan < >> reachnavnee...@gmail.com> wrote: >> >>> Hi All, >>> >>> Is it possible to access a broadcast state across the pipeline? For >>> example, say I have a KeyedBroadcastProcessFunction which adds the incoming >>> data to state and I have downstream operator where I need the same state as >>> well, would I be able to just read the broadcast state with a readonly >>> view. I know this is possible in kafka streams. >>> >>> Thanks >>> >>
Broadcast state
Hi All, Is it possible to access a broadcast state across the pipeline? For example, say I have a KeyedBroadcastProcessFunction which adds the incoming data to state, and I have a downstream operator where I need the same state as well. Would I be able to just read the broadcast state with a read-only view? I know this is possible in Kafka Streams. Thanks
Re: Running flink on AWS ECS
Thanks Terry, the reason I asked this is because somewhere I saw that running one slot per container is beneficial, though I couldn't find where I saw that. Also I think running it with multiple slots will reduce IPC, since some of the data will be processed within the same JVM. Thanks On Wed, Sep 25, 2019 at 1:16 AM Terry Wang wrote: > Hi, Navneeth, > > I think both is ok. > IMO, run one container with number of slots same as virtual cores may be > better for slots can share the Flink Framework and thus reduce memory cost. > > Best, > Terry Wang > > > > > 在 2019年9月25日,下午3:26,Navneeth Krishnan 写道: > > > > Hi All, > > > > I’m currently running flink on amazon ecs and I have assigned task slots > based on vcpus per instance. Is it beneficial to run a separate container > with one slot each or one container with number of slots same as virtual > cores? > > > > Thanks > >
Running flink on AWS ECS
Hi All, I’m currently running Flink on Amazon ECS and I have assigned task slots based on the vCPUs per instance. Is it more beneficial to run a separate container with one slot each, or one container with the number of slots equal to the number of virtual cores? Thanks
Operator state
Hi All, Is there a way to share operator state among operators? For example, I have an operator which has union state and the same state data is required in a downstream operator. If not, is there a recommended way to share the state? Thanks
RocksDB KeyValue store
Hi All, I looked at the RocksDB KV store implementation and found that deserialization happens on each key lookup. Given a scenario where a key lookup has to happen for every single message, would it still be a good idea to store the data in the RocksDB store, or would an in-memory store/cache be more efficient? I know that if the data is stored in the KV store it is automatically redistributed on scale up/scale down and it is fault tolerant. For example, if there are 1M user events and a user config of size 1KB is persisted into RocksDB, would the state have to be deserialized for each event? Wouldn't this create a lot of garbage? Also, is there a per-machine state store which can be used for all keys that are sent to that task manager? Thanks
Re: Event time window eviction
Thanks Taher. Are there any examples of this? In my scenario, data would be coming in and might stop for some time, but I still need the window to close after the configured duration. Also, I believe in version 1.3 the event time would progress only if all partitions of a Kafka topic advanced their event time. Is that still the case? If there is data in only a few partitions, will the event time progress? Thanks On Mon, Jul 29, 2019 at 10:51 AM taher koitawala wrote: > I believe the approach to this is wrong... For fixing windows we can write > our custom triggers to fire them... However what I'm not convinced with is > switching between event and processing time. > Write a custom triggers and fire the event time window if you > don't see any activity. That's the only way. > > On Mon, Jul 29, 2019, 11:07 PM Navneeth Krishnan > wrote: > >> Hi All, >> >> Any suggestions? >> >> Thanks >> >> On Thu, Jul 25, 2019 at 11:45 PM Navneeth Krishnan < >> reachnavnee...@gmail.com> wrote: >> >>> Hi All, >>> >>> I'm working on a very short tumbling window for 1 second per key. What I >>> want to achieve is if the event time per key doesn't progress after a >>> second I want to evict the window, basically a combination of event time >>> and processing time. I'm currently achieving it by registering a processing >>> time timer but is there a way to emit some global punctuator which can be >>> used to evict all keys window data. >>> >>> The issue with registering processing time timer for every key is >>> causing too much JVM pressure. Any suggestions on how this could be >>> implemented? >>> >>> Thanks >>> >>
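[Editor's note] Following Taher's suggestion, a custom trigger for this would subclass Flink's Trigger, fire on the event-time window end as usual, and additionally register a processing-time timeout so a stalled watermark cannot keep the window open forever. A rough, hedged sketch against the Trigger API of that era (the class name and timeout handling are illustrative, not a documented Flink class):

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Event-time trigger with a processing-time fallback for stalled watermarks.
public class EventTimeWithTimeoutTrigger<T> extends Trigger<T, TimeWindow> {
    private final long timeoutMs;

    public EventTimeWithTimeoutTrigger(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Fallback: fire by wall clock if the watermark never reaches the window end.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMs);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE; // timeout hit: flush whatever the window holds
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

Note this still registers per-key processing-time timers, which is the JVM-pressure concern raised in the quoted thread; coalescing the timeout timestamps to a coarser granularity (e.g. rounding them up to the next second) would reduce the number of distinct timers.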
Re: Event time window eviction
Hi All, Any suggestions? Thanks On Thu, Jul 25, 2019 at 11:45 PM Navneeth Krishnan wrote: > Hi All, > > I'm working on a very short tumbling window for 1 second per key. What I > want to achieve is if the event time per key doesn't progress after a > second I want to evict the window, basically a combination of event time > and processing time. I'm currently achieving it by registering a processing > time timer but is there a way to emit some global punctuator which can be > used to evict all keys window data. > > The issue with registering processing time timer for every key is causing > too much JVM pressure. Any suggestions on how this could be implemented? > > Thanks >
Event time window eviction
Hi All, I'm working with a very short tumbling window of 1 second per key. What I want to achieve is that if the event time for a key doesn't progress after a second, I want to evict the window, basically a combination of event time and processing time. I'm currently achieving it by registering a processing-time timer, but is there a way to emit some global punctuation that can be used to evict all keys' window data? The issue is that registering a processing-time timer for every key causes too much JVM pressure. Any suggestions on how this could be implemented? Thanks
Even key distribution workload
Hi All, Currently I keyBy user and I see uneven load distribution, since some users have very high load while others have very few messages. Is there a recommended way to achieve an even distribution of the workload? Has anyone else encountered this problem, and what was the workaround? Thanks
Re: Checkpoint failure
Hi All, Any pointers on the below checkpoint failure scenario. Appreciate all the help. Thanks Thanks On Sun, Jul 7, 2019 at 9:23 PM Navneeth Krishnan wrote: > Hi All, > > Occasionally I run into failed checkpoints error where 2 or 3 consecutive > checkpoints fails after running for a minute and then it recovers. This is > causing delay in processing the incoming data since there is huge amount of > data buffered during the failed checkpoints. I don't see any errors in the > taskmanager logs but here is the error in the jobmanager log. The state > size is around 100 mb. > > *Checkpoint configuration:* > Option Value > Checkpointing Mode Exactly Once > Interval 1m 0s > Timeout 1m 0s > Minimum Pause Between Checkpoints 5s > Maximum Concurrent Checkpoints 1 > Persist Checkpoints Externally Enabled (retain on cancellation) > *Jobmanager Log:* > > 2019-07-05 17:53:54,125 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from 79515b6550d2c223701be0a9c870995f of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,141 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from 630984cdd5e66b4d9ea95a91cb4d23f6 of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,168 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from e12ed2e185a37559f93181905a52ebeb of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,215 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from 1fede192e2ff11e0905d98ff5ff6f9ce of job > 00ff93caa4cc9464bd41e1d050fcf65c. 
> 2019-07-05 17:53:54,223 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from d4e895eb20cc259c95b249cd0252930f of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,310 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from be5c711d7b37ed6d804dc447db91 of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,351 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from 1ed52695cc407f2f143d2bb5d23cbdbb of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,398 [flink-akka.actor.default-dispatcher-465901] WARN > o.a.f.r.c.CheckpointCoordinator - Received late message for now expired > checkpoint attempt 9867 from 2e43cf968ad399c0b8426239a7dd081c of job > 00ff93caa4cc9464bd41e1d050fcf65c. > 2019-07-05 17:53:54,959 [flink-akka.actor.default-dispatcher-465868] INFO > o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9868 (279307042 > bytes in 50707 ms). > 2019-07-05 17:54:04,174 [Checkpoint Timer] INFO > o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9869 @ 1562349244171 > 2019-07-05 17:54:10,709 [flink-akka.actor.default-dispatcher-465905] INFO > o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9869 (253638470 > bytes in 6430 ms). > 2019-07-05 17:55:04,174 [Checkpoint Timer] INFO > o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9870 @ 1562349304171 > 2019-07-05 17:55:09,816 [flink-akka.actor.default-dispatcher-465913] INFO > o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9870 (138649543 > bytes in 5551 ms). > 2019-07-05 17:56:04,174 [Checkpoint Timer] INFO > o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9871 @ 1562349364171 > > Thanks >
Checkpoint failure
Hi All, Occasionally I run into a failed-checkpoint error where 2 or 3 consecutive checkpoints fail after running for a minute, and then checkpointing recovers. This causes a delay in processing the incoming data, since a huge amount of data is buffered during the failed checkpoints. I don't see any errors in the taskmanager logs, but here is the error in the jobmanager log. The state size is around 100 MB.

*Checkpoint configuration:*
Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 1m 0s
Minimum Pause Between Checkpoints: 5s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)

*Jobmanager Log:*
2019-07-05 17:53:54,125 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from 79515b6550d2c223701be0a9c870995f of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,141 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from 630984cdd5e66b4d9ea95a91cb4d23f6 of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,168 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from e12ed2e185a37559f93181905a52ebeb of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,215 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from 1fede192e2ff11e0905d98ff5ff6f9ce of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,223 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from d4e895eb20cc259c95b249cd0252930f of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,310 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from be5c711d7b37ed6d804dc447db91 of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,351 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from 1ed52695cc407f2f143d2bb5d23cbdbb of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,398 [flink-akka.actor.default-dispatcher-465901] WARN o.a.f.r.c.CheckpointCoordinator - Received late message for now expired checkpoint attempt 9867 from 2e43cf968ad399c0b8426239a7dd081c of job 00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,959 [flink-akka.actor.default-dispatcher-465868] INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9868 (279307042 bytes in 50707 ms).
2019-07-05 17:54:04,174 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9869 @ 1562349244171
2019-07-05 17:54:10,709 [flink-akka.actor.default-dispatcher-465905] INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9869 (253638470 bytes in 6430 ms).
2019-07-05 17:55:04,174 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9870 @ 1562349304171
2019-07-05 17:55:09,816 [flink-akka.actor.default-dispatcher-465913] INFO o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9870 (138649543 bytes in 5551 ms).
2019-07-05 17:56:04,174 [Checkpoint Timer] INFO o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9871 @ 1562349364171

Thanks
Flink forward talks
Hi All, Where can I find the videos of the latest Flink Forward talks? Thanks, Navneeth
Flink Pipeline - CICD
Hi All, We have some streaming jobs in production, and today we manually deploy the Flink jobs in each region/environment. Before we start automating this, I just wanted to check whether anyone has already created a CI/CD script, for Jenkins or another CI/CD tool, to deploy the latest JAR onto running Flink clusters? Any pointers would help. Thanks Regards, Navneeth
Connect keyed stream with broadcast
Hi, Is this feature present in flink 1.5? I have a requirement to connect a keyed stream and broadcast stream. https://issues.apache.org/jira/browse/FLINK-3659 Thanks, Navneeth
SideOutput Issue
Hi All, I'm having issues with creating side outputs. There are two input sources (both from Kafka) which are connected and fed into a co-process function. Inside the co-process function, the regular data stream outputs a POJO, and in processElement2 there is a periodic timer which creates the side output. When I start the job I get the exception below. Is there something I'm doing wrong? I used the example below to implement the side output. https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java

In processElement2:
ctx.output("side-output", POJO);

In the job:
dataStream.getSideOutput("side-output").print();

2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Flat Map (4/8) (20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (1/8) (fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.streaming.api.collector.selector.DirectedOutput.(DirectedOutput.java:74)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.(CopyingDirectedOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO org.apache.flink.runtime.taskmanager.Task - Co-Process (7/8) (a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at org.apache.flink.streaming.api.collector.selector.DirectedOutput.(DirectedOutput.java:74)
at org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.(CopyingDirectedOutput.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)
Thanks
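[Editor's note] For reference, in the DataStream API the side-output handle is an OutputTag object rather than a plain string, and it must be created as an anonymous subclass so the element type information is captured; the DirectedOutput frames in the stack trace also suggest split/select machinery is involved, which is separate from side outputs. A hedged sketch of the documented pattern (MyPojo, MainPojo, and the wiring are placeholders):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    // Anonymous subclass: the trailing {} lets Flink capture the element type.
    static final OutputTag<MyPojo> SIDE = new OutputTag<MyPojo>("side-output") {};

    // Inside the CoProcessFunction's processElement2 / onTimer:
    //   ctx.output(SIDE, pojo);

    static void wire(SingleOutputStreamOperator<MainPojo> processed) {
        // getSideOutput must be called with the same tag, on the operator that emits it.
        processed.getSideOutput(SIDE).print();
    }
}
```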
Record timestamp from kafka
Hi, Is there a way to get the Kafka timestamp in the deserialization schema? All records are written to Kafka with a timestamp, and I would like to set that timestamp on every record that is ingested. Thanks.
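[Editor's note] In Flink releases that provide KafkaDeserializationSchema (roughly 1.8 and later), the deserializer receives the whole Kafka ConsumerRecord, including the timestamp the record was written with. The sketch below assumes such a release; MyEvent and its methods are placeholders, not real classes. (With the Kafka 0.10+ consumer, Flink also attaches the Kafka record timestamp to each element as its event timestamp automatically.)

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TimestampedEventSchema implements KafkaDeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        MyEvent event = MyEvent.fromBytes(record.value()); // placeholder decoder
        event.setKafkaTimestamp(record.timestamp());       // timestamp the record was written with
        return event;
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}
```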
Job restart hook
Hi, Is there a way for a script to be called whenever a job gets restarted? My scenario: let's say there are 20 slots and the job runs on all 20 slots. After a while a task manager goes down, leaving only 14 slots, and I need to readjust the parallelism of my job to ensure the job keeps running until the lost TM comes back up. It would be great to know how others are handling this situation. Thanks, Navneeth
Re: Event time window questions
Thanks Sendoh. Is there a way to advance the watermark even when there are no incoming events? What exactly does setAutoWatermarkInterval do? Also, I don't see the watermark displayed in the Flink dashboard. Will the watermark advance only when there is data from all consumed Kafka topics and partitions? I have 3 topics with 3 partitions each. Thanks. Regards, Navneeth On Tue, Jan 23, 2018 at 9:32 AM, Sendoh wrote: > Hi, > > you can write your own trigger and window, and implement whatever logic > there. > There are some examples > https://github.com/apache/flink/blob/1875cac03042dad4a4c47b0de8364f02fbe457c6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ > > If you don't see any event, it means window is not triggered. > > It would mean Watermark is not increasing. The issue can be the timestamp is > not extracted correctly. > Or, if you miss the trigger if use the window function doesn't have it. > > Cheers, > > Sendoh > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >
Re: Timer & Window Memory Consumption
Thanks Fabian, but at 1.5k messages per second per TM there are several million InternalTimer and TimeWindow objects created within a period of 5 seconds. Is there a way to debug this issue? Regards, Navneeth On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske wrote: > Hi, > > TimeWindows and Timers are created for each window, i.e., every 5 seconds > for every distinct key that a task is processing. > Event-time windows are completed and cleaned up when a watermark is > received that passes the window end timestamp. > Therefore, there might be more than one window per key depending on the > watermarks. > > Hope this helps, > Fabian > > 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan : > >> Hi, >> >> I'm facing issues with frequent young generation garbage collections in >> my task manager which happens approximately every few seconds. I have 3 >> task managers with 12GB heap allocated on each and I have set the config to >> use G1GC. My program ingests binary data from kafka source and the message >> rate is around 4.5k msgs/sec with around 400 bytes per msg. Below are the >> operators used in the program. >> >> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) -> >> FlatMap -> Sink >> >> I captured the below histograms at 5 second intervals and analyzed the >> heap as well. It looks like a lot InternalTimer and TimeWindow objects are >> created. >> >> Also, I see a high usage in org.apache.flink.streaming. >> api.operators.HeapInternalTimerService. 
>> *Window code:*
>> dataStream.keyBy(new MessageKeySelector())
>>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>>     .apply(new Aggregate());
>>
>> *Captured at time T:*
>> num #instances #bytes class name
>> 1: 2074427 481933816 [B
>> 2: 357192 339368592 [D
>> 3: 12759222 204147552 java.lang.Integer
>> 4: 31416 85151832 [I
>> 5: 900982 83872240 [C
>> 6: 631888 20220416 java.util.HashMap$Node
>> 7: 804203 19300872 java.lang.String
>> 8: 541651 17332832 org.apache.flink.streaming.api.operators.InternalTimer
>> 9: 540252 17288064 org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>
>> *Captured at T1 (T + 5 seconds):*
>> num #instances #bytes class name
>> 1: 12084258 2282849264 [B
>> 2: 1922018 1828760896 [D
>> 3: 68261427 1092182832 java.lang.Integer
>> 4: 2712099 291488736 [C
>> 5: 54201 98798976 [I
>> 6: 2028250 48678000 java.lang.String
>> 7: 66080 43528136 [[B
>> 8: 1401915 35580168 [Ljava.lang.Object;
>> 9: 949062 30369984 java.util.HashMap$Node
>> 10: 570832 18266624 org.apache.flink.streaming.api.operators.InternalTimer
>> 11: 549979 17599328 org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>
>> *Captured at T2 (T1 + 5 seconds):*
>> num #instances #bytes class name
>> 1: 9911982 2920384472 [B
>> 2: 1584406 1510958520 [D
>> 3: 56087337 897397392 java.lang.Integer
>> 4: 26080337 834570784 java.util.HashMap$Node
>> 5: 25756748 824215936 org.apache.flink.streaming.api.operators.InternalTimer
>> 6: 25740086 823682752 org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>
>> Thanks.
>>
>
Re: Network memory segments
Thanks Chesnay. On Tue, Jan 23, 2018 at 6:54 AM, Chesnay Schepler wrote: > I could reproduce this locally and opened a JIRA > <https://issues.apache.org/jira/browse/FLINK-8496>. > > > On 21.01.2018 04:32, Navneeth Krishnan wrote: > > Hi, > > We recently upgraded from flink 1.3 to 1.4 and in the task manager UI it > shows there are 0 memory segments whereas in 1.3 I think it was default > 32k. I have even tried adding the below config but still it shows 0. > > taskmanager.network.numberOfBuffers: 2048 > > [image: Inline image 1] > > Regards, > Navneeth > > >
Re: Scaling Flink
Hi, Any suggestions would really help. Thanks. On Mon, Jan 15, 2018 at 12:07 AM, Navneeth Krishnan < reachnavnee...@gmail.com> wrote: > Hi All, > > Has anyone tried scaling out flink cluster on EMR based on CPU usage/ > kafka lag/ back pressure monitoring? If so can you provide some insights on > how it could be achieved and sample scripts if possible. Thanks a lot. > > Thanks, > Navneeth >
Event time window questions
Hi, I am having a few issues with event-time windowing. Here is my scenario: data is ingested from a Kafka consumer, keyed by user, and then passed through a tumbling event-time window of 10 seconds. The maximum tolerated lag is 1 second, and I use a BoundedOutOfOrdernessGenerator that extends AssignerWithPeriodicWatermarks to assign watermarks. Even after receiving multiple messages per user, the window never gets evicted. What am I missing here? https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html .window(TumblingEventTimeWindows.of(Time.seconds(10))) The other issue I am having is that there will be scenarios where there is just one message per user for more than a minute. In that case I want the window contents to be evicted after the defined window interval of 10 seconds. Is there a way to evict the window data even when there is no more incoming data for that key? I have tried setAutoWatermarkInterval(1) but still no luck. Also, how do I get the current watermark to be displayed in the Flink dashboard UI under the watermarks section? Currently it shows no watermarks. And is there a way to count the number of messages that missed the time window due to late arrival? Thanks, and I appreciate all the help.
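[Editor's note] For reference, the bounded-out-of-orderness bookkeeping itself is tiny: the assigner tracks the maximum timestamp seen and periodically emits that maximum minus the allowed lag as the watermark, so a 10-second window [0s, 10s) can only fire once some element with a timestamp past the window end plus the 1-second lag has been seen, and late or single stranded elements never move the watermark forward on their own. A plain-Java sketch of that logic (illustrative, not the Flink class):

```java
// Mirrors what an AssignerWithPeriodicWatermarks-based extractor does internally.
class BoundedLagWatermarkTracker {
    private final long maxLagMs;
    private long maxSeenTs = Long.MIN_VALUE;

    BoundedLagWatermarkTracker(long maxLagMs) {
        this.maxLagMs = maxLagMs;
    }

    /** Called per element with its extracted event timestamp. */
    void onEvent(long eventTs) {
        maxSeenTs = Math.max(maxSeenTs, eventTs);
    }

    /** Called periodically; setAutoWatermarkInterval controls how often in Flink. */
    long currentWatermark() {
        return maxSeenTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTs - maxLagMs;
    }

    /** A window [start, end) may fire once the watermark reaches end - 1. */
    boolean windowCanFire(long windowEndTs) {
        return currentWatermark() >= windowEndTs - 1;
    }
}
```

This is also why setAutoWatermarkInterval alone does not help here: it changes how often the watermark is emitted, not how far it has advanced, and with multiple Kafka partitions the effective watermark is the minimum across partitions.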
Timer & Window Memory Consumption
Hi, I'm facing issues with frequent young-generation garbage collections in my task managers, which happen approximately every few seconds. I have 3 task managers with 12GB heap allocated on each, and I have configured G1GC. My program ingests binary data from a Kafka source and the message rate is around 4.5k msgs/sec with around 400 bytes per msg. Below are the operators used in the program.

kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) -> FlatMap -> Sink

I captured the histograms below at 5-second intervals and analyzed the heap as well. It looks like a lot of InternalTimer and TimeWindow objects are created. Also, I see high usage in org.apache.flink.streaming.api.operators.HeapInternalTimerService.

*Window code:*
dataStream.keyBy(new MessageKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new Aggregate());

*Captured at time T:*
num #instances #bytes class name
1: 2074427 481933816 [B
2: 357192 339368592 [D
3: 12759222 204147552 java.lang.Integer
4: 31416 85151832 [I
5: 900982 83872240 [C
6: 631888 20220416 java.util.HashMap$Node
7: 804203 19300872 java.lang.String
8: 541651 17332832 org.apache.flink.streaming.api.operators.InternalTimer
9: 540252 17288064 org.apache.flink.streaming.api.windowing.windows.TimeWindow

*Captured at T1 (T + 5 seconds):*
num #instances #bytes class name
1: 12084258 2282849264 [B
2: 1922018 1828760896 [D
3: 68261427 1092182832 java.lang.Integer
4: 2712099 291488736 [C
5: 54201 98798976 [I
6: 2028250 48678000 java.lang.String
7: 66080 43528136 [[B
8: 1401915 35580168 [Ljava.lang.Object;
9: 949062 30369984 java.util.HashMap$Node
10: 570832 18266624 org.apache.flink.streaming.api.operators.InternalTimer
11: 549979 17599328 org.apache.flink.streaming.api.windowing.windows.TimeWindow

*Captured at T2 (T1 + 5 seconds):*
num #instances #bytes class name
1: 9911982 2920384472 [B
2: 1584406 1510958520 [D
3: 56087337 897397392 java.lang.Integer
4: 26080337 834570784 java.util.HashMap$Node
5: 25756748 824215936 org.apache.flink.streaming.api.operators.InternalTimer
6: 25740086 823682752 org.apache.flink.streaming.api.windowing.windows.TimeWindow

Thanks.
Network memory segments
Hi, We recently upgraded from Flink 1.3 to 1.4, and in the task manager UI it shows 0 memory segments, whereas in 1.3 I think the default was 32k. I have even tried adding the config below, but it still shows 0. taskmanager.network.numberOfBuffers: 2048 Regards, Navneeth
Scaling Flink
Hi All, Has anyone tried scaling out a Flink cluster on EMR based on CPU usage, Kafka lag, or back-pressure monitoring? If so, can you provide some insights on how it could be achieved, and sample scripts if possible? Thanks a lot.

Thanks, Navneeth
Static Variables
Hi, I have a requirement to initialize a few Guava caches per JVM along with some static helper classes. I tried a few options but nothing worked. Need some help. Thanks a lot.

1. Operator-level static variables:

public static Cache loadingCache;

public void open(Configuration parameters) throws Exception {
    if (loadingCache == null) {
        initializeCache();
    }
}

The cache object is null on each operator slot, and it gets initialized on every call to the open method.

2. Initialize in the operator class constructor:

public FlatMapFunction(ParameterTool parameterTool) {
    this.parameterTool = parameterTool;
    initializeCache();
}

The cache doesn't seem to be initialized when accessed inside the task manager. Thanks.
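One pattern that may help with option 1: a per-JVM singleton guarded by the JVM's class-initialization rules (the initialization-on-demand holder idiom), so concurrent open() calls from multiple slots cannot race on the null check. A minimal sketch in plain Java, with a ConcurrentHashMap standing in for the Guava cache; all names here are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Per-JVM shared cache: the JVM guarantees the holder class is initialized
// exactly once, no matter how many operator slots call open() concurrently.
public class SharedCache {

    static final AtomicInteger initCount = new AtomicInteger(); // for illustration

    private static class Holder {
        static final Map<String, String> CACHE = build();
    }

    private static Map<String, String> build() {
        initCount.incrementAndGet();              // runs once per JVM
        return new ConcurrentHashMap<>();
    }

    // Call this from each operator's open(); every slot sees the same map.
    public static Map<String, String> get() {
        return Holder.CACHE;
    }

    public static void main(String[] args) {
        SharedCache.get().put("k", "v");
        System.out.println(SharedCache.get().get("k"));   // v
        System.out.println(SharedCache.initCount.get());  // 1
    }
}
```

The constructor approach (option 2) fails in a cluster because the function object is constructed on the client and shipped to the task managers as a serialized object; anything set up in the constructor that is not serializable instance state will not exist on the remote JVM, so initialization belongs in open().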
Re: Flink Kafka Producer Exception
Hi, I'm receiving this error and due to which I'm not able to run my job. Any help is greatly appreciated. Thanks. On Tue, Dec 12, 2017 at 10:21 AM, Navneeth Krishnan < reachnavnee...@gmail.com> wrote: > Hi, > > I have a kafka source and sink in my pipeline and when I start my job I > get this error and the job goes to failed state. I checked the kafka node > and everything looks good. Any suggestion on what is happening here? Thanks. > > java.lang.Exception: Failed to send data to Kafka: The server disconnected > before a response was received. > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) > at > org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) > at com.transformations.MyProcessor.flatMap(MyProcessor.java:115) > at com.transformations.MyProcessor.flatMap(MyProcessor.java:47) 
> at > org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > >
Re: Custom Metrics
Thanks Piotr. I have a couple more questions related to metrics. I use the InfluxDB reporter to report Flink metrics and I see a lot of metrics being reported. Is there a way to select only a subset of the metrics that we need to monitor the application? Also, is there a way to specify a custom metrics scope? Basically I register metrics like below: add a custom metric group and then add a meter per user. I would like this to be reported as the measurement "Users", with the user id as a tag. This way I can easily visualize the data in Grafana or any other tool by selecting the measurement and grouping by tag. Is there a way to report like that instead of host, process_type, tm_id, job_name, task_name & subtask_index?

metricGroup.addGroup("Users")
    .meter(userId, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));

Thanks a bunch.

On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski wrote: > Hi, > > Reporting once per 10 seconds shouldn’t create problems. Best to try it > out. Let us know if you get into some troubles :) > > Piotrek > > On 11 Dec 2017, at 18:23, Navneeth Krishnan > wrote: > > Thanks Piotr. > > Yes, passing the metric group should be sufficient. The subcomponents will > not be able to provide the list of metrics to register since the metrics > are created based on incoming data by tenant. Also I am planning to have > the metrics reported every 10 seconds and hope it shouldn't be a problem. > We use influx and grafana to plot the metrics. > > The option 2 that I had in mind was to collect all metrics and use influx > db sink to report it directly inside the pipeline. But it seems reporting > per node might not be possible. > > > On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski > wrote: > >> Hi, >> >> I’m not sure if I completely understand your issue. >> >> 1. >> - You don’t have to pass RuntimeContext, you can always pass just the >> MetricGroup or ask your components/subclasses “what metrics do you want to >> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for >> Flink, as long as you have a reasonable reporting interval. However keep in >> mind that Flink only reports your metrics and you still need something to >> read/handle/process/aggregate your metrics >> 2. >> I don’t think that reporting per node/jvm is possible with Flink’s metric >> system. For that you would need some other solution, like report your >> metrics using JMX (directly register MBeans from your code) >> >> Piotrek >> >> > On 10 Dec 2017, at 18:51, Navneeth Krishnan >> wrote: >> > >> > Hi, >> > >> > I have a streaming pipeline running on flink and I need to collect >> metrics to identify how my algorithm is performing. The entire pipeline is >> multi-tenanted and I also need metrics per tenant. Lets say there would be >> around 20 metrics to be captured per tenant. I have the following ideas for >> implemention but any suggestions on which one might be better will help. >> > >> > 1. Use flink metric group and register a group per tenant at the >> operator level. The disadvantage of this approach for me is I need the >> runtimecontext parameter to register a metric and I have various subclasses >> to which I need to pass this object to limit the metric scope within the >> operator. Also there will be too many metrics reported if there are higher >> number of subtasks. >> > How is everyone accessing flink state/ metrics from other classes where >> you don't have access to runtimecontext? >> > >> > 2. Use a custom singleton metric registry to add and send these metrics >> using custom sink. Instead of using flink metric group to collect metrics >> per operatior - subtask, collect per jvm and use influx sink to send the >> metric data. What i'm not sure in this case is how to collect only once per >> node/jvm. >> > >> > Thanks a bunch in advance. >> >> > >
Flink Kafka Producer Exception
Hi, I have a Kafka source and sink in my pipeline, and when I start my job I get this error and the job goes to the failed state. I checked the Kafka node and everything looks good. Any suggestion on what is happening here? Thanks.

java.lang.Exception: Failed to send data to Kafka: The server disconnected before a response was received.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
    at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Re: Custom Metrics
Thanks Piotr. Yes, passing the metric group should be sufficient. The subcomponents will not be able to provide the list of metrics to register since the metrics are created based on incoming data by tenant. Also I am planning to have the metrics reported every 10 seconds and hope it shouldn't be a problem. We use influx and grafana to plot the metrics. The option 2 that I had in mind was to collect all metrics and use influx db sink to report it directly inside the pipeline. But it seems reporting per node might not be possible. On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski wrote: > Hi, > > I’m not sure if I completely understand your issue. > > 1. > - You don’t have to pass RuntimeContext, you can always pass just the > MetricGroup or ask your components/subclasses “what metrics do you want to > register” and register them at the top level. > - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for > Flink, as long as you have a reasonable reporting interval. However keep in > mind that Flink only reports your metrics and you still need something to > read/handle/process/aggregate your metrics > 2. > I don’t think that reporting per node/jvm is possible with Flink’s metric > system. For that you would need some other solution, like report your > metrics using JMX (directly register MBeans from your code) > > Piotrek > > > On 10 Dec 2017, at 18:51, Navneeth Krishnan > wrote: > > > > Hi, > > > > I have a streaming pipeline running on flink and I need to collect > metrics to identify how my algorithm is performing. The entire pipeline is > multi-tenanted and I also need metrics per tenant. Lets say there would be > around 20 metrics to be captured per tenant. I have the following ideas for > implemention but any suggestions on which one might be better will help. > > > > 1. Use flink metric group and register a group per tenant at the > operator level. 
The disadvantage of this approach for me is I need the > runtimecontext parameter to register a metric and I have various subclasses > to which I need to pass this object to limit the metric scope within the > operator. Also there will be too many metrics reported if there are higher > number of subtasks. > > How is everyone accessing flink state/ metrics from other classes where > you don't have access to runtimecontext? > > > > 2. Use a custom singleton metric registry to add and send these metrics > using custom sink. Instead of using flink metric group to collect metrics > per operatior - subtask, collect per jvm and use influx sink to send the > metric data. What i'm not sure in this case is how to collect only once per > node/jvm. > > > > Thanks a bunch in advance. > >
Custom Metrics
Hi, I have a streaming pipeline running on Flink and I need to collect metrics to identify how my algorithm is performing. The entire pipeline is multi-tenanted and I also need metrics per tenant. Let's say there would be around 20 metrics to be captured per tenant. I have the following ideas for implementation, but any suggestions on which one might be better will help.

1. Use a Flink metric group and register a group per tenant at the operator level. The disadvantage of this approach for me is that I need the RuntimeContext parameter to register a metric, and I have various subclasses to which I need to pass this object to limit the metric scope within the operator. Also, there will be too many metrics reported if there is a higher number of subtasks. How is everyone accessing Flink state/metrics from other classes where you don't have access to the RuntimeContext?

2. Use a custom singleton metric registry to add and send these metrics using a custom sink. Instead of using a Flink metric group to collect metrics per operator subtask, collect per JVM and use an Influx sink to send the metric data. What I'm not sure about in this case is how to collect only once per node/JVM.

Thanks a bunch in advance.
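Option 2 can be sketched as a per-JVM registry that hands out one counter per tenant via computeIfAbsent, so every operator instance in the task manager shares the same counters and a single custom reporter can drain them. A hedged, plain-Java sketch where a LongAdder stands in for a Dropwizard meter; all names here are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Per-JVM metric registry: one counter per tenant, shared by all operators
// running in this task manager JVM, drained by one custom reporter/sink.
public class TenantMetrics {

    private static final Map<String, LongAdder> METERS = new ConcurrentHashMap<>();

    // Lazily creates the tenant's counter on first use; thread-safe.
    public static void mark(String tenantId) {
        METERS.computeIfAbsent(tenantId, id -> new LongAdder()).increment();
    }

    public static long count(String tenantId) {
        LongAdder m = METERS.get(tenantId);
        return m == null ? 0L : m.sum();
    }

    public static void main(String[] args) {
        TenantMetrics.mark("tenant-a");
        TenantMetrics.mark("tenant-a");
        TenantMetrics.mark("tenant-b");
        System.out.println(TenantMetrics.count("tenant-a")); // 2
        System.out.println(TenantMetrics.count("tenant-b")); // 1
    }
}
```

The trade-off named in the thread still applies: this gives per-JVM (not per-subtask) aggregation, but the reporting thread and delivery to InfluxDB are then the application's responsibility rather than Flink's metric system.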
Passing Configuration & State
Hi All, I have developed a streaming pipeline in Java and I need to pass some of the configuration parameters that are supplied during program startup to user functions. I used the below link as a reference.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html

I have tried withParameters and setGlobalJobParameters, but those don't seem to work: the parameters are blank inside my user function when deployed in a cluster. I have also tried passing the parameters through the constructor of my user function, and this seems to work locally but not in cluster mode; again the parameters are blank. Is there a recommended way to pass the program parameters to user function classes?

Also, I have a scenario where the state created inside one user function has to be passed around to multiple classes. Is there a state registry or something from which I can retrieve a registered state and use it, or should I implement my own?

Thanks in advance.

Regards, Navneeth
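The local-vs-cluster difference is usually down to serialization: the user function is instantiated on the client and shipped to the task managers as a serialized Java object, so only serializable instance fields survive the trip, while static fields and anything assigned after construction on the client do not. A hedged sketch simulating that trip with a plain serialization round-trip (class and field names are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Simulates what happens to a user function in cluster mode: serialized on
// the client, deserialized on the task manager. Serializable instance fields
// survive; static fields do not travel with the object.
public class ShippedFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    static String staticParam;                 // NOT part of the serialized object
    private final Map<String, String> params;  // serializable instance field: survives

    public ShippedFunction(Map<String, String> params) {
        this.params = params;
    }

    public String get(String key) {
        return params.get(key);
    }

    // Round-trip through Java serialization, like client -> task manager.
    public static ShippedFunction ship(ShippedFunction fn) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ShippedFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("kafka.brokers", "broker:9092");
        ShippedFunction remote = ship(new ShippedFunction(cfg));
        System.out.println(remote.get("kafka.brokers")); // broker:9092
    }
}
```

So if constructor-passed parameters come up blank in cluster mode, the first thing to check is whether they are stored in serializable, non-static, non-transient instance fields of the function.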
Reading Yarn Application Name in flink
Hi All, Is there a way to read the YARN application id/name within Flink so that the logs can be sent to an external logging stack like ELK or CloudWatch, merged by application?

Thanks, Navneeth
[no subject]
Hello All, I have an in-memory cache created inside a user function and I need to assign its max capacity. Since the program can be run on any hardware, I'm thinking I could assign it based on Flink's allocated managed memory. Is there a way to get the Flink managed memory size inside a user function? If not, are there any other options?

Thanks, Navneeth
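If the managed memory size turns out not to be exposed to user code, a common fallback is to size the cache from the JVM max heap instead. A sketch in plain Java, where the 10% fraction and the per-entry byte estimate are assumptions to tune for the workload, not recommendations:

```java
// Fallback sizing when Flink's managed memory size is not visible from user
// code: derive the cache capacity from the JVM's max heap. Both constants
// below are assumed values for illustration.
public class CacheSizing {

    static final double CACHE_FRACTION = 0.10;     // assumed budget: 10% of heap
    static final long EST_BYTES_PER_ENTRY = 1024;  // assumed average entry size

    // Maximum number of entries the cache may hold under the byte budget.
    public static long maxEntries(long maxHeapBytes) {
        long budget = (long) (maxHeapBytes * CACHE_FRACTION);
        return budget / EST_BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        long heap = Runtime.getRuntime().maxMemory();
        System.out.println("heap=" + heap + " maxEntries=" + maxEntries(heap));
    }
}
```

The resulting count can then be handed to the cache builder's maximum-size setting, so the same jar adapts to whatever heap the task manager was launched with.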
Running flink on YARN
Hello, I'm running Flink on AWS EMR and I would like to know how I can pass a custom log4j properties file. I changed the log4j.properties file in the Flink conf directory, but the changes don't seem to be reflected. I'm using the below command to start my Flink job:

> flink run -m yarn-cluster

Thanks.

Regards, Navneeth
Re: Flink on EMR
Hi All, Any suggestions? Thanks. On Mon, Sep 25, 2017 at 10:14 PM, Navneeth Krishnan < reachnavnee...@gmail.com> wrote: > Hello All, > > I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running > into multiple issues and need some help. > > *Issue1:* > > How did others resolve this multiple bindings issue? > > > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > > > *Issue2:* > > Running the below command runs the pipeline but the task manager is allocated > with only 5GB memory instead of 8GB memory. Any reason why? > flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar > > > *Issue3:* > > How to provide the checkpoint directory? By just providing this > "hdfs:///checkpoints/" will it work or should I provide any master node host > name? > > > *Issue 4:* > > How can I get the task manager logs? Should I use log aggregation in hadoop > yarn or send it to cloud watch? > > > Also if there any best practices to be used while running flink on yarn, > please let me know. > > > Thanks a lot. > > > Regards, > > Navneeth > >
Re: Flink on EMR
Hi, I’m using the default flink package that comes with EMR. I’m facing the issue while running my pipeline. Thanks. On Mon, Sep 25, 2017 at 11:09 PM Jörn Franke wrote: > Amazon EMR has already a Flink package. You just need to check the > checkbox. I would not install it on your own. > I think you can find it in the advanced options. > > On 26. Sep 2017, at 07:14, Navneeth Krishnan > wrote: > > Hello All, > > I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running > into multiple issues and need some help. > > *Issue1:* > > How did others resolve this multiple bindings issue? > > > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > > > *Issue2:* > > Running the below command runs the pipeline but the task manager is allocated > with only 5GB memory instead of 8GB memory. Any reason why? > flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar > > > *Issue3:* > > How to provide the checkpoint directory? By just providing this > "hdfs:///checkpoints/" will it work or should I provide any master node host > name? > > > *Issue 4:* > > How can I get the task manager logs? Should I use log aggregation in hadoop > yarn or send it to cloud watch? > > > Also if there any best practices to be used while running flink on yarn, > please let me know. > > > Thanks a lot. 
> > > Regards, > > Navneeth > >
Re: Broadcast Config through Connected Stream
Thanks a lot Aljoscha. That helps. On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek wrote: > Hi, > > I think this is a valid approach, you can even use "operator state" in > your map function to make the broadcast config state stateful. > > Another approach would be to use internal APIs to hack an operator that > has a keyed stream on one input and a broadcast stream on the second input. > You can see that approach in action in the Beam Flink Runner [1] but I > would strongly recommend against doing that because it is using internal > APIs and if the other approach works for you I would stay with that. > > Best, > Aljoscha > > [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f > 4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/ > FlinkStreamingTransformTranslators.java#L488 > > On 15. Sep 2017, at 07:04, Navneeth Krishnan > wrote: > > Hi, > > Any suggestions on this could be achieved? > > Thanks > > On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> Any suggestions on this would really help. >> >> Thanks. >> >> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan < >> reachnavnee...@gmail.com> wrote: >> >>> Hi All, >>> >>> I looked into an earlier email about the topic broadcast config through >>> connected stream and I couldn't find the conclusion. >>> >>> I can't do the below approach since I need the config to be published to >>> all operator instances but I need keyed state for external querying. >>> >>> streamToBeConfigured.connect(configMessageStream) >>> .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector) >>> .flatMap(new FunctionWithConfigurableState()) >>> .addSink(...); >>> >>> One of the resolution I found in that mail chain was below. I can use >>> this to solve my issue but is this still the recommended approach? 
>>> >>> stream1.connect(stream2) >>> .map(new MergeStreamsMapFunction()) // Holds transient state >>> of the last ConfigMessage and maps Stream1's data to a Tuple2>> ConfigMessage> >>> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow >>> for ValueStateDescriptors and semantically correct partitioning according >>> to business logic >>> .flatMap(new StatefulFlatMapFunction()) // Save latest >>> received ConfigMessage-Value in ValueStateDescriptor here >>> .addSink(...); >>> >>> Thanks, >>> Navneeth >>> >> >> > >
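The "merge then keyBy" approach recommended above can be sketched outside the Flink API: the merge step holds the latest ConfigMessage in transient state and stamps it onto each data element, so the downstream keyed operator receives data and config together. A minimal plain-Java sketch; the types and method names are hypothetical stand-ins for the two sides of a connected stream:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Sketch of a MergeStreamsMapFunction: one input carries data, the other
// carries config updates; data is emitted paired with the latest config so
// a later keyBy can partition both together.
public class ConfigMerger {

    private String latestConfig = "defaults";   // transient last ConfigMessage

    // Corresponds to the config side of the connected stream.
    public void onConfig(String config) {
        latestConfig = config;
    }

    // Corresponds to the data side: emit (data, config) for downstream keyBy.
    public Map.Entry<String, String> onData(String data) {
        return new SimpleEntry<>(data, latestConfig);
    }

    public static void main(String[] args) {
        ConfigMerger merger = new ConfigMerger();
        System.out.println(merger.onData("evt-1")); // evt-1=defaults
        merger.onConfig("threshold=5");
        System.out.println(merger.onData("evt-2")); // evt-2=threshold=5
    }
}
```

In the real pipeline each parallel merge instance only sees config messages routed to it, which is why the thread's recommendation follows the merge with a keyBy and stores the latest config per key in value state.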
Flink on EMR
Hello All, I'm trying to deploy Flink on AWS EMR and I'm very new to EMR. I'm running into multiple issues and need some help.

*Issue 1:* How did others resolve this multiple SLF4J bindings issue?

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

*Issue 2:* Running the below command runs the pipeline, but the task manager is allocated only 5GB of memory instead of 8GB. Any reason why?

flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar

*Issue 3:* How do I provide the checkpoint directory? By just providing "hdfs:///checkpoints/", will it work, or should I provide a master node hostname?

*Issue 4:* How can I get the task manager logs? Should I use log aggregation in Hadoop YARN or send them to CloudWatch?

Also, if there are any best practices for running Flink on YARN, please let me know.

Thanks a lot.

Regards, Navneeth
Re: Queryable State
No, it doesn't work even if I increase the timeout. The state being fetched is a Map of data and has around 100 entries in it. I have a single job manager and 3 task managers with 16 slots each, running on AWS EC2.

final TypeSerializer keySerializer =
    TypeInformation.of(new TypeHint() {}).createSerializer(new ExecutionConfig());
final TypeSerializer> valueSerializer =
    TypeInformation.of(new TypeHint>() {}).createSerializer(new ExecutionConfig());
final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
    key, keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);

On Fri, Sep 15, 2017 at 6:44 AM, Kostas Kloudas wrote: > Hi Navneeth, > > If you increase the timeout, everything works ok? > I suppose from your config that you are running in standalone mode, right? > > Any other information about the job (e.g. code and/or size of state being > fetched) and > the cluster setup that can help us pin down the problem, would be > appreciated. > > Thanks, > Kostas > > On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan > wrote: > > Hi, > > I am sure I have provided the right job manager details because the > connection timeout ip is the task manager where the state is kept. I guess > the client is able to reach the job manager and figure out where the state > is. Also if I provide a wrong state name, I'm receiving unknown state > exception. I couldn't find why there is a timeout and a warning message is > logged in the job manager. > > On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas > wrote: > >> Hi, >> >> >> are you sure your jobmanager is running and is accessible from the >> supplied >> hostname and port? If you can start up the FLink UI of the job which >> creates >> your queryable state, it should have the details of the job manager and >> the >> port to be used in this queryable client job.
>> >> >> >> -- >> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4. >> nabble.com/ >> > > >
Re: Broadcast Config through Connected Stream
Hi, Any suggestions on this could be achieved? Thanks On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan wrote: > Hi All, > > Any suggestions on this would really help. > > Thanks. > > On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan < > reachnavnee...@gmail.com> wrote: > >> Hi All, >> >> I looked into an earlier email about the topic broadcast config through >> connected stream and I couldn't find the conclusion. >> >> I can't do the below approach since I need the config to be published to >> all operator instances but I need keyed state for external querying. >> >> streamToBeConfigured.connect(configMessageStream) >> .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector) >> .flatMap(new FunctionWithConfigurableState()) >> .addSink(...); >> >> One of the resolution I found in that mail chain was below. I can use >> this to solve my issue but is this still the recommended approach? >> >> stream1.connect(stream2) >> .map(new MergeStreamsMapFunction()) // Holds transient state >> of the last ConfigMessage and maps Stream1's data to a Tuple2> ConfigMessage> >> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow >> for ValueStateDescriptors and semantically correct partitioning according >> to business logic >> .flatMap(new StatefulFlatMapFunction()) // Save latest >> received ConfigMessage-Value in ValueStateDescriptor here >> .addSink(...); >> >> Thanks, >> Navneeth >> > >
Re: Queryable State
Hi, Any idea on how to solve this issue? Thanks On Wed, Sep 13, 2017 at 10:12 AM, Navneeth Krishnan < reachnavnee...@gmail.com> wrote: > Hi, > > I am sure I have provided the right job manager details because the > connection timeout ip is the task manager where the state is kept. I guess > the client is able to reach the job manager and figure out where the state > is. Also if I provide a wrong state name, I'm receiving unknown state > exception. I couldn't find why there is a timeout and a warning message is > logged in the job manager. > > On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas > wrote: > >> Hi, >> >> >> are you sure your jobmanager is running and is accessible from the >> supplied >> hostname and port? If you can start up the FLink UI of the job which >> creates >> your queryable state, it should have the details of the job manager and >> the >> port to be used in this queryable client job. >> >> >> >> -- >> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4. >> nabble.com/ >> > >
Re: Queryable State
Hi, I am sure I have provided the right job manager details because the connection timeout ip is the task manager where the state is kept. I guess the client is able to reach the job manager and figure out where the state is. Also if I provide a wrong state name, I'm receiving unknown state exception. I couldn't find why there is a timeout and a warning message is logged in the job manager. On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas wrote: > Hi, > > > are you sure your jobmanager is running and is accessible from the supplied > hostname and port? If you can start up the FLink UI of the job which > creates > your queryable state, it should have the details of the job manager and the > port to be used in this queryable client job. > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050. > n4.nabble.com/ >
Re: Queryable State
Hi All, Any suggestions would really be helpful. Thanks On Sun, Sep 10, 2017 at 12:04 AM, Navneeth Krishnan < reachnavnee...@gmail.com> wrote: > Hi All, > > I'm running a streaming job on flink 1.3.2 with few queryable states. > There are 3 task managers and a job manager. I'm getting timeout exception > when trying to query a state and also a warning message in the job manager > log. > > *Client:* > final Configuration config = new Configuration(); > > config.setString(JobManagerOptions.ADDRESS, jobMgrHost); > config.setInteger(JobManagerOptions.PORT, > JobManagerOptions.PORT.defaultValue()); > > final HighAvailabilityServices highAvailabilityServices = > HighAvailabilityServicesUtils.createHighAvailabilityServices( > config, > Executors.newSingleThreadScheduledExecutor(), > > HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); > > QueryableStateClient client = new QueryableStateClient(config, > highAvailabilityServices); > > > *Exception:* > Exception in thread "main" io.netty.channel.ConnectTimeoutException: > connection timed out: /172.31.18.170:43537 > at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run( > AbstractNioChannel.java:212) > at io.netty.util.concurrent.PromiseTask$RunnableAdapter. > call(PromiseTask.java:38) > at io.netty.util.concurrent.ScheduledFutureTask.run( > ScheduledFutureTask.java:120) > at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks( > SingleThreadEventExecutor.java:357) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) > at io.netty.util.concurrent.SingleThreadEventExecutor$2. > run(SingleThreadEventExecutor.java:111) > at java.lang.Thread.run(Thread.java:745) > > *Job Manager:* > 2017-09-10 06:55:41,599 WARN akka.remote.ReliableDeliverySupervisor >- Association with remote system [akka.tcp:// > flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms. > Reason: [Disassociated] > > Thanks, > Navneeth > > >
Queryable State
Hi All, I'm running a streaming job on Flink 1.3.2 with a few queryable states. There are 3 task managers and a job manager. I'm getting a timeout exception when trying to query a state, and also a warning message in the job manager log.

*Client:*

final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

final HighAvailabilityServices highAvailabilityServices =
    HighAvailabilityServicesUtils.createHighAvailabilityServices(
        config,
        Executors.newSingleThreadScheduledExecutor(),
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

*Exception:*

Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /172.31.18.170:43537
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

*Job Manager:*

2017-09-10 06:55:41,599 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms. Reason: [Disassociated]

Thanks, Navneeth
Re: State Issue
Sorry, my bad. I figured out it was a change done at our end which created different keys. Thanks.

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan wrote:

> Hi,
>
> I'm experiencing a weird issue where any data put into map state, when
> retrieved with the same key, is returned as null, and hence it puts the same
> value again and again. I used the RocksDB state backend, but tried the Memory
> state backend too and the issue still exists.
>
> Each time I set a key and value into the MapState it creates a new map, and
> I couldn't access the previous value. But when I iterate over the MapState
> keys and values, I can see the same key added multiple times.
>
> Each put operation goes through the code lines marked in red.
>
> *NestedMapsStateTable.java*
>
> S get(K key, int keyGroupIndex, N namespace) {
>     checkKeyNamespacePreconditions(key, namespace);
>     Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);
>
>     *if (namespaceMap == null) {
>         return null;
>     }*
>
>     Map<K, S> keyedMap = namespaceMap.get(namespace);
>     if (keyedMap == null) {
>         return null;
>     }
>     return keyedMap.get(key);
> }
>
> *HeapMapState.java*
>
> @Override
> public void put(UK userKey, UV userValue) {
>     HashMap<UK, UV> userMap = stateTable.get(currentNamespace);
>
>     *if (userMap == null) {
>         userMap = new HashMap<>();
>         stateTable.put(currentNamespace, userMap);
>     }*
>
>     userMap.put(userKey, userValue);
> }
>
> *My Code:*
>
> *open()*
>
> MapStateDescriptor testStateDescriptor = new MapStateDescriptor<>("test-state",
>     TypeInformation.of(new TypeHint() {}), TypeInformation.of(new TypeHint() {}));
>
> testState = getRuntimeContext().getMapState(testStateDescriptor);
>
> *flatMap:*
>
> if (testState.contains(user)) {
>     // DO Something
> } else {
>     testState.put(user, userInfo);
> }
>
> streamEnv.setStateBackend(new MemoryStateBackend());
> streamEnv.setParallelism(1);
>
> Thanks
State Issue
Hi,

I'm experiencing a weird issue where any data put into map state, when retrieved with the same key, is returned as null, and hence it puts the same value again and again. I used the RocksDB state backend, but tried the Memory state backend too and the issue still exists.

Each time I set a key and value into the MapState it creates a new map, and I couldn't access the previous value. But when I iterate over the MapState keys and values, I can see the same key added multiple times.

Each put operation goes through the code lines marked in red.

*NestedMapsStateTable.java*

S get(K key, int keyGroupIndex, N namespace) {
    checkKeyNamespacePreconditions(key, namespace);
    Map<N, Map<K, S>> namespaceMap = getMapForKeyGroup(keyGroupIndex);

    *if (namespaceMap == null) {
        return null;
    }*

    Map<K, S> keyedMap = namespaceMap.get(namespace);
    if (keyedMap == null) {
        return null;
    }
    return keyedMap.get(key);
}

*HeapMapState.java*

@Override
public void put(UK userKey, UV userValue) {
    HashMap<UK, UV> userMap = stateTable.get(currentNamespace);

    *if (userMap == null) {
        userMap = new HashMap<>();
        stateTable.put(currentNamespace, userMap);
    }*

    userMap.put(userKey, userValue);
}

*My Code:*

*open()*

MapStateDescriptor testStateDescriptor = new MapStateDescriptor<>("test-state",
    TypeInformation.of(new TypeHint() {}), TypeInformation.of(new TypeHint() {}));

testState = getRuntimeContext().getMapState(testStateDescriptor);

*flatMap:*

if (testState.contains(user)) {
    // DO Something
} else {
    testState.put(user, userInfo);
}

streamEnv.setStateBackend(new MemoryStateBackend());
streamEnv.setParallelism(1);

Thanks
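[Editor's note] The reply in this thread notes the root cause was a change that "created different keys". One common way this happens, shown here as a plain-Java sketch with no Flink dependency (the key classes and user ids are hypothetical), is a key type that lacks value-based equals()/hashCode(): every map lookup misses and every put adds a new entry, exactly the "same key added multiple times" symptom — the heap state backend ultimately stores keys in ordinary hash maps like the ones in the quoted NestedMapsStateTable/HeapMapState code.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyEqualityDemo {

    // Hypothetical key class that does NOT override equals()/hashCode().
    // Two instances built from the same user id never compare equal.
    static final class BadUserKey {
        final String id;
        BadUserKey(String id) { this.id = id; }
    }

    // Same class with equals()/hashCode() based on the id field.
    static final class GoodUserKey {
        final String id;
        GoodUserKey(String id) { this.id = id; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodUserKey && ((GoodUserKey) o).id.equals(id);
        }
        @Override public int hashCode() { return id.hashCode(); }
    }

    // Put the "same" logical key twice and count the resulting entries.
    static int entriesAfterTwoPuts(boolean useGoodKey) {
        if (useGoodKey) {
            Map<GoodUserKey, String> m = new HashMap<>();
            m.put(new GoodUserKey("user-1"), "a");
            m.put(new GoodUserKey("user-1"), "b"); // overwrites: same logical key
            return m.size();
        } else {
            Map<BadUserKey, String> m = new HashMap<>();
            m.put(new BadUserKey("user-1"), "a");
            m.put(new BadUserKey("user-1"), "b"); // duplicate: keys never compare equal
            return m.size();
        }
    }

    public static void main(String[] args) {
        System.out.println("bad key entries:  " + entriesAfterTwoPuts(false));
        System.out.println("good key entries: " + entriesAfterTwoPuts(true));
    }
}
```

The same reasoning applies whenever the keyBy key or a MapState user key is a custom class: its equality contract decides whether state lookups can ever hit.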
Re: Broadcast Config through Connected Stream
Hi All,

Any suggestions on this would really help. Thanks.

On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan wrote:

> Hi All,
>
> I looked into an earlier email about the topic of broadcasting config through
> a connected stream, and I couldn't find the conclusion.
>
> I can't do the approach below, since I need the config to be published to
> all operator instances, but I need keyed state for external querying.
>
> streamToBeConfigured.connect(configMessageStream)
>     .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>     .flatMap(new FunctionWithConfigurableState())
>     .addSink(...);
>
> One of the resolutions I found in that mail chain is below. I can use this
> to solve my issue, but is this still the recommended approach?
>
> stream1.connect(stream2)
>     .map(new MergeStreamsMapFunction())     // Holds transient state of the last ConfigMessage and
>                                             // maps Stream1's data to a Tuple2 of (data, ConfigMessage)
>     .keyBy(new SomeIdKeySelector())         // KeyBy id to allow for ValueStateDescriptors and semantically
>                                             // correct partitioning according to business logic
>     .flatMap(new StatefulFlatMapFunction()) // Save the latest received ConfigMessage value in a ValueStateDescriptor here
>     .addSink(...);
>
> Thanks,
> Navneeth
Re: State Maintenance
Will I be able to use both a queryable MapState and union list state while implementing the CheckpointedFunction interface? One of my major requirements on that operator is to provide a queryable state, and in order to compute that state we need the common static state across all parallel operator instances. Thanks.

On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske wrote:

> Hi Navneeth,
>
> There's a lower-level state interface that should address your
> requirements: OperatorStateStore.getUnionListState().
>
> This union list state is similar to the regular operator list state, but
> instead of splitting the list for recovery and giving out splits to
> operator instances, it restores the complete list on each operator instance.
> So it basically does a broadcast restore. If all operators have the same
> state, only one instance checkpoints its state, and this state is restored
> to all other instances in case of a failure. This should also work with
> rescaling.
> The operator instance to checkpoint can be identified by
> (RuntimeContext.getIndexOfThisSubtask() == 0).
>
> The OperatorStateStore is a bit hidden. You have to implement the
> CheckpointedFunction interface. When
> CheckpointedFunction.initializeState(FunctionInitializationContext context)
> is called, the context has a method getOperatorStateStore().
>
> I'd recommend having a look at the detailed JavaDocs of all involved
> classes and methods.
>
> Hope this helps,
> Fabian
>
> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan:
>
>> Thanks Gordon for your response. I have around 80 parallel flatmap
>> operator instances, and each instance requires 3 states. One of these is
>> user state, in which each operator will have a unique user's data, and I
>> need this data to be queryable. The other two states are kind of static
>> states which are only modified when there is an update message in the
>> config stream. This static data could easily be around 2 GB, and in my
>> previous approach I used operator state where the data is retrieved inside
>> the open method across all operator instances but checkpointed only inside
>> one of the operator instances.
>>
>> One of the issues that I have is: if I change the operator parallelism,
>> how would it affect the internal state?
>>
>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai wrote:
>>
>>> Hi Navneeth,
>>>
>>> Answering your three questions separately:
>>>
>>> 1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
>>> from the map state, the state will be removed from the local RocksDB as well.
>>>
>>> 2. If state classes are not POJOs, they will be serialized by Kryo, unless a
>>> custom serializer is specifically specified otherwise. You can take a look
>>> at this document on how to do that [1].
>>>
>>> 3. I might need to know more information to be able to suggest properly for
>>> this one. How are you using the "huge state values"? From what you
>>> described, it seems like you only need them on one of the parallel instances,
>>> so I'm a bit curious what they are actually used for. Are they needed
>>> when processing your records?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
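[Editor's note] Fabian's union-list-state semantics can be modeled without Flink. The sketch below is plain Java (names are hypothetical, and it only simulates the redistribution behavior, not the real OperatorStateStore API): on snapshot, only subtask 0 contributes the shared data; on restore, every subtask, whatever the new parallelism, receives the union of all checkpointed lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionStateRestoreDemo {

    // Snapshot: each subtask contributes one list to the checkpoint. With the
    // "only subtask index 0 writes" trick, every other subtask contributes an
    // empty list, so the shared data is stored exactly once.
    static List<List<String>> snapshot(int parallelism, List<String> sharedState) {
        List<List<String>> checkpoint = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            checkpoint.add(subtask == 0 ? new ArrayList<>(sharedState) : new ArrayList<>());
        }
        return checkpoint;
    }

    // Union restore: unlike regular (split) operator list state, EVERY subtask
    // gets the concatenation of all checkpointed lists -- a broadcast restore
    // that is independent of the restore-time parallelism.
    static List<String> restoreOneSubtask(List<List<String>> checkpoint) {
        List<String> union = new ArrayList<>();
        for (List<String> part : checkpoint) {
            union.addAll(part);
        }
        return union;
    }

    public static void main(String[] args) {
        List<String> config = Arrays.asList("ruleA", "ruleB");
        // checkpoint taken at parallelism 3, restored at parallelism 5
        List<List<String>> checkpoint = snapshot(3, config);
        for (int subtask = 0; subtask < 5; subtask++) {
            System.out.println("subtask " + subtask + " restored " + restoreOneSubtask(checkpoint));
        }
    }
}
```

This is why the pattern survives rescaling: the checkpoint stores one copy, and the union restore hands that copy to however many subtasks exist afterwards.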
MapState Default Value
Hi,

Is there a reason behind removing the default-value option in MapStateDescriptor? I was using it in the earlier version to initialize a guava cache with a loader, etc., and in the new version an empty map is returned by default.

Thanks
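[Editor's note] With the default-value option gone, the usual replacement is to materialize the default lazily at the access site, which is the role a guava CacheLoader played. A minimal plain-Java sketch (an ordinary map stands in for the state lookup; the default of 0 is an arbitrary example):

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultValueDemo {

    // "Get from state, creating the default on first access" -- the loader-style
    // behavior the removed MapStateDescriptor default-value option provided.
    static int getOrInit(Map<String, Integer> state, String key) {
        return state.computeIfAbsent(key, k -> 0); // lazily insert the default
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        System.out.println(getOrInit(state, "user-1")); // first access creates the default
        state.put("user-2", 42);
        System.out.println(getOrInit(state, "user-2")); // existing value wins
    }
}
```

Against real MapState the same shape works with a contains()/put() pair (or a null check on get()), since MapState has no computeIfAbsent.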
Broadcast Config through Connected Stream
Hi All,

I looked into an earlier email about the topic of broadcasting config through a connected stream, and I couldn't find the conclusion.

I can't do the approach below, since I need the config to be published to all operator instances, but I need keyed state for external querying.

streamToBeConfigured.connect(configMessageStream)
    .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
    .flatMap(new FunctionWithConfigurableState())
    .addSink(...);

One of the resolutions I found in that mail chain is below. I can use this to solve my issue, but is this still the recommended approach?

stream1.connect(stream2)
    .map(new MergeStreamsMapFunction())     // Holds transient state of the last ConfigMessage and
                                            // maps Stream1's data to a Tuple2 of (data, ConfigMessage)
    .keyBy(new SomeIdKeySelector())         // KeyBy id to allow for ValueStateDescriptors and semantically
                                            // correct partitioning according to business logic
    .flatMap(new StatefulFlatMapFunction()) // Save the latest received ConfigMessage value in a ValueStateDescriptor here
    .addSink(...);

Thanks,
Navneeth
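[Editor's note] The MergeStreamsMapFunction step in the quoted pipeline can be sketched without Flink: the operator keeps the last ConfigMessage it saw as transient (uncheckpointed) state and tags every data element with it, so the keyed flatMap downstream can persist the config per key. Class names below are plain-Java stand-ins for the thread's hypothetical types; in a real job this would be a co-map over the connected streams.

```java
public class ConfigMergeDemo {

    // Hypothetical stand-in for the thread's ConfigMessage type.
    static final class ConfigMessage {
        final int version;
        ConfigMessage(int version) { this.version = version; }
    }

    // Models the MergeStreamsMapFunction: remembers the last ConfigMessage
    // from the control stream and pairs every data element with it.
    static final class Merger {
        private ConfigMessage lastConfig = new ConfigMessage(0); // default until the first update arrives

        void onConfig(ConfigMessage c) { lastConfig = c; }

        // stands in for map(): emit (element, current config) as one string
        String onElement(String element) {
            return element + "@cfg" + lastConfig.version;
        }
    }

    public static void main(String[] args) {
        Merger m = new Merger();
        System.out.println(m.onElement("a")); // before any config update
        m.onConfig(new ConfigMessage(1));
        System.out.println(m.onElement("b")); // sees config v1
    }
}
```

The design point the thread raises holds in the model too: the merger's config is transient, so after a failure it is rebuilt only as new config messages arrive, while the per-key ValueState written downstream is what actually survives checkpoints.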
Re: State Maintenance
Thanks Gordon for your response. I have around 80 parallel flatmap operator instances, and each instance requires 3 states. One of these is user state, in which each operator will have a unique user's data, and I need this data to be queryable. The other two states are kind of static states which are only modified when there is an update message in the config stream. This static data could easily be around 2 GB, and in my previous approach I used operator state where the data is retrieved inside the open method across all operator instances but checkpointed only inside one of the operator instances.

One of the issues that I have is: if I change the operator parallelism, how would it affect the internal state?

On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai wrote:

> Hi Navneeth,
>
> Answering your three questions separately:
>
> 1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
> from the map state, the state will be removed from the local RocksDB as well.
>
> 2. If state classes are not POJOs, they will be serialized by Kryo, unless a
> custom serializer is specifically specified otherwise. You can take a look
> at this document on how to do that [1].
>
> 3. I might need to know more information to be able to suggest properly for
> this one. How are you using the "huge state values"? From what you described,
> it seems like you only need them on one of the parallel instances, so I'm a
> bit curious what they are actually used for. Are they needed when processing
> your records?
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: Process Function
Thanks a lot everyone. I have the user data ingested from Kafka, and it is keyed by userid. There are around 80 parallel flatmap operator instances after the keyBy, and there are around a few million users. The map state uses userid as the key and some value. I guess I will try the approach that Aljoscha has mentioned and see how it works.

On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek wrote:

> Hi,
>
> This is mostly correct, but you cannot register a timer in open() because
> we don't have an active key there. Only in process() and onTimer() can you
> register a timer.
>
> In your case, I would suggest to somehow clamp the timestamp to the
> nearest 2-minute (or whatever) interval, or to keep an extra ValueState that
> tells you whether you already registered a timer.
>
> Best,
> Aljoscha
>
> On 5. Sep 2017, at 16:55, Kien Truong wrote:
>
> Hi,
>
> You can register a processing-time timer inside the onTimer and the open
> functions to have a timer that runs periodically.
>
> Pseudo-code example:
>
> ValueState<Long> lastRuntime;
>
> void open() {
>     ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
>
> void onTimer() {
>     // Run the periodic task
>     if (lastRuntime.get() + 60000 == timeStamp) {
>         periodicTask();
>     }
>     // Re-register the processing-time timer
>     lastRuntime.setValue(timeStamp);
>     ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
>
> void periodicTask() { ... }
>
> For the second question, timers are already scoped by key, so you can keep
> a lastModified variable as a ValueState, then compare it to the timestamp
> provided by the timer to see if the current key should be evicted.
> Check out the example on the ProcessFunction page:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> Best regards,
> Kien
>
> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>
> Hi All,
>
> I have a streaming pipeline which is keyed by userid and then goes to a
> flatmap function. I need to clear the state after some time, and I was
> looking at the process function for it.
>
> Inside the processElement function, if I register a timer, wouldn't it
> create a timer for each incoming message?
>
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>
> How can I get something like a cleanup task that runs every 2 minutes and
> evicts all stale data? Also, is there a way to get the key inside the onTimer
> function so that I know which key has to be evicted?
>
> Thanks,
> Navneeth
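[Editor's note] Aljoscha's clamping suggestion reduces to one line of arithmetic: round each element's timestamp up to the next interval boundary before registering the timer. All elements arriving within the same window then request the same timer timestamp, and since a timer for the same key and timestamp is only registered once, one cleanup timer fires per window instead of one per message. A plain-Java sketch, assuming a 2-minute interval:

```java
public class TimerClampDemo {

    static final long INTERVAL_MS = 120_000; // hypothetical 2-minute cleanup interval

    // Clamp a timestamp up to the next interval boundary. Every element in the
    // same 2-minute window maps to the SAME boundary, so repeated
    // registerEventTimeTimer(nextBoundary(ts)) calls collapse into one timer.
    static long nextBoundary(long timestampMs) {
        return (timestampMs / INTERVAL_MS + 1) * INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println(nextBoundary(0));        // start of the first window
        System.out.println(nextBoundary(119_999));  // same window, same boundary
        System.out.println(nextBoundary(120_000));  // boundary itself rolls to the next window
    }
}
```

In processElement this would be ctx.timerService().registerEventTimeTimer(nextBoundary(current.timestamp)), with onTimer doing the per-key eviction.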
Process Function
Hi All,

I have a streaming pipeline which is keyed by userid and then goes to a flatmap function. I need to clear the state after some time, and I was looking at the process function for it.

Inside the processElement function, if I register a timer, wouldn't it create a timer for each incoming message?

// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

How can I get something like a cleanup task that runs every 2 minutes and evicts all stale data? Also, is there a way to get the key inside the onTimer function so that I know which key has to be evicted?

Thanks,
Navneeth
State Maintenance
Hi All,

I have a couple of questions regarding state maintenance in Flink.

- I have a connected stream, then a keyBy operator followed by a flatmap function. I use MapState, and keys get added by data from stream1 and removed by messages from stream2. Stream2 acts as a control stream in my pipeline. My question is: when the keys are removed, will the state in RocksDB also be removed? How does RocksDB get the most recent state?

- Can I use a guava cache in MapState, i.e. a MapState whose values are guava Cache instances? Do I have to write a serializer to persist the data from the guava cache?

- One of my downstream operators requires keyed state because I need to query the state value, but it also has two huge state values that are basically the same across all parallel operator instances. Initially I used operator state and checkpointed only in the 0th index of the operator, so the other instances would not checkpoint the same data. How can I achieve this in keyed state? Each operator will have around 10 GB of the same data. Not sure if this will be a problem in the future.

Thanks,
Navneeth