Re: Re: Checkpoint Error

2021-03-07 Thread Navneeth Krishnan
Hi Yun,

Thanks for the response. I checked the mounts and only the JMs and TMs
are mounted with this EFS. Not sure how to debug this.

Thanks

On Sun, Mar 7, 2021 at 8:29 PM Yun Gao  wrote:

> Hi Navneeth,
>
> It seems from the stack trace that the exception is caused by an underlying
> EFS problem. Have you checked
> whether there are errors reported for EFS, or whether the same EFS might be
> mounted elsewhere and someone else
> has deleted the directory?
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Navneeth Krishnan 
> *Send Date:*Sun Mar 7 15:44:59 2021
> *Recipients:*user 
> *Subject:*Re: Checkpoint Error
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We are running our streaming job on flink 1.7.2 and we are noticing the
>>> below error. Not sure what's causing it, any pointers would help. We have
>>> 10 TM's checkpointing to AWS EFS.
>>>
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint
>>> 11 for operator Processor -> Sink: KafkaSink (34/42).}
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for
>>> operator Processor -> Sink: KafkaSink (34/42).
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>   ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
>>> Could not flush and close the file system output stream to
>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>> in order to obtain the stream state handle
>>>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>>>   at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>   ... 5 more
>>> Caused by: java.io.IOException: Could not flush and close the file system
>>> output stream to
>>> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>>> in order to obtain the stream state handle
>>>   at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>>>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>>>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>>>   at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>>>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>   at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>>>   ... 7 more
>>> Caused by: java.io.IOException: Stale file handle
>>>   at java.io.FileOutputStream.close0(Native Method)
>>>   at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>>>   at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>>>   at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>>>   at java.io.FileOutputStream.close(FileOutputStream.java:354)
>>>   at org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>>>   at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>>>   at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>>>   ... 12 more
>>>
>>>
>>> Thanks
>>>
>>>


Re: Checkpoint Error

2021-03-06 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks

On Mon, Jan 18, 2021 at 7:38 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> We are running our streaming job on flink 1.7.2 and we are noticing the
> below error. Not sure what's causing it, any pointers would help. We have
> 10 TM's checkpointing to AWS EFS.
>
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 11 for operator Processor -> Sink: KafkaSink (34/42).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 11 for 
> operator Processor -> Sink: KafkaSink (34/42).
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>   ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>   at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>   ... 7 more
> Caused by: java.io.IOException: Stale file handle
>   at java.io.FileOutputStream.close0(Native Method)
>   at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>   at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>   at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>   at java.io.FileOutputStream.close(FileOutputStream.java:354)
>   at 
> org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
>   at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>   ... 12 more
>
>
> Thanks
>
>


Checkpoint Error

2021-01-18 Thread Navneeth Krishnan
Hi All,

We are running our streaming job on Flink 1.7.2 and we are noticing the
below error. Not sure what's causing it; any pointers would help. We have
10 TMs checkpointing to AWS EFS.

AsynchronousException{java.lang.Exception: Could not materialize
checkpoint 11 for operator Processor -> Sink: KafkaSink (34/42).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 11
for operator Processor -> Sink: KafkaSink (34/42).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not flush and close the file system output
stream to 
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file
system output stream to
file:/mnt/checkpoints/a300d1b0fd059f3f83ce35a8042e89c8/chk-11/1cd768bd-3408-48a9-ad48-b005f66b130d
in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.io.IOException: Stale file handle
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at 
org.apache.flink.core.fs.local.LocalDataOutputStream.close(LocalDataOutputStream.java:62)
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
... 12 more


Thanks
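
For context, a minimal sketch of how a job is typically pointed at the EFS
mount seen in the trace (the 60s interval is an assumption, not taken from
the job):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Hypothetical checkpoint interval.
            env.enableCheckpointing(60_000);
            // EFS is mounted at /mnt/checkpoints on every TM, so the local file://
            // scheme is used. A "Stale file handle" IOException at close time comes
            // from the underlying NFS/EFS mount rather than from Flink itself.
            env.setStateBackend(new FsStateBackend("file:///mnt/checkpoints"));
            // ... build and execute the job ...
        }
    }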


Re: CICD

2021-01-03 Thread Navneeth Krishnan
Thanks Vikash for the response. Yes, that's very much feasible, but we are
planning to move to the job/application cluster model, where the artifacts
are bundled inside the container. When there is a new container image we
might have to do the following:
- Take a savepoint
- Upgrade the JM and TM container images and provide the savepoint path
during startup

I would like to know if this is the standard way or if there are better
options. We currently use Terraform for managing the infrastructure, and it
would be a great help if someone has already done this.

Thanks
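
Following the REST API suggestion in the reply quoted below, a rough sketch
of scripting the savepoint step in Java (host, job id, and target directory
are placeholders; polling of the returned request id and error handling are
omitted):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Scanner;

    public class TriggerSavepoint {
        public static void main(String[] args) throws Exception {
            String jobManager = "http://jobmanager:8081";      // placeholder
            String jobId = "a300d1b0fd059f3f83ce35a8042e89c8";  // placeholder
            // POST /jobs/:jobid/savepoints triggers an asynchronous savepoint and
            // can cancel the job once the savepoint completes.
            URL url = new URL(jobManager + "/jobs/" + jobId + "/savepoints");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            String body = "{\"target-directory\":\"file:///mnt/savepoints\",\"cancel-job\":true}";
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body.getBytes(StandardCharsets.UTF_8));
            }
            // The response carries a request id that can be polled via
            // GET /jobs/:jobid/savepoints/:request-id until it reports the
            // savepoint location; that path is then handed to the new containers.
            try (Scanner s = new Scanner(conn.getInputStream(), "UTF-8")) {
                System.out.println(s.useDelimiter("\\A").next());
            }
        }
    }

The new containers would then be started pointing at the reported savepoint
path (for example via the CLI's -s option or the job cluster entry point's
savepoint argument, depending on the Flink version).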

On Sun, Jan 3, 2021 at 4:17 PM Vikash Dat  wrote:

> Could you not use the JM web address to reach the REST API? You can
> start/stop/savepoint/restore and upload new jars via the REST API. While I
> did not run on ECS (I ran on EMR), I was able to use the REST API to do
> deployments.
>
> On Sun, Jan 3, 2021 at 19:09 Navneeth Krishnan 
> wrote:
>
>> Hi All,
>>
>> Currently we are using flink in session cluster mode and we manually
>> deploy the jobs i.e. through the web UI. We use AWS ECS for running the
>> docker container with 2 services definitions, one for JM and other for TM.
>> How is everyone managing the CICD process? Is there a better way to run a
>> job in job cluster mode and use jenkins to perform CICD?
>>
>> Any pointers on how this is being done would really help and greatly
>> appreciated.
>>
>> Thanks,
>> Navneeth
>>
>


CICD

2021-01-03 Thread Navneeth Krishnan
Hi All,

Currently we are using Flink in session cluster mode and we manually deploy
the jobs, i.e. through the web UI. We use AWS ECS for running the Docker
containers with 2 service definitions, one for the JM and the other for the
TM. How is everyone managing the CICD process? Is there a better way to run
a job in job cluster mode and use Jenkins to perform CICD?

Any pointers on how this is being done would really help and be greatly
appreciated.

Thanks,
Navneeth


Tumbling Time Window

2021-01-03 Thread Navneeth Krishnan
Hello All,

First of all Happy New Year!! Thanks for the excellent community support.

I have a job which requires a 2-second tumbling time window per key. For
each user we wait for 2 seconds to collect enough data and then proceed to
further processing. My question is: should I use the regular DSL windowing,
or write a custom process function which does the windowing? I have heard
that the DSL window has more overhead than a custom window function.

What do you suggest, and can someone provide an example of a custom window
function per key? Also, given that the window is very short (2 seconds),
would there be much overhead in firing so many timers for each key?

Thanks!

Regards,
Navneeth
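
For reference, a minimal sketch of a custom per-key 2-second window built on
a KeyedProcessFunction with processing-time timers; Event is a placeholder
type, and emitting the buffered batch as a List is just one possible output
shape:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Buffers events per key and emits them 2 seconds after the first event
    // for that key arrived (processing time).
    public class TwoSecondBatcher extends KeyedProcessFunction<String, Event, List<Event>> {

        private transient ListState<Event> buffer;
        private transient ValueState<Long> pendingTimer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", Event.class));
            pendingTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("timer", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<List<Event>> out) throws Exception {
            buffer.add(event);
            if (pendingTimer.value() == null) {
                long fireAt = ctx.timerService().currentProcessingTime() + 2000L;
                ctx.timerService().registerProcessingTimeTimer(fireAt);
                pendingTimer.update(fireAt);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
            List<Event> batch = new ArrayList<>();
            for (Event e : buffer.get()) {
                batch.add(e);
            }
            out.collect(batch);
            buffer.clear();
            pendingTimer.clear();
        }
    }

It would be used as stream.keyBy(...).process(new TwoSecondBatcher()).
Whether this beats the window DSL depends mostly on the state backend and
the number of active keys and timers, so benchmarking both is worthwhile.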


Re: Caching

2020-11-26 Thread Navneeth Krishnan
Thanks Dongwon. It was extremely helpful. I didn't quite understand how
async io can be used here. It would be great if you can share some info on
it.

Also how are you propagating any changes to values?

Regards,
Navneeth
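
For what it's worth, a rough sketch of the Lettuce plus async I/O
combination described in the reply below (the Redis address and the String
element types are simplifying assumptions):

    import java.util.Collections;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import io.lettuce.core.RedisClient;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.async.RedisAsyncCommands;

    // Looks up each incoming key in Redis without blocking the task thread.
    public class RedisLookupFunction extends RichAsyncFunction<String, String> {

        private transient RedisClient client;
        private transient StatefulRedisConnection<String, String> connection;
        private transient RedisAsyncCommands<String, String> commands;

        @Override
        public void open(Configuration parameters) {
            client = RedisClient.create("redis://redis-host:6379"); // placeholder address
            connection = client.connect();
            commands = connection.async();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            // RedisFuture implements CompletionStage, so this callback completes
            // the Flink result future as soon as the Redis reply arrives.
            commands.get(key).thenAccept(value ->
                    resultFuture.complete(Collections.singleton(key + "=" + value)));
        }

        @Override
        public void close() {
            if (connection != null) connection.close();
            if (client != null) client.shutdown();
        }
    }

    // Wiring it into the pipeline:
    // DataStream<String> enriched = AsyncDataStream.unorderedWait(
    //         keyStream, new RedisLookupFunction(), 1, TimeUnit.SECONDS, 100);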

On Thu, Nov 26, 2020 at 6:26 AM Dongwon Kim  wrote:

> Oops, I forgot to mention that when doing bulk insert into Redis, you'd
> better open a pipeline with a 'transaction' property set to False [1].
>
> Otherwise, API calls from your Flink job will time out.
>
> [1] https://github.com/andymccurdy/redis-py#pipelines
>
> On Thu, Nov 26, 2020 at 11:09 PM Dongwon Kim 
> wrote:
>
>> Hi Navneeth,
>>
>> I reported a similar issue to yours before [1] but I took the
>> broadcasting approach at first.
>>
>> As you already anticipated, broadcasting is going to use more memory than
>> your current approach based on a static object on each TM .
>>
>> And the broadcasted data will be treated as operator state and will be
>> periodically checkpointed with serialization overhead & garbage collections.
>> These are not negligible at all if you're not carefully choosing
>> serialization strategy as explained in [2].
>> Even with the proper one, I've experienced mild back pressure whenever
>> - checkpoint is in progress (AFAIK, incremental checkpoint has nothing to
>> do with operator states)
>> - cache is being broadcasted
>>
>> For that reason, I decided to populate data on Redis but it also calls
>> for design decisions:
>> - which Java client to use? Jedis [3]? Lettuce [4]?
>> - how to invoke APIs calls inside Flink? synchronously or asynchronously?
>>
>> Currently I'm very satisfied with Lettuce with Flink's async io [5] with
>> very small memory footprint and without worrying about serialization
>> overhead and garbage collections.
>> Lettuce supports asynchronous communication so it works perfectly with
>> Flink's async io.
>> I bet you'll be very disappointed with invoking Jedis synchronously
>> inside ProcessFunction.
>>
>> Best,
>>
>> Dongwon
>>
>> [1]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Better-way-to-share-large-data-across-task-managers-td38231.html
>> [2]
>> https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
>> [3] https://github.com/redis/jedis
>> [4] https://lettuce.io/
>> [5]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>
>> On Thu, Nov 26, 2020 at 5:31 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> We have a flink streaming job processing around 200k events per second.
>>> The job requires a lot of less frequently changing data (sort of static but
>>> there will be some changes over time, say 5% change once per day or so).
>>> There are about 12 caches with some containing approximately 20k
>>> entries whereas a few with about 2 million entries.
>>>
>>> In the current implementation we are using in-memory lazy loading static
>>> cache to populate the data and the initialization happens in open function.
>>> The reason to choose this approach is because we have allocated around 4GB
>>> extra memory per TM for these caches and if a TM has 6 slots the cache can
>>> be shared.
>>>
>>> Now the issue we have with this approach is everytime when a container
>>> is restarted or a new job is deployed it has to populate the cache again.
>>> Sometimes this lazy loading takes a while and it causes back pressure as
>>> well. We were thinking to move this logic to the broadcast stream but since
>>> the data has to be stored per slot it would increase the memory consumption
>>> by a lot.
>>>
>>> Another option that we were thinking of is to replace the current near
>>> far cache that uses rest api to load the data to redis based near far
>>> cache. This will definitely reduce the overall loading time but still not
>>> the perfect solution.
>>>
>>> Are there any recommendations on how this can be achieved effectively?
>>> Also how is everyone overcoming this problem?
>>>
>>> Thanks,
>>> Navneeth
>>>
>>>


Caching

2020-11-26 Thread Navneeth Krishnan
Hi All,

We have a flink streaming job processing around 200k events per second. The
job requires a lot of less frequently changing data (sort of static but
there will be some changes over time, say 5% change once per day or so).
There are about 12 caches with some containing approximately 20k
entries whereas a few with about 2 million entries.

In the current implementation we are using an in-memory, lazily loaded static
cache to populate the data, and the initialization happens in the open()
function. The reason for choosing this approach is that we have allocated
around 4GB of extra memory per TM for these caches, and if a TM has 6 slots
the cache can be shared.

Now the issue we have with this approach is that every time a container is
restarted or a new job is deployed, the cache has to be populated again.
Sometimes this lazy loading takes a while and it causes back pressure as
well. We were thinking of moving this logic to a broadcast stream, but since
the data has to be stored per slot it would increase the memory consumption
by a lot.

Another option we were considering is to replace the current near/far cache,
which uses a REST API to load the data, with a Redis-backed near/far cache.
This would definitely reduce the overall loading time but is still not a
perfect solution.

Are there any recommendations on how this can be achieved effectively? Also
how is everyone overcoming this problem?

Thanks,
Navneeth
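
For comparison, a minimal sketch of the broadcast-state variant mentioned
above (the String types and the "key=value" update format are simplifying
assumptions). Note that broadcast state is kept once per parallel subtask,
which is exactly the memory multiplication being weighed here:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class EnrichmentSketch {

        static final MapStateDescriptor<String, String> REFERENCE_DATA =
                new MapStateDescriptor<>("reference-data",
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        public static DataStream<String> enrich(DataStream<String> events,
                                                DataStream<String> referenceUpdates) {
            BroadcastStream<String> broadcast = referenceUpdates.broadcast(REFERENCE_DATA);
            return events
                    .connect(broadcast)
                    .process(new BroadcastProcessFunction<String, String, String>() {
                        @Override
                        public void processElement(String event, ReadOnlyContext ctx,
                                                   Collector<String> out) throws Exception {
                            // Read-only view of the broadcast state for this subtask.
                            String value = ctx.getBroadcastState(REFERENCE_DATA).get(event);
                            out.collect(event + "=" + value);
                        }

                        @Override
                        public void processBroadcastElement(String update, Context ctx,
                                                            Collector<String> out) throws Exception {
                            // Hypothetical "key=value" update format.
                            String[] kv = update.split("=", 2);
                            ctx.getBroadcastState(REFERENCE_DATA).put(kv[0], kv[1]);
                        }
                    });
        }
    }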


Re: Job Restart Failure

2020-10-21 Thread Navneeth Krishnan
Hi All,

Any feedback on how this can be resolved? This is causing downtime in
production.

Thanks



On Tue, Oct 20, 2020 at 4:39 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> I'm facing an issue in our flink application. This happens in version
> 1.4.0 and 1.7.2. We have both versions and we are seeing this problem on
> both. We are running flink on ECS and checkpointing enabled to EFS. When
> the pipeline restarts due to some node failure or any other reason, it just
> keeps restarting until the retry attempts without this same error message.
> When I checked the EFS volume I do see the file is still available but for
> some reason flink is unable to recover the job. Any pointers will help.
> Thanks
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(14/18) from 
> any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:245)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>   ... 5 more
> Caused by: java.io.FileNotFoundException: 
> /mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c
>  (No such file or directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
>   at 
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
>   at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>   at 
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>   at 
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:286)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:62)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>   ... 7 more
>
>
> *EFS:*
>
>
> [image: image.png]
>
>
>
> Thanks
>
>


Job Restart Failure

2020-10-20 Thread Navneeth Krishnan
Hi All,

I'm facing an issue in our Flink application. This happens in versions 1.4.0
and 1.7.2; we run both versions and we are seeing this problem on both. We
are running Flink on ECS with checkpointing enabled to EFS. When the
pipeline restarts due to some node failure or any other reason, it just
keeps restarting with this same error message until the retry attempts are
exhausted. When I checked the EFS volume I do see that the file is still
available, but for some reason Flink is unable to recover the job. Any
pointers will help.
Thanks

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore
operator state backend for
StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(14/18) from any of the
1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:245)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
... 5 more
Caused by: java.io.FileNotFoundException:
/mnt/checkpoints/150dee2a70cecdd41b63a06b42a95649/chk-52/76363f89-d19f-44aa-aaf9-b33d89ec7c6c
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
at 
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at 
org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:286)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:62)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more


*EFS:*


[image: image.png]



Thanks


Adaptive load balancing

2020-09-22 Thread Navneeth Krishnan
Hi All,

We are currently using Flink in production and use keyBy for a CPU-intensive
computation. There is a cache lookup for a set of keys, and since keyBy
cannot guarantee that the data is sent to a single node, we are basically
replicating the cache on all nodes. This is causing memory problems for us,
and we would like to explore some options to mitigate the current
limitations.

Is there a way to group a set of keys and send them to a set of nodes so
that we don't have to replicate the cache data on all nodes?

Has anyone tried implementing hashing with adaptive load balancing, so that
if a node is busy processing, the data can be routed effectively to other
nodes which are free?

Any suggestions are greatly appreciated.

Thanks
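
One partial workaround is to key by a coarser bucket derived from the
original key, so that all keys in a bucket are handled by the same parallel
subtask and the cache only needs that bucket's entries there. A rough sketch
(Event, getKey() and NUM_BUCKETS are placeholders); note that Flink still
assigns buckets to subtasks by hashing into key groups, so this controls
which keys are co-located with each other, not load-aware routing to "free"
nodes:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class BucketedRouting {

        static final int NUM_BUCKETS = 64; // assumption, e.g. matching the parallelism

        public static KeyedStream<Event, Integer> byBucket(DataStream<Event> events) {
            return events.keyBy(new KeySelector<Event, Integer>() {
                @Override
                public Integer getKey(Event event) {
                    // All keys that hash to the same bucket go to the same subtask.
                    return Math.floorMod(event.getKey().hashCode(), NUM_BUCKETS);
                }
            });
        }
    }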


Flink Migration

2020-08-28 Thread Navneeth Krishnan
Hi All,

We are currently on a very old version of Flink (1.4.0) and it has worked
pretty well, but lately we have been facing checkpoint timeout issues. We
would like to minimize changes to the current pipelines and go ahead with
the migration. With that said, our first pick was to migrate to 1.5.6 and
later move to a newer version.

Do you think a more recent version like 1.6 or 1.7 might work? We did try
1.8, but it requires some changes in the pipelines.

When we tried 1.5.6 with Docker Compose we were unable to get the task
manager to attach to the jobmanager. Are there specific configurations
required for newer versions?

Logs:

2020-08-28 07:36:30.834 [main] INFO
org.apache.flink.runtime.util.LeaderRetrievalUtils  - TaskManager will try
to connect for 1 milliseconds before falling back to heuristics

2020-08-28 07:36:30.853 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Retrieved new target
address jobmanager/172.21.0.8:6123.

2020-08-28 07:36:31.279 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
address jobmanager/172.21.0.8:6123

2020-08-28 07:36:31.280 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.281 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.281 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.282 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:31.283 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.284 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:31.684 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
address jobmanager/172.21.0.8:6123

2020-08-28 07:36:31.686 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.687 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.688 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.688 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:31.689 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:31.690 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:32.490 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
address jobmanager/172.21.0.8:6123

2020-08-28 07:36:32.491 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address 'e6f9104cdc61/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:32.493 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:32.494 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:32.495 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:32.496 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/172.21.0.9': Connection refused (Connection refused)

2020-08-28 07:36:32.497 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Failed to connect from
address '/127.0.0.1': Invalid argument (connect failed)

2020-08-28 07:36:34.099 [main] INFO
org.apache.flink.runtime.net.ConnectionUtils  - Trying to connect to
address jobmanager/172.21.0.8:6123

2020-08-28 07:36:34.100 [main] INFO
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - TaskManager will
use hostname/address 'e6f9104cdc61' (172.21.0.9) for communication.


Flink Conf

jobmanager.rpc.address: jobmanager

rest.address: jobmanager


Thanks


Tumbling window per key

2020-05-12 Thread Navneeth Krishnan
Hi All,

I was looking at the documentation for windows and got a little confused.
As per my understanding, a tumbling window per key will create
non-overlapping windows based on when the data for that key arrived. For
example, consider a tumbling window of 30 seconds:
user1 - 10:01:01
user2 - 10:01:02
user1 - 10:01:05
user2 - 10:01:06
user2 - 10:01:08

Result:
user1 (10:01:01 & 10:01:05)
user2 (10:01:02 & 10:01:06)
user2 (10:01:08)

Is this the right understanding?

But as per the below image in the docs, it looks like the window is not per
key. Please correct me if I'm wrong.
[image: image.png]

Thanks
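
As far as I understand it, the contents and firing of the window are tracked
separately for each key once the stream is keyed, but the window boundaries
themselves are aligned to the clock (e.g. 10:01:00-10:01:30, 10:01:30-10:02:00
for a 30-second window), not to the first element seen for a key. A minimal
sketch of the keyed DSL form (Event and getUser() are placeholder names):

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class PerKeyTumblingWindow {

        // Counts events per user in aligned 30-second tumbling windows.
        public static DataStream<Long> countsPerUser(DataStream<Event> events) {
            return events
                    .keyBy(Event::getUser)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                    .aggregate(new AggregateFunction<Event, Long, Long>() {
                        @Override public Long createAccumulator() { return 0L; }
                        @Override public Long add(Event e, Long acc) { return acc + 1; }
                        @Override public Long getResult(Long acc) { return acc; }
                        @Override public Long merge(Long a, Long b) { return a + b; }
                    });
        }
    }

With the sample timestamps above, all five events would therefore fall into
the 10:01:00-10:01:30 window, evaluated separately for user1 and user2.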


Re: Task Assignment

2020-04-23 Thread Navneeth Krishnan
Hi Marta,

Thanks for your response. What I'm looking for is something like data
locality. If I have one TM which is processing a set of keys, I want to
ensure all keys of the same type go to the same TM rather than using
hashing to find the downstream slot. I could use a common key to do this,
but I need to parallelize as much as possible, since the number of
incoming messages is too large to narrow down to a single key and
process it.

Thanks

On Thu, Apr 23, 2020 at 2:02 AM Marta Paes Moreira 
wrote:

> Hi, Navneeth.
>
> If you *key* your stream using stream.keyBy(…), this will logically split
> your input and all the records with the same key will be processed in the
> same operator instance. This is the default behavior in Flink for keyed
> streams and transparently handled.
>
> You can read more about it in the documentation [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-state-and-operator-state
>
> On Thu, Apr 23, 2020 at 7:44 AM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Is there a way for an upstream operator to know how the downstream
>> operator tasks are assigned? Basically I want to group my messages to be
>> processed on slots in the same node based on some key.
>>
>> Thanks
>>
>


Task Assignment

2020-04-22 Thread Navneeth Krishnan
Hi All,

Is there a way for an upstream operator to know how the downstream
operator tasks are assigned? Basically I want to group my messages to be
processed on slots in the same node based on some key.

Thanks


Re: Flink

2020-04-22 Thread Navneeth Krishnan
Thanks a lot Timo. I will take a look at it. But does flink automatically
scale up and down at this point with native integration?

Thanks

On Tue, Apr 14, 2020 at 11:27 PM Timo Walther  wrote:

> Hi Navneeth,
>
> it might be also worth to look into Ververica Plaform for this. The
> community edition was published recently is free of charge. It provides
> first class K8s support [1].
>
> There is also a tutorial how to deploy it on EKS [2] (not the most
> recent one through).
>
> Regards,
> Timo
>
> [1]
>
> https://www.ververica.com/blog/announcing-ververica-platform-community-edition?utm_campaign=Ververica%20Platform%20-%20Community%20Edition&utm_content=123140986&utm_medium=social&utm_source=twitter&hss_channel=tw-2581958070
> [2]
>
> https://www.ververica.com/blog/how-to-get-started-with-data-artisans-platform-on-aws-eks
>
>
>
> On 15.04.20 03:57, Navneeth Krishnan wrote:
> > Hi All,
> >
> > I'm very new to EKS and trying to deploy a flink job in cluster mode.
> > Are there any good documentations on what are the steps to deploy on EKS?
> >
> >  From my understanding, with flink 1.10 running it on EKS will
> > automatically scale up and down with kubernetes integration based on the
> > load. Is this correct? Do I have to do enable some configs to support
> > this feature?
> >
> > How to use the lyft k8s operator when deploying on EKS?
> >
> > Thanks a lot, appreciate all the help.
> >
> >
> >
> >
>
>


Flink

2020-04-14 Thread Navneeth Krishnan
Hi All,

I'm very new to EKS and trying to deploy a Flink job in cluster mode. Is
there any good documentation on the steps to deploy on EKS?

From my understanding, with Flink 1.10, running it on EKS will automatically
scale up and down based on the load via the Kubernetes integration. Is this
correct? Do I have to enable some configs to support this feature?

How do I use the Lyft k8s operator when deploying on EKS?

Thanks a lot, appreciate all the help.


Re: Using s3 for checkpointing

2020-02-09 Thread Navneeth Krishnan
Thanks David. It worked after adding the jar inside a folder.

On Sat, Feb 1, 2020 at 2:37 AM David Magalhães 
wrote:

> Did you put each inside a different folder with their name? Like
> /opt/flink/plugins/s3-fs-presto/flink-s3-fs-presto-1.9.1.jar ?
>
> check
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>
> On Sat, Feb 1, 2020, 07:42 Navneeth Krishnan 
> wrote:
>
>> Hi Arvid,
>>
>> Thanks for the response.
>>
>> I have both the jars under /opt/flink/plugins but I'm still getting the
>> same error message. Also can someone please provide some pointers on how
>> entropy works. How should I setup the directory structure?
>>
>> In the link that you have provided there is a aws-credentials.jar file,
>> not sure where to get the jar since I don't see it in maven repo.
>>
>> *Plugins:*
>>
>> flink-s3-fs-hadoop-1.9.1.jar
>>
>> flink-s3-fs-presto-1.9.1.jar
>>
>>
>> *flink-conf:*
>>
>> high-availability.storageDir: s3p://.../recovery
>>
>>
>>
>> *Error message:*
>>
>> Shutting StandaloneSessionClusterEntrypoint down with application status
>> FAILED. Diagnostics java.io.IOException: Could not create FileSystem for
>> highly available storage (high-availability.storageDir)
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Could not find a file system implementation for scheme 's3p'. The scheme is
>> not directly supported by Flink and no Hadoop file system to support this
>> scheme could be loaded.
>>
>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>> Hadoop is not in the classpath/dependencies.
>>
>>
>> Thanks
>>
>>
>>
>> On Thu, Jan 30, 2020 at 2:37 AM Arvid Heise  wrote:
>>
>>> Hi Navneeth,
>>>
>>> did you follow the plugin folder structure? [1]
>>>
>>> There is another plugin called flink-s3-fs-presto that you can use.
>>> If you want to use both plugins, use s3a:// for s3-fs-hadoop (output)
>>> and s3p:// for s3-fs-presto (checkpointing).
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html#isolation-and-plugin-structure
>>>
>>> On Thu, Jan 30, 2020 at 10:26 AM Navneeth Krishnan <
>>> reachnavnee...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm trying to migrate from NFS to S3 for checkpointing and I'm facing
>>>> few issues. I have flink running in docker with flink-s3-fs-hadoop jar
>>>> copied to plugins folder. Even after having the jar I'm getting the
>>>> following error: Caused by:
>>>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
>>>> not in the classpath/dependencies. Am I missing something?
>>>>
>>>> In the documentation it says "Presto is the recommended file system
>>>> for checkpointing to S3". How can I enable this? Is there a specific
>>>> configuration that I need to do for this?
>>>>
>>>> Also, I couldn't figure out how the entropy injection works. Should I
>>>> create the bucket with checkpoints folder and flink will automatically
>>>> inject an entropy and create a per job checkpoint folder or should I create
>>>> it?
>>>>
>>>> bucket/checkpoints/_entropy_/dashboard-job/
>>>>
>>>> s3.entropy.key: _entropy_
>>>> s3.entropy.length: 4 (default)
>>>>
>>>> Thanks
>>>>
>>>


Re: Using s3 for checkpointing

2020-01-31 Thread Navneeth Krishnan
Hi Arvid,

Thanks for the response.

I have both jars under /opt/flink/plugins but I'm still getting the same
error message. Also, can someone please provide some pointers on how
entropy works? How should I set up the directory structure?

In the link that you provided there is an aws-credentials.jar file; I'm not
sure where to get that jar since I don't see it in the Maven repo.

*Plugins:*

flink-s3-fs-hadoop-1.9.1.jar

flink-s3-fs-presto-1.9.1.jar


*flink-conf:*

high-availability.storageDir: s3p://.../recovery



*Error message:*

Shutting StandaloneSessionClusterEntrypoint down with application status
FAILED. Diagnostics java.io.IOException: Could not create FileSystem for
highly available storage (high-availability.storageDir)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Could not find a file system implementation for scheme 's3p'. The scheme is
not directly supported by Flink and no Hadoop file system to support this
scheme could be loaded.

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
Hadoop is not in the classpath/dependencies.


Thanks



On Thu, Jan 30, 2020 at 2:37 AM Arvid Heise  wrote:

> Hi Navneeth,
>
> did you follow the plugin folder structure? [1]
>
> There is another plugin called flink-s3-fs-presto that you can use.
> If you want to use both plugins, use s3a:// for s3-fs-hadoop (output) and
> s3p:// for s3-fs-presto (checkpointing).
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/plugins.html#isolation-and-plugin-structure
>
> On Thu, Jan 30, 2020 at 10:26 AM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm trying to migrate from NFS to S3 for checkpointing and I'm facing few
>> issues. I have flink running in docker with flink-s3-fs-hadoop jar
>> copied to plugins folder. Even after having the jar I'm getting the
>> following error: Caused by:
>> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
>> not in the classpath/dependencies. Am I missing something?
>>
>> In the documentation it says "Presto is the recommended file system for
>> checkpointing to S3". How can I enable this? Is there a specific
>> configuration that I need to do for this?
>>
>> Also, I couldn't figure out how the entropy injection works. Should I
>> create the bucket with checkpoints folder and flink will automatically
>> inject an entropy and create a per job checkpoint folder or should I create
>> it?
>>
>> bucket/checkpoints/_entropy_/dashboard-job/
>>
>> s3.entropy.key: _entropy_
>> s3.entropy.length: 4 (default)
>>
>> Thanks
>>
>


Using s3 for checkpointing

2020-01-30 Thread Navneeth Krishnan
Hi All,

I'm trying to migrate from NFS to S3 for checkpointing and I'm facing a few
issues. I have Flink running in Docker with the flink-s3-fs-hadoop jar copied
to the plugins folder. Even with the jar in place I'm getting the following
error: Caused by:
org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is
not in the classpath/dependencies. Am I missing something?

In the documentation it says "Presto is the recommended file system for
checkpointing to S3". How can I enable this? Is there a specific
configuration that I need for this?

Also, I couldn't figure out how entropy injection works. Should I just
create the bucket with a checkpoints folder, and will Flink automatically
inject the entropy and create a per-job checkpoint folder, or should I
create it myself?

bucket/checkpoints/_entropy_/dashboard-job/

s3.entropy.key: _entropy_
s3.entropy.length: 4 (default)

Thanks
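
On the entropy question: as far as I understand it, the _entropy_ marker is
put into the checkpoint path itself and has to match s3.entropy.key; Flink
then substitutes random characters for it when writing checkpoint data files
(and drops it for metadata), so nothing beyond the bucket needs to be created
up front. A minimal sketch using the presto plugin scheme (bucket name and
interval are placeholders):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class S3CheckpointSetup {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // hypothetical interval
            // The _entropy_ marker below must match the s3.entropy.key setting in
            // flink-conf.yaml; s3p:// selects the flink-s3-fs-presto plugin.
            env.setStateBackend(
                    new FsStateBackend("s3p://my-bucket/checkpoints/_entropy_/dashboard-job"));
            // ... build and execute the job ...
        }
    }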


Re: Using redis cache in flink

2020-01-12 Thread Navneeth Krishnan
Hi Yun,

Thanks for the update. I can definitely use a Redis cluster, but what I
don't understand is that if I use a custom operator, the Redis cache will be
instantiated per operator instance. What I would ideally like to have is
one Redis cache instance per TM JVM. Since there isn't any way to share data
between task slots in Flink today, I would like to use this approach to
basically share common data. What I'm not sure about is how I can ensure
just one cache instance per TM JVM is created.

Regards
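
One way to get a single cache instance per TM JVM is a static, lazily
initialized holder that every subtask grabs in open(). A rough sketch (the
ConcurrentHashMap stands in for the actual Redis-backed cache, and the cache
population is left out):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // All subtasks running in the same TM share INSTANCE, because static fields
    // are scoped to the JVM (more precisely, to the user-code classloader).
    public final class SharedCache {

        private static volatile SharedCache INSTANCE;

        private final Map<String, String> values = new ConcurrentHashMap<>();

        private SharedCache() {
            // Connect to Redis and/or warm up the cache here (omitted).
        }

        public static SharedCache getOrCreate() {
            if (INSTANCE == null) {
                synchronized (SharedCache.class) {
                    if (INSTANCE == null) {
                        INSTANCE = new SharedCache();
                    }
                }
            }
            return INSTANCE;
        }

        public String get(String key) {
            return values.get(key);
        }
    }

    // In the operator:
    // public void open(Configuration parameters) {
    //     cache = SharedCache.getOrCreate(); // same object for every slot in this TM
    // }

One caveat: in a session cluster each submitted job gets its own user-code
classloader, so the instance is shared by the slots of one job on a TM, not
across different jobs.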

On Wed, Jan 8, 2020 at 12:46 AM Yun Tang  wrote:

> Hi Navneeth
>
> If you need the redis cache to be fault tolerant, I am afraid you have to
> choose redis cluster since Flink might deploy task on another node which is
> different from previous node after job failover.
>
> If you don't care about the fault tolerance, you could implement a
> customized operator which launch redis.
>
> By the way, there existed a way to combine objects on heap in memory with
> checkpoint mechanism to ensure fault tolerance, you could refer to [1] and
> [2]. The basic idea is to cac
>
> [1]
> https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/bundle/AbstractMapBundleOperator.java#L147
> [2]
> https://github.com/apache/flink/blob/9df5c80e7e729f49595ef6814462165831fd1307/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchLocalGroupAggFunction.java#L89
>
>
> --
> *From:* Navneeth Krishnan 
> *Sent:* Wednesday, January 8, 2020 15:36
> *To:* Yun Tang 
> *Cc:* user 
> *Subject:* Re: Using redis cache in flink
>
> Hi Yun,
>
> Thanks, the way I want to use redis is like a cache not as state backend.
> I would still have rocksdb state backend for other states. The reason to
> use cache instead of managed state is because I’d get around 10k msgs per
> task slot and I don’t have to get the state from rocksdb for each lookup.
> In memory cache would be fine but to rebuild the state I want to use redis.
>
> Regards
>
> On Tue, Jan 7, 2020 at 11:21 PM Yun Tang  wrote:
>
> Hi Navneeth
>
> If you wrap Redis as a state backend, you cannot easily share data across
> slots, as Flink constructs a state backend per operator for local use only.
>
> If you use a Redis cluster as an externalized service to store your data,
> you can share data across slots easily. However, compared with the reduced
> cost of serialization, the added network communication cannot be
> ignored. There is a trade-off here, and we cannot ensure there would be a
> performance gain. Actually, I suspect the time spent on CPU serialization is
> much less than the time spent on the network.
>
> Best
> Yun Tang
> --
> *From:* Navneeth Krishnan 
> *Sent:* Wednesday, January 8, 2020 12:33
> *To:* user 
> *Subject:* Using redis cache in flink
>
> Hi All,
>
> I want to use redis as near far cache to store data which are common
> across slots i.e. share data across slots. This data is required for
> processing every single message and it's better to store in a in memory
> cache backed by redis rather than rocksdb since it has to be serialized for
> every single get call. Do you guys think this is good solution or is there
> any other better solution? Also, Is there any reference on how I can create
> a centralized near far cache since the job and operators are distributed by
> the job manager.
>
> Thanks
>
>


Re: Using redis cache in flink

2020-01-07 Thread Navneeth Krishnan
Hi Yun,

Thanks. The way I want to use Redis is as a cache, not as a state backend. I
would still have the RocksDB state backend for other state. The reason to use
a cache instead of managed state is that I get around 10k msgs per task
slot and I don't want to fetch the state from RocksDB for each lookup. An
in-memory cache would be fine, but to rebuild the state I want to use Redis.

Regards

On Tue, Jan 7, 2020 at 11:21 PM Yun Tang  wrote:

> Hi Navneeth
>
> If you wrap Redis as a state backend, you cannot easily share data across
> slots, as Flink constructs a state backend per operator for local use only.
>
> If you use a Redis cluster as an externalized service to store your data,
> you can share data across slots easily. However, compared with the reduced
> cost of serialization, the added network communication cannot be
> ignored. There is a trade-off here, and we cannot ensure there would be a
> performance gain. Actually, I suspect the time spent on CPU serialization is
> much less than the time spent on the network.
>
> Best
> Yun Tang
> --
> *From:* Navneeth Krishnan 
> *Sent:* Wednesday, January 8, 2020 12:33
> *To:* user 
> *Subject:* Using redis cache in flink
>
> Hi All,
>
> I want to use redis as near far cache to store data which are common
> across slots i.e. share data across slots. This data is required for
> processing every single message and it's better to store in a in memory
> cache backed by redis rather than rocksdb since it has to be serialized for
> every single get call. Do you guys think this is good solution or is there
> any other better solution? Also, Is there any reference on how I can create
> a centralized near far cache since the job and operators are distributed by
> the job manager.
>
> Thanks
>


Using redis cache in flink

2020-01-07 Thread Navneeth Krishnan
Hi All,

I want to use redis as near far cache to store data which are common across
slots i.e. share data across slots. This data is required for processing
every single message and it's better to store in a in memory cache backed
by redis rather than rocksdb since it has to be serialized for every single
get call. Do you guys think this is good solution or is there any other
better solution? Also, Is there any reference on how I can create a
centralized near far cache since the job and operators are distributed by
the job manager.

Thanks


Re: Checkpoints issue and job failing

2020-01-06 Thread Navneeth Krishnan
Thanks Vino & Piotr,

Sure, we will upgrade the Flink version and monitor it to see if the problem
still exists.

Thanks

On Mon, Jan 6, 2020 at 12:39 AM Piotr Nowojski  wrote:

> Hi,
>
> From the top of my head I don’t remember anything particular, however
> release 1.4.0 came with quite a lot of deep change which had it’s fair
> share number of bugs, that were subsequently fixed in later releases.
>
> Because 1.4.x tree is no longer supported I would strongly recommend to
> first upgrade to a more recent Flink version. If that’s not possible, I
> would at least upgrade to the latest release from 1.4.x tree (1.4.2).
>
> Piotrek
>
> On 6 Jan 2020, at 07:25, vino yang  wrote:
>
> Hi Navneeth,
>
> Since the file still exists, this exception is very strange.
>
> I want to ask, does it happen by accident or frequently?
>
> Another concern is that since the 1.4 version is very far away, all
> maintenance and response are not as timely as the recent versions. I
> personally recommend upgrading as soon as possible.
>
> I can ping @Piotr Nowojski   and see if it is
> possible to explain the cause of this problem.
>
> Best,
> Vino
>
> Navneeth Krishnan  于2020年1月4日周六 上午1:03写道:
>
>> Thanks Congxian & Vino.
>>
>> Yes, the file do exist and I don't see any problem in accessing it.
>>
>> Regarding flink 1.9, we haven't migrated yet but we are planning to do.
>> Since we have to test it might take sometime.
>>
>> Thanks
>>
>> On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>>
>>> Do you have ever check that this problem exists on Flink 1.9?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> vino yang  于2020年1月3日周五 下午3:54写道:
>>>
>>>> Hi Navneeth,
>>>>
>>>> Did you check whether the path contained in the exception really cannot
>>>> be found?
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> Navneeth Krishnan  于2020年1月3日周五 上午8:23写道:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We are running into checkpoint timeout issue more frequently in
>>>>> production and we also see the below exception. We are running flink 1.4.0
>>>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>>>>> this?
>>>>>
>>>>> 
>>>>>
>>>>> java.lang.IllegalStateException: Could not initialize operator state 
>>>>> backend.
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>>>>   at 
>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>>>   at 
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>>   at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.io.FileNotFoundException: 
>>>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>>>>  (No such file or directory)
>>>>>   at java.io.FileInputStream.open0(Native Method)
>>>>>   at java.io.FileInputStream.open(FileInputStream.java:195)
>>>>>   at java.io.FileInputStream.(FileInputStream.java:138)
>>>>>   at 
>>>>> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>


Re: Checkpoints issue and job failing

2020-01-03 Thread Navneeth Krishnan
Thanks Congxian & Vino.

Yes, the file do exist and I don't see any problem in accessing it.

Regarding flink 1.9, we haven't migrated yet but we are planning to do.
Since we have to test it might take sometime.

Thanks

On Fri, Jan 3, 2020 at 2:14 AM Congxian Qiu  wrote:

> Hi
>
> Do you have ever check that this problem exists on Flink 1.9?
>
> Best,
> Congxian
>
>
> vino yang  于2020年1月3日周五 下午3:54写道:
>
>> Hi Navneeth,
>>
>> Did you check whether the path contained in the exception really cannot be
>> found?
>>
>> Best,
>> Vino
>>
>> Navneeth Krishnan  于2020年1月3日周五 上午8:23写道:
>>
>>> Hi All,
>>>
>>> We are running into checkpoint timeout issue more frequently in
>>> production and we also see the below exception. We are running flink 1.4.0
>>> and the checkpoints are saved on NFS. Can someone suggest how to overcome
>>> this?
>>>
>>> [image: image.png]
>>>
>>> java.lang.IllegalStateException: Could not initialize operator state 
>>> backend.
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
>>> at 
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.FileNotFoundException: 
>>> /mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
>>>  (No such file or directory)
>>> at java.io.FileInputStream.open0(Native Method)
>>> at java.io.FileInputStream.open(FileInputStream.java:195)
>>> at java.io.FileInputStream.(FileInputStream.java:138)
>>> at 
>>> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
>>>
>>>
>>> Thanks
>>>
>>>


Checkpoints issue and job failing

2020-01-02 Thread Navneeth Krishnan
Hi All,

We are running into checkpoint timeout issue more frequently in production
and we also see the below exception. We are running flink 1.4.0 and the
checkpoints are saved on NFS. Can someone suggest how to overcome this?

[image: image.png]

java.lang.IllegalStateException: Could not initialize operator state backend.
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:302)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:249)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException:
/mnt/checkpoints/02c4f8d5c11921f363b98c5959cc4f06/chk-101/e71d8eaf-ff4a-4783-92bd-77e3d8978e01
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at 
org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)


Thanks


Re: Flink vs Kafka streams

2019-11-08 Thread Navneeth Krishnan
Thanks Congxian. Yes, it's been very hard to manage the cluster, and that's
why we are trying to evaluate alternate choices. If anyone has found better
methods to deploy and scale, it would be great to know so that we can
adopt them as well.

Thanks

On Fri, Nov 8, 2019 at 1:56 AM Congxian Qiu  wrote:

> Hi
>
> From your description, it seems the big problem is scaling in and out, and
> that there may be significant downtime from triggering a savepoint and
> restoring from it.
>
> Previously, we proposed a feature named stop-with-checkpoint[1], the same
> as stop-with-savepoint but triggering a checkpoint instead of a savepoint;
> if you use incremental checkpoints, this can improve the speed
> considerably. As this feature has not been merged, you can try to restore
> from the retained checkpoint of the previous job[2].
>
> For scaling in and out, if the restore takes too long, you can measure
> where the restore time is spent; if too much time is spent downloading
> state, you can try the multi-threaded download feature[3].
>
> [1] https://issues.apache.org/jira/browse/FLINK-12619
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [3] https://issues.apache.org/jira/browse/FLINK-10461
> Best,
> Congxian
>
>
> Navneeth Krishnan  于2019年11月8日周五 下午3:38写道:
>
>> Hello All,
>>
>> I have a streaming job running in production which processes over 2
>> billion events per day and does some heavy processing on each event. We
>> have been facing some challenges in managing Flink in production, like
>> scaling in and out, restarting the job with a savepoint, etc. Flink provides
>> a lot of features which made it seem like the obvious choice at the time, but
>> now, with all the operational overhead, we are wondering whether we should
>> still use Flink for our stream processing requirements or choose Kafka Streams.
>>
>> We currently deploy Flink on ECS. Bringing up a new cluster for another
>> streaming job is too expensive, but on the flip side running it on the same
>> cluster becomes difficult since there is no way to say this job has to be
>> run on a dedicated server versus this one can run on a shared instance. Also,
>> taking a savepoint, cancelling, and submitting a new job results in some
>> downtime. The most critical part is that there is no shared state among all
>> tasks, i.e. a sort of global state. We sort of achieve this today using an
>> external Redis cache, but that incurs cost as well.
>>
>> If we move to Kafka Streams, it makes our deployment life much
>> easier: each new stream job will be a microservice that can scale
>> independently. With global state it's much easier to share state without
>> using an external cache. But the disadvantage is that we have to rely on the
>> partitions for parallelism. Although this might initially sound easier,
>> when we need to scale much higher this will become a bottleneck.
>>
>> Do you guys have any suggestions on this? We need to decide which way to
>> move forward, and any suggestions would be greatly appreciated.
>>
>> Thanks
>>
>


Flink vs Kafka streams

2019-11-07 Thread Navneeth Krishnan
Hello All,

I have a streaming job running in production which processes over 2
billion events per day and does some heavy processing on each event. We
have been facing some challenges in managing Flink in production, like
scaling in and out, restarting the job with a savepoint, etc. Flink provides a
lot of features which made it seem like the obvious choice at the time, but now,
with all the operational overhead, we are wondering whether we should still use
Flink for our stream processing requirements or choose Kafka Streams.

We currently deploy Flink on ECS. Bringing up a new cluster for another
streaming job is too expensive, but on the flip side running it on the same
cluster becomes difficult since there is no way to say this job has to be
run on a dedicated server versus this one can run on a shared instance. Also,
taking a savepoint, cancelling, and submitting a new job results in some
downtime. The most critical part is that there is no shared state among all
tasks, i.e. a sort of global state. We sort of achieve this today using an
external Redis cache, but that incurs cost as well.

If we move to Kafka Streams, it makes our deployment life much
easier: each new stream job will be a microservice that can scale
independently. With global state it's much easier to share state without
using an external cache. But the disadvantage is that we have to rely on the
partitions for parallelism. Although this might initially sound easier,
when we need to scale much higher this will become a bottleneck.

Do you guys have any suggestions on this? We need to decide which way to
move forward, and any suggestions would be greatly appreciated.

Thanks


Re: ProcessFunction Timer

2019-10-18 Thread Navneeth Krishnan
I can use filtering to do it, but I preferred the process function because I
don't have to do a keyBy again to do the windowing or use reinterpret. The
problem I'm having is: if I use the process function and register a timer,
and more input records arrive before the timer fires, how can I avoid
creating more timers and just use one timer to collect the data and forward
it? I was thinking about using a local variable, but that wouldn't work across
keys. The other approach is to have a value state indicating whether the timer
is registered, but I'm wondering whether this is the only way or whether there
is a better approach.
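For what it's worth, a minimal sketch of the value-state-flag idea on a
KeyedProcessFunction (the Event type, the 5-second delay, and the output shape
are illustrative placeholders; it assumes timestamps/watermarks are assigned
upstream):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CollectAndForward extends KeyedProcessFunction<String, Event, List<Event>> {

    private transient ListState<Event> buffer;
    private transient ValueState<Boolean> timerSet;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Event.class));
        timerSet = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timer-set", Boolean.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<List<Event>> out) throws Exception {
        buffer.add(value);
        // Only one timer per key: set it when the first element for the key arrives.
        if (timerSet.value() == null) {
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5_000L);
            timerSet.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<List<Event>> out) throws Exception {
        List<Event> collected = new ArrayList<>();
        for (Event e : buffer.get()) {
            collected.add(e);
        }
        out.collect(collected);
        // Clear both states so the next element starts a fresh batch and timer.
        buffer.clear();
        timerSet.clear();
    }
}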

Thanks

On Fri, Oct 18, 2019 at 6:19 AM Andrey Zagrebin 
wrote:

> Hi Navneeth,
>
> You could also apply filtering on the incoming records before windowing.
> This might save you some development effort, but I do not know the full details
> of your requirement, so I'm not sure whether filtering is sufficient.
> In general, you can use timers as you suggested, as the windowing itself
> works in a similar way.
>
> Best,
> Andrey
>
> On Thu, Oct 17, 2019 at 11:10 PM Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm currently using a tumbling window of 5 seconds using
>> TumblingTimeWindow but due to change in requirements I would not have to
>> window every incoming data. With that said I'm planning to use process
>> function to achieve this selective windowing.
>>
>> I looked at the example provided in the documentation and I'm not clear
>> on how I can implement the windowing.
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>
>> Basically what I want is keep collecting data until it reaches 5 seconds
>> from the time the first data came in for the key and then forward it to the
>> next operator. I will be using ListState to add the entries and then
>> register a timer when the list is empty. When the timer runs then collect
>> all entries and forward it, also remove entries from the list. Do you guys
>> think this will suffice or anything else has to be done?
>>
>> Also I will have about 1M keys, then would there be any performance
>> impact in creating these many timers? I believe the timers are
>> automatically removed after they are fired or should I do anything extra to
>> remove these timers?
>>
>> Thanks
>>
>


ProcessFunction Timer

2019-10-17 Thread Navneeth Krishnan
Hi All,

I'm currently using a tumbling window of 5 seconds using TumblingTimeWindow,
but due to a change in requirements I no longer have to window every incoming
record. With that said, I'm planning to use a process function to achieve this
selective windowing.

I looked at the example provided in the documentation and I'm not clear on
how I can implement the windowing.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

Basically, what I want is to keep collecting data until 5 seconds have passed
from the time the first record came in for the key, and then forward it to the
next operator. I will be using ListState to add the entries and will
register a timer when the list is empty. When the timer fires, I will collect
all entries, forward them, and remove the entries from the list. Do you guys
think this will suffice, or does anything else have to be done?

Also, I will have about 1M keys, so would there be any performance impact
in creating this many timers? I believe the timers are automatically
removed after they are fired, or should I do anything extra to remove these
timers?

Thanks


Re: Broadcast state

2019-10-17 Thread Navneeth Krishnan
Ya, there will not be a problem with duplicates. But what I'm trying to
achieve is this: there is a large static state which needs to be present just
once per node rather than stored per slot; that would be ideal. The reason
is that the state is quite large, around 100GB of mostly static data,
and it is not needed at the per-slot level. It could live at the per-instance
level, where each slot reads from this shared memory.

Thanks

On Wed, Oct 9, 2019 at 12:13 AM Congxian Qiu  wrote:

> Hi,
>
> After using Redis, why would you need to care about eliminating duplicated data?
> If you specify the same key, then Redis will handle the deduplication.
>
> Best,
> Congxian
>
>
> Fabian Hueske  于2019年10月2日周三 下午5:30写道:
>
>> Hi,
>>
>> State is always associated with a single task in Flink.
>> The state of a task cannot be accessed by other tasks of the same
>> operator or tasks of other operators.
>> This is true for every type of state, including broadcast state.
>>
>> Best, Fabian
>>
>>
>> Am Di., 1. Okt. 2019 um 08:22 Uhr schrieb Navneeth Krishnan <
>> reachnavnee...@gmail.com>:
>>
>>> Hi,
>>>
>>> I can use Redis, but I’m still having a hard time figuring out how I can
>>> eliminate duplicate data. Today, without broadcast state in 1.4, I’m using a
>>> cache to lazily load the data. I thought the broadcast state would be similar
>>> to that of Kafka Streams, where I have read access to the state across the
>>> pipeline. That would indeed solve a lot of problems. Is there some way I can
>>> do the same with Flink?
>>>
>>> Thanks!
>>>
>>> On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you use some cache system such as HBase or Redis to store this
>>>> data, and query the cache if needed?
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Navneeth Krishnan  于2019年10月1日周二 上午10:15写道:
>>>>
>>>>> Thanks Oytun. The problem with doing that is that the same data will
>>>>> have to be stored multiple times, wasting memory. In my case there will be
>>>>> around a million entries which need to be used by at least two operators
>>>>> for now.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez  wrote:
>>>>>
>>>>>> This is how we currently use broadcast state. Our states are
>>>>>> re-usable (code-wise), every operator that wants to consume basically 
>>>>>> keeps
>>>>>> the same descriptor state locally by processBroadcastElement'ing into a
>>>>>> local state.
>>>>>>
>>>>>> I am open to suggestions. I see this as a hard drawback of dataflow
>>>>>> programming or Flink framework?
>>>>>>
>>>>>>
>>>>>>
>>>>>> ---
>>>>>> Oytun Tez
>>>>>>
>>>>>> *M O T A W O R D*
>>>>>> The World's Fastest Human Translation Platform.
>>>>>> oy...@motaword.com — www.motaword.com
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez  wrote:
>>>>>>
>>>>>>> You can re-use the broadcasted state (along with its descriptor)
>>>>>>> that comes into your KeyedBroadcastProcessFunction, in another operator
>>>>>>> downstream. that's basically duplicating the broadcasted state whichever
>>>>>>> operator you want to use, every time.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ---
>>>>>>> Oytun Tez
>>>>>>>
>>>>>>> *M O T A W O R D*
>>>>>>> The World's Fastest Human Translation Platform.
>>>>>>> oy...@motaword.com — www.motaword.com
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan <
>>>>>>> reachnavnee...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> Is it possible to access a broadcast state across the pipeline? For
>>>>>>>> example, say I have a KeyedBroadcastProcessFunction which adds the 
>>>>>>>> incoming
>>>>>>>> data to state and I have downstream operator where I need the same 
>>>>>>>> state as
>>>>>>>> well, would I be able to just read the broadcast state with a readonly
>>>>>>>> view. I know this is possible in kafka streams.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>


Re: Broadcast state

2019-09-30 Thread Navneeth Krishnan
Hi,

I can use Redis, but I’m still having a hard time figuring out how I can
eliminate duplicate data. Today, without broadcast state in 1.4, I’m using a
cache to lazily load the data. I thought the broadcast state would be similar
to that of Kafka Streams, where I have read access to the state across the
pipeline. That would indeed solve a lot of problems. Is there some way I can
do the same with Flink?

Thanks!

On Mon, Sep 30, 2019 at 10:36 PM Congxian Qiu 
wrote:

> Hi,
>
> Could you use some cache system such as HBase or Redis to store this
> data, and query the cache if needed?
>
> Best,
> Congxian
>
>
> Navneeth Krishnan  于2019年10月1日周二 上午10:15写道:
>
>> Thanks Oytun. The problem with doing that is that the same data will have
>> to be stored multiple times, wasting memory. In my case there will be around
>> a million entries which need to be used by at least two operators for now.
>>
>> Thanks
>>
>> On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez  wrote:
>>
>>> This is how we currently use broadcast state. Our states are re-usable
>>> (code-wise), every operator that wants to consume basically keeps the same
>>> descriptor state locally by processBroadcastElement'ing into a local state.
>>>
>>> I am open to suggestions. I see this as a hard drawback of dataflow
>>> programming or Flink framework?
>>>
>>>
>>>
>>> ---
>>> Oytun Tez
>>>
>>> *M O T A W O R D*
>>> The World's Fastest Human Translation Platform.
>>> oy...@motaword.com — www.motaword.com
>>>
>>>
>>> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez  wrote:
>>>
>>>> You can re-use the broadcasted state (along with its descriptor) that
>>>> comes into your KeyedBroadcastProcessFunction, in another operator
>>>> downstream. that's basically duplicating the broadcasted state whichever
>>>> operator you want to use, every time.
>>>>
>>>>
>>>>
>>>> ---
>>>> Oytun Tez
>>>>
>>>> *M O T A W O R D*
>>>> The World's Fastest Human Translation Platform.
>>>> oy...@motaword.com — www.motaword.com
>>>>
>>>>
>>>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan <
>>>> reachnavnee...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Is it possible to access a broadcast state across the pipeline? For
>>>>> example, say I have a KeyedBroadcastProcessFunction which adds the 
>>>>> incoming
>>>>> data to state and I have downstream operator where I need the same state 
>>>>> as
>>>>> well, would I be able to just read the broadcast state with a readonly
>>>>> view. I know this is possible in kafka streams.
>>>>>
>>>>> Thanks
>>>>>
>>>>


Re: Broadcast state

2019-09-30 Thread Navneeth Krishnan
Thanks Oytun. The problem with doing that is that the same data will have to
be stored multiple times, wasting memory. In my case there will be around a
million entries which need to be used by at least two operators for now.

Thanks

On Mon, Sep 30, 2019 at 5:42 PM Oytun Tez  wrote:

> This is how we currently use broadcast state. Our states are re-usable
> (code-wise), every operator that wants to consume basically keeps the same
> descriptor state locally by processBroadcastElement'ing into a local state.
>
> I am open to suggestions. I see this as a hard drawback of dataflow
> programming or Flink framework?
>
>
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Mon, Sep 30, 2019 at 8:40 PM Oytun Tez  wrote:
>
>> You can re-use the broadcasted state (along with its descriptor) that
>> comes into your KeyedBroadcastProcessFunction, in another operator
>> downstream. that's basically duplicating the broadcasted state whichever
>> operator you want to use, every time.
>>
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Mon, Sep 30, 2019 at 8:29 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Is it possible to access a broadcast state across the pipeline? For
>>> example, say I have a KeyedBroadcastProcessFunction which adds the incoming
>>> data to state and I have downstream operator where I need the same state as
>>> well, would I be able to just read the broadcast state with a readonly
>>> view. I know this is possible in kafka streams.
>>>
>>> Thanks
>>>
>>


Broadcast state

2019-09-30 Thread Navneeth Krishnan
Hi All,

Is it possible to access a broadcast state across the pipeline? For
example, say I have a KeyedBroadcastProcessFunction which adds the incoming
data to state, and I have a downstream operator where I need the same state as
well; would I be able to just read the broadcast state with a read-only
view? I know this is possible in Kafka Streams.
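From what I understand of the 1.5+ API, each operator that connects to the
broadcast stream gets its own copy of the broadcast state, and the
non-broadcast side only ever sees a read-only view; a minimal sketch (the
String types and the descriptor name are placeholders):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class EnrichWithBroadcast extends KeyedBroadcastProcessFunction<String, String, String, String> {

    // The descriptor (name + types) is what identifies the broadcast state;
    // another operator connected to the same broadcast stream re-uses it.
    public static final MapStateDescriptor<String, String> CONFIG =
            new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        // Read-only view on the keyed (non-broadcast) side.
        ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(CONFIG);
        out.collect(value + " -> " + state.get(value));
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
        // The writable view is only available on the broadcast side.
        ctx.getBroadcastState(CONFIG).put(value, value);
    }
}

// Wiring: every operator that needs the data connects to the broadcast stream itself, e.g.
// keyed.connect(configStream.broadcast(EnrichWithBroadcast.CONFIG)).process(new EnrichWithBroadcast());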

Thanks


Re: Running flink on AWS ECS

2019-09-25 Thread Navneeth Krishnan
Thanks Terry. The reason I asked is that somewhere I saw that
running one slot per container is beneficial, but I couldn’t find where I
saw that.
Also, I think running it with multiple slots will reduce IPC since some of
the data will be processed within the same JVM.

Thanks

On Wed, Sep 25, 2019 at 1:16 AM Terry Wang  wrote:

> Hi, Navneeth,
>
> I think both are OK.
> IMO, running one container with the number of slots equal to the number of
> virtual cores may be better, since the slots can share the Flink framework and
> thus reduce memory cost.
>
> Best,
> Terry Wang
>
>
>
> > 在 2019年9月25日,下午3:26,Navneeth Krishnan  写道:
> >
> > Hi All,
> >
> > I’m currently running flink on amazon ecs and I have assigned task slots
> based on vcpus per instance. Is it beneficial to run a separate container
> with one slot each or one container with number of slots same as virtual
> cores?
> >
> > Thanks
>
>


Running flink on AWS ECS

2019-09-25 Thread Navneeth Krishnan
Hi All,

I’m currently running Flink on Amazon ECS and I have assigned task slots
based on the vCPUs per instance. Is it more beneficial to run separate containers
with one slot each, or one container with the number of slots equal to the number
of virtual cores?

Thanks


Operator state

2019-08-07 Thread Navneeth Krishnan
Hi All,

Is there a way to share operator state among operators? For example, I have
an operator which has union state and the same state data is required in a
downstream operator. If not, is there a recommended way to share the state?

Thanks


RocksDB KeyValue store

2019-07-29 Thread Navneeth Krishnan
Hi All,

I looked at the RocksDB KV store implementation and I found that
deserialization has to happen for each key lookup. Given a scenario where
the key lookup has to happen for every single message, would it still be a
good idea to store it in the RocksDB store, or would an in-memory store/cache be
more efficient? I know that if the data is stored in the KV store it will
automatically be redistributed when scale up/scale down happens, and it's fault
tolerant.

For example, if there are 1M user events and a user config of size 1KB is
persisted into RocksDB, then for each event would the state have to be
deserialized? Wouldn't this create a lot of garbage?

Also, is there a per-machine sort of state store which can be used for all
keys that are sent to that task manager?

Thanks


Re: Event time window eviction

2019-07-29 Thread Navneeth Krishnan
Thanks Taher. Are there any examples for this? In my scenario I would have
data coming in, and it might stop for some time, but I still need the window to
end after the duration.

Also, I believe that in version 1.3 the event time will progress only if all
partitions in a Kafka topic advance the event time. Is that still the case? If
there is data in only a few partitions, will the event time progress?

Thanks

On Mon, Jul 29, 2019 at 10:51 AM taher koitawala  wrote:

> I believe the approach to this is wrong... For fixing windows we can write
> our own custom triggers to fire them... However, what I'm not convinced about is
> switching between event and processing time.
>  Write a custom trigger and fire the event time window if you
> don't see any activity. That's the only way.
>
> On Mon, Jul 29, 2019, 11:07 PM Navneeth Krishnan 
> wrote:
>
>> Hi All,
>>
>> Any suggestions?
>>
>> Thanks
>>
>> On Thu, Jul 25, 2019 at 11:45 PM Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I'm working on a very short tumbling window for 1 second per key. What I
>>> want to achieve is if the event time per key doesn't progress after a
>>> second I want to evict the window, basically a combination of event time
>>> and processing time. I'm currently achieving it by registering a processing
>>> time timer but is there a way to emit some global punctuator which can be
>>> used to evict all keys window data.
>>>
>>> The issue with registering processing time timer for every key is
>>> causing too much JVM pressure. Any suggestions on how this could be
>>> implemented?
>>>
>>> Thanks
>>>
>>


Re: Event time window eviction

2019-07-29 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks

On Thu, Jul 25, 2019 at 11:45 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> I'm working on a very short tumbling window for 1 second per key. What I
> want to achieve is if the event time per key doesn't progress after a
> second I want to evict the window, basically a combination of event time
> and processing time. I'm currently achieving it by registering a processing
> time timer but is there a way to emit some global punctuator which can be
> used to evict all keys window data.
>
> The issue with registering processing time timer for every key is causing
> too much JVM pressure. Any suggestions on how this could be implemented?
>
> Thanks
>


Event time window eviction

2019-07-25 Thread Navneeth Krishnan
Hi All,

I'm working with a very short tumbling window of 1 second per key. What I
want to achieve is: if the event time for a key doesn't progress after a
second, I want to evict the window, basically a combination of event time
and processing time. I'm currently achieving it by registering a processing
time timer, but is there a way to emit some global punctuator which can be
used to evict all keys' window data?

The issue is that registering a processing time timer for every key is causing
too much JVM pressure. Any suggestions on how this could be implemented?
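A rough sketch of the custom-trigger direction suggested earlier (the 1-second
grace period is illustrative; a real version would also need to track and
delete the processing-time timers it registers):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeOrIdleTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Normal event-time firing at the end of the window ...
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // ... plus a processing-time safety net in case the watermark stalls for this key.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 1_000L);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // No watermark progress within the grace period: flush the window contents anyway.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

// Usage: .window(TumblingEventTimeWindows.of(Time.seconds(1))).trigger(new EventTimeOrIdleTrigger())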

Thanks


Even key distribution workload

2019-07-14 Thread Navneeth Krishnan
Hi All,

Currently I have a keyBy on user, and I see uneven load distribution since some
of the users have a very high load while other users have very few
messages. Is there a recommended way to achieve an even distribution of
the workload? Has someone else encountered this problem, and what was the
workaround?

Thanks


Re: Checkpoint failure

2019-07-13 Thread Navneeth Krishnan
Hi All,

Any pointers on the below checkpoint failure scenario. Appreciate all the
help. Thanks

Thanks

On Sun, Jul 7, 2019 at 9:23 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> Occasionally I run into failed checkpoints error where 2 or 3 consecutive
> checkpoints fails after running for a minute and then it recovers. This is
> causing delay in processing the incoming data since there is huge amount of
> data buffered during the failed checkpoints. I don't see any errors in the
> taskmanager logs but here is the error in the jobmanager log. The state
> size is around 100 mb.
>
> *Checkpoint configuration:*
> Option                               Value
> Checkpointing Mode                   Exactly Once
> Interval                             1m 0s
> Timeout                              1m 0s
> Minimum Pause Between Checkpoints    5s
> Maximum Concurrent Checkpoints       1
> Persist Checkpoints Externally       Enabled (retain on cancellation)
> *Jobmanager Log:*
>
> 2019-07-05 17:53:54,125 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from 79515b6550d2c223701be0a9c870995f of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,141 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from 630984cdd5e66b4d9ea95a91cb4d23f6 of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,168 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from e12ed2e185a37559f93181905a52ebeb of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,215 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from 1fede192e2ff11e0905d98ff5ff6f9ce of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,223 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from d4e895eb20cc259c95b249cd0252930f of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,310 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from be5c711d7b37ed6d804dc447db91 of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,351 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from 1ed52695cc407f2f143d2bb5d23cbdbb of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,398 [flink-akka.actor.default-dispatcher-465901] WARN
> o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
> checkpoint attempt 9867 from 2e43cf968ad399c0b8426239a7dd081c of job
> 00ff93caa4cc9464bd41e1d050fcf65c.
> 2019-07-05 17:53:54,959 [flink-akka.actor.default-dispatcher-465868] INFO
> o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9868 (279307042
> bytes in 50707 ms).
> 2019-07-05 17:54:04,174 [Checkpoint Timer] INFO
> o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9869 @ 1562349244171
> 2019-07-05 17:54:10,709 [flink-akka.actor.default-dispatcher-465905] INFO
> o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9869 (253638470
> bytes in 6430 ms).
> 2019-07-05 17:55:04,174 [Checkpoint Timer] INFO
> o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9870 @ 1562349304171
> 2019-07-05 17:55:09,816 [flink-akka.actor.default-dispatcher-465913] INFO
> o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9870 (138649543
> bytes in 5551 ms).
> 2019-07-05 17:56:04,174 [Checkpoint Timer] INFO
> o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9871 @ 1562349364171
>
> Thanks
>


Checkpoint failure

2019-07-07 Thread Navneeth Krishnan
Hi All,

Occasionally I run into failed checkpoints, where 2 or 3 consecutive
checkpoints fail after running for a minute and then it recovers. This is
causing a delay in processing the incoming data, since there is a huge amount of
data buffered during the failed checkpoints. I don't see any errors in the
taskmanager logs, but here is the error in the jobmanager log. The state
size is around 100 MB.

*Checkpoint configuration:*
Option                               Value
Checkpointing Mode                   Exactly Once
Interval                             1m 0s
Timeout                              1m 0s
Minimum Pause Between Checkpoints    5s
Maximum Concurrent Checkpoints       1
Persist Checkpoints Externally       Enabled (retain on cancellation)
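For reference, the same configuration expressed in code would look roughly like
this (a sketch, assuming it is set programmatically and that env is the
StreamExecutionEnvironment):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);  // interval 1m, exactly once
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(60_000L);                    // timeout 1m
checkpointConfig.setMinPauseBetweenCheckpoints(5_000L);            // minimum pause 5s
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);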
*Jobmanager Log:*

2019-07-05 17:53:54,125 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from 79515b6550d2c223701be0a9c870995f of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,141 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from 630984cdd5e66b4d9ea95a91cb4d23f6 of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,168 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from e12ed2e185a37559f93181905a52ebeb of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,215 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from 1fede192e2ff11e0905d98ff5ff6f9ce of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,223 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from d4e895eb20cc259c95b249cd0252930f of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,310 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from be5c711d7b37ed6d804dc447db91 of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,351 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from 1ed52695cc407f2f143d2bb5d23cbdbb of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,398 [flink-akka.actor.default-dispatcher-465901] WARN
o.a.f.r.c.CheckpointCoordinator - Received late message for now expired
checkpoint attempt 9867 from 2e43cf968ad399c0b8426239a7dd081c of job
00ff93caa4cc9464bd41e1d050fcf65c.
2019-07-05 17:53:54,959 [flink-akka.actor.default-dispatcher-465868] INFO
o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9868 (279307042
bytes in 50707 ms).
2019-07-05 17:54:04,174 [Checkpoint Timer] INFO
o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9869 @ 1562349244171
2019-07-05 17:54:10,709 [flink-akka.actor.default-dispatcher-465905] INFO
o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9869 (253638470
bytes in 6430 ms).
2019-07-05 17:55:04,174 [Checkpoint Timer] INFO
o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9870 @ 1562349304171
2019-07-05 17:55:09,816 [flink-akka.actor.default-dispatcher-465913] INFO
o.a.f.r.c.CheckpointCoordinator - Completed checkpoint 9870 (138649543
bytes in 5551 ms).
2019-07-05 17:56:04,174 [Checkpoint Timer] INFO
o.a.f.r.c.CheckpointCoordinator - Triggering checkpoint 9871 @ 1562349364171

Thanks


Flink forward talks

2019-04-10 Thread Navneeth Krishnan
Hi All,

Where can I get the videos of latest flink forward talks?

Thanks,
Navneeth


Flink Pipeline - CICD

2019-04-08 Thread Navneeth Krishnan
Hi All,

We have some streaming jobs in production, and today we manually deploy the
Flink jobs in each region/environment. Before we start automating this, I just
wanted to check whether anyone has already created a CI/CD script for Jenkins or
other CI/CD tools to deploy the latest JAR onto running Flink clusters. Any
pointers would help. Thanks

Regards,
Navneeth


Connect keyed stream with broadcast

2018-05-10 Thread Navneeth Krishnan
Hi,

Is this feature present in Flink 1.5? I have a requirement to connect a
keyed stream and a broadcast stream.

https://issues.apache.org/jira/browse/FLINK-3659

Thanks,
Navneeth


SideOutput Issue

2018-04-03 Thread Navneeth Krishnan
Hi All,

I'm having issues with creating side outputs. There are two input sources
(both from kafka) and they are connected and fed into a co-process
function. Inside the co-process, the regular data stream outputs a POJO and
in processElement2 there is a periodic timer which creates the side output.
When I start the job I get the below exception. Is there something that I'm
doing wrong?

I used the below example to implement the side output.
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java

processElement2
ctx.output("side-output", POJO);

Job
dataStream.getSideOutput("side-output").print();
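For comparison, the OutputTag-based pattern from the linked example looks
roughly like this (a sketch; POJO, the tag name, connectedStream and
MyCoProcessFunction are placeholders):

// Declared once, as an anonymous subclass so the element type is captured.
final OutputTag<POJO> sideTag = new OutputTag<POJO>("side-output") {};

// Inside processElement2 / the timer callback of the co-process function:
ctx.output(sideTag, pojo);

// In the job, the same tag (not a string) selects the side output:
SingleOutputStreamOperator<POJO> main = connectedStream.process(new MyCoProcessFunction());
main.getSideOutput(sideTag).print();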


2018-04-03 10:18:38.821 [Co-Flat Map (4/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Flat Map (4/8)
(20b92b7a8cdd1e63963886de0895882c) switched from CREATED to DEPLOYING.
2018-04-03 10:18:38.821 [Co-Process (1/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Process (1/8)
(fd8f971eea2e103e340d2955b384eaa3) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at
org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

2018-04-03 10:18:38.880 [Co-Process (7/8)] INFO
org.apache.flink.runtime.taskmanager.Task  - Co-Process (7/8)
(a86274f9ac49b71f00d218a1533cbd51) switched from RUNNING to FAILED.
java.lang.NullPointerException: null
at
org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:74)
at
org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput.<init>(CopyingDirectedOutput.java:40)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:329)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

Thanks


Record timestamp from kafka

2018-03-29 Thread Navneeth Krishnan
Hi,

Is there a way to get the Kafka timestamp in the deserialization schema? All
records are written to Kafka with a timestamp, and I would like to set that
timestamp on every record that is ingested. Thanks.


Job restart hook

2018-03-29 Thread Navneeth Krishnan
Hi,

Is there a way for a script to be called whenever a job gets restarted? My
scenario: let's say there are 20 slots and the job runs on all 20 slots.
After a while a task manager goes down and now there are only 14 slots, and
I need to readjust the parallelism of my job to ensure the job keeps running
until the lost TM comes up again. It would be great to know how others are
handling this situation.

Thanks,
Navneeth


Re: Event time window questions

2018-01-23 Thread Navneeth Krishnan
Thanks Sendoh. Is there a way to advance the watermark even when there are no
incoming events? What exactly does setAutoWatermarkInterval do?

Also, I don't see the watermark displayed in the Flink dashboard.

Will the watermark advance only when there is data from all consumed Kafka
topics and partitions? I have 3 topics with 3 partitions in each topic.

Thanks.

Regards,
Navneeth

On Tue, Jan 23, 2018 at 9:32 AM, Sendoh  wrote:

> Hi,
>
> you can write your own trigger and window, and implement whatever logic
> there.
> There are some examples
> https://github.com/apache/flink/blob/1875cac03042dad4a4c47b0de8364f
> 02fbe457c6/flink-streaming-java/src/main/java/org/apache/
> flink/streaming/api/windowing/triggers/
>
> If you don't see any output, it means the window is not triggered.
>
> That would mean the watermark is not increasing. The issue can be that the
> timestamp is not extracted correctly.
> Or, you may be missing the trigger, if the window function you use doesn't
> have one.
>
> Cheers,
>
> Sendoh
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Timer & Window Memory Consumption

2018-01-23 Thread Navneeth Krishnan
Thanks Fabian, but for 1.5k messages per second per TM there are several
million InternalTimer & TimeWindow objects created within a period of 5
seconds. Is there a way to debug this issue?

Regards,
Navneeth

On Tue, Jan 23, 2018 at 2:09 AM, Fabian Hueske  wrote:

> Hi,
>
> TimeWindows and Timers are created for each window, i.e., every 5 seconds
> for every distinct key that a task is processing.
> Event-time windows are completed and cleaned up when a watermark is
> received that passes the window end timestamp.
> Therefore, there might be more than one window per key depending on the
> watermarks.
>
> Hope this helps,
> Fabian
>
> 2018-01-21 6:48 GMT+01:00 Navneeth Krishnan :
>
>> Hi,
>>
>> I'm facing issues with frequent young generation garbage collections in
>> my task manager which happens approximately every few seconds. I have 3
>> task managers with 12GB heap allocated on each and I have set the config to
>> use G1GC. My program ingests binary data from kafka source and the message
>> rate is around 4.5k msgs/sec with around 400 bytes per msg.  Below are the
>> operators used in the program.
>>
>> kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) ->
>> FlatMap -> Sink
>>
>> I captured the below histograms at 5 second intervals and analyzed the
>> heap as well. It looks like a lot InternalTimer and TimeWindow objects are
>> created.
>>
>> Also, I see a high usage in org.apache.flink.streaming.
>> api.operators.HeapInternalTimerService.
>>
>> *Window code:*
>> dataStream.keyBy(new MessageKeySelector())
>> .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>> .apply(new Aggregate());
>>
>> *Captured at time T:*
>>
>>  num #instances #bytes  class name
>> --
>>1:   2074427  481933816  [B
>>2:357192  339368592  [D
>>3:  12759222  204147552  java.lang.Integer
>>4: 31416   85151832  [I
>>5:900982   83872240  [C
>>6:631888   20220416  java.util.HashMap$Node
>>7:804203   19300872  java.lang.String
>>8:541651   17332832  org.apache.flink.streaming.api
>> .operators.InternalTimer
>>9:540252   17288064  org.apache.flink.streaming.api
>> .windowing.windows.TimeWindow
>>
>>
>> *Captured at T1 (T + 5 seconds):*
>>
>>  num #instances #bytes  class name
>> --
>>1:  12084258 2282849264  [B
>>2:   1922018 1828760896  [D
>>3:  68261427 1092182832  java.lang.Integer
>>4:   2712099  291488736  [C
>>5: 54201   98798976  [I
>>6:   2028250   48678000  java.lang.String
>>7: 66080   43528136  [[B
>>8:   1401915   35580168  [Ljava.lang.Object;
>>9:949062   30369984  java.util.HashMap$Node
>>   10:570832   18266624  org.apache.flink.streaming.api
>> .operators.InternalTimer
>>   11:549979   17599328  org.apache.flink.streaming.api
>> .windowing.windows.TimeWindow
>>
>>
>> *Captured at T2 (T1+ 5 seconds):*
>>
>>  num #instances #bytes  class name
>> --
>>1:   9911982 2920384472  [B
>>2:   1584406 1510958520  [D
>>3:  56087337  897397392  java.lang.Integer
>>4:  26080337  834570784  java.util.HashMap$Node
>>5:  25756748  824215936  org.apache.flink.streaming.api
>> .operators.InternalTimer
>>6:  25740086  823682752  org.apache.flink.streaming.api
>> .windowing.windows.TimeWindow
>>
>> Thanks.
>>
>>
>


Re: Network memory segments

2018-01-23 Thread Navneeth Krishnan
Thanks Chesnay.

On Tue, Jan 23, 2018 at 6:54 AM, Chesnay Schepler 
wrote:

> I could reproduce this locally and opened a JIRA
> <https://issues.apache.org/jira/browse/FLINK-8496>.
>
>
> On 21.01.2018 04:32, Navneeth Krishnan wrote:
>
> Hi,
>
> We recently upgraded from flink 1.3 to 1.4 and in the task manager UI it
> shows there are 0 memory segments whereas in 1.3 I think it was default
> 32k. I have even tried adding the below config but still it shows 0.
>
> taskmanager.network.numberOfBuffers: 2048
>
>
> Regards,
> Navneeth
>
>
>


Re: Scaling Flink

2018-01-21 Thread Navneeth Krishnan
Hi,

Any suggestions would really help. Thanks.

On Mon, Jan 15, 2018 at 12:07 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi All,
>
> Has anyone tried scaling out flink cluster on EMR based on CPU usage/
> kafka lag/ back pressure monitoring? If so can you provide some insights on
> how it could be achieved and sample scripts if possible. Thanks a lot.
>
> Thanks,
> Navneeth
>


Event time window questions

2018-01-21 Thread Navneeth Krishnan
Hi,

I am having few issues with event time windowing. Here is my scenario, data
is ingested from a kafka consumer and then keyed by user followed by a
Tumbling event window for 10 seconds. The max lag tolerance limit is 1
second.

I have the BoundedOutOfOrdernessGenerator that extends
AssignerWithPeriodicWatermarks to assign watermarks. When the data is
ingested even after receiving multiple messages per user the window never
gets evicted. What am I missing here?

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html

.window(TumblingEventTimeWindows.of(Time.seconds(10)))
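For comparison, a periodic assigner along the lines of the documented
BoundedOutOfOrdernessTimestampExtractor would look roughly like this (a sketch;
MyEvent and its timestamp getter are placeholders). Note that this kind of
assigner only derives watermarks from elements it has seen, so the watermark
will not advance while no new data arrives:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<MyEvent> {

    public MyTimestampExtractor() {
        super(Time.seconds(1));  // tolerate 1 second of out-of-orderness
    }

    @Override
    public long extractTimestamp(MyEvent element) {
        // Must be epoch milliseconds; a wrong unit (e.g. seconds) is a common reason
        // the watermark never passes the window end.
        return element.getEventTimeMillis();
    }
}

// stream.assignTimestampsAndWatermarks(new MyTimestampExtractor())
//       .keyBy(...)
//       .window(TumblingEventTimeWindows.of(Time.seconds(10)))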

The other issue I am having is there will be scenarios where there is just
one message per user for more than a minute. In that case I want the window
content to be evicted after the defined window interval of 10 seconds. Is
there a way to evict the window data even when there is no more incoming
data for that key? I have tried setAutoWatermarkInterval(1) but still
no luck.

How do I get the current watermark to be displayed in the flink dashboard
UI under watermarks sections? Currently it shows no watermarks.

Also is there a way to count the number of messages that missed the time
window due to late arrival?

Thanks and appreciate all the help.


Timer & Window Memory Consumption

2018-01-20 Thread Navneeth Krishnan
Hi,

I'm facing issues with frequent young generation garbage collections in my
task managers, which happen approximately every few seconds. I have 3 task
managers with a 12GB heap allocated on each, and I have set the config to use
G1GC. My program ingests binary data from a Kafka source, and the message rate
is around 4.5k msgs/sec with around 400 bytes per msg. Below are the
operators used in the program.

kafka src -> keyby -> CoProcess -> keyby -> Tumbling Window (5secs) ->
FlatMap -> Sink

I captured the below histograms at 5-second intervals and analyzed the heap
as well. It looks like a lot of InternalTimer and TimeWindow objects are
created.

Also, I see a high usage in
org.apache.flink.streaming.api.operators.HeapInternalTimerService.

*Window code:*
dataStream.keyBy(new MessageKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new Aggregate());

*Captured at time T:*

 num #instances #bytes  class name
--
   1:   2074427  481933816  [B
   2:357192  339368592  [D
   3:  12759222  204147552  java.lang.Integer
   4: 31416   85151832  [I
   5:900982   83872240  [C
   6:631888   20220416  java.util.HashMap$Node
   7:804203   19300872  java.lang.String
   8:541651   17332832
org.apache.flink.streaming.api.operators.InternalTimer
   9:540252   17288064
org.apache.flink.streaming.api.windowing.windows.TimeWindow


*Captured at T1 (T + 5 seconds):*

 num #instances #bytes  class name
--
   1:  12084258 2282849264  [B
   2:   1922018 1828760896  [D
   3:  68261427 1092182832  java.lang.Integer
   4:   2712099  291488736  [C
   5: 54201   98798976  [I
   6:   2028250   48678000  java.lang.String
   7: 66080   43528136  [[B
   8:   1401915   35580168  [Ljava.lang.Object;
   9:949062   30369984  java.util.HashMap$Node
  10:570832   18266624
org.apache.flink.streaming.api.operators.InternalTimer
  11:549979   17599328
org.apache.flink.streaming.api.windowing.windows.TimeWindow


*Captured at T2 (T1+ 5 seconds):*

 num #instances #bytes  class name
--
   1:   9911982 2920384472  [B
   2:   1584406 1510958520  [D
   3:  56087337  897397392  java.lang.Integer
   4:  26080337  834570784  java.util.HashMap$Node
   5:  25756748  824215936
org.apache.flink.streaming.api.operators.InternalTimer
   6:  25740086  823682752
org.apache.flink.streaming.api.windowing.windows.TimeWindow

Thanks.


Network memory segments

2018-01-20 Thread Navneeth Krishnan
Hi,

We recently upgraded from Flink 1.3 to 1.4, and the task manager UI
shows there are 0 memory segments, whereas in 1.3 I think the default was
32k. I have even tried adding the below config, but it still shows 0.

taskmanager.network.numberOfBuffers: 2048


Regards,
Navneeth


Scaling Flink

2018-01-15 Thread Navneeth Krishnan
Hi All,

Has anyone tried scaling out flink cluster on EMR based on CPU usage/ kafka
lag/ back pressure monitoring? If so can you provide some insights on how
it could be achieved and sample scripts if possible. Thanks a lot.

Thanks,
Navneeth


Static Variables

2017-12-19 Thread Navneeth Krishnan
Hi,

I have a requirement to initialize a few Guava caches per JVM, along with some
static helper classes. I tried a few options but nothing worked. I need some help.
Thanks a lot.

1. Operator level static variables:

public static Cache loadingCache;

public void open(Configuration parameters) throws Exception {
    if (loadingCache == null) {
        initializeCache();
    }
}

The cache object is null on each operator slot and it gets initialized on
every call to the open method (see the sketch after option 2 below).

2. Initialize in operator class constructor:

public FlatMapFunction(ParameterTool parameterTool) {
    this.parameterTool = parameterTool;
    initializeCache();
}

The cache doesn't seem to be initialized when accessed inside the task
manager.
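For what it's worth, a lazily initialized static holder with double-checked
locking is a common way to get one instance per JVM (a sketch, assuming Guava;
note that with per-job classloading such a static is typically shared only by
the slots of one job within a task manager):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public final class CacheHolder {

    private static volatile Cache<String, Object> cache;

    private CacheHolder() {}

    public static Cache<String, Object> get() {
        if (cache == null) {
            synchronized (CacheHolder.class) {
                if (cache == null) {
                    // Built once per JVM/classloader; illustrative capacity.
                    cache = CacheBuilder.newBuilder().maximumSize(100_000).build();
                }
            }
        }
        return cache;
    }
}

// In open(): loadingCache = CacheHolder.get();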

Thanks.


Re: Flink Kafka Producer Exception

2017-12-13 Thread Navneeth Krishnan
Hi,

I'm receiving this error, due to which I'm not able to run my job. Any
help is greatly appreciated. Thanks.

On Tue, Dec 12, 2017 at 10:21 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi,
>
> I have a kafka source and sink in my pipeline and when I start my job I
> get this error and the job goes to failed state. I checked the kafka node
> and everything looks good. Any suggestion on what is happening here? Thanks.
>
> java.lang.Exception: Failed to send data to Kafka: The server disconnected 
> before a response was received.
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>   at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>   at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
>   at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Re: Custom Metrics

2017-12-13 Thread Navneeth Krishnan
Thanks Pitor.

I have a couple more questions related to metrics. I use the InfluxDB reporter
to report Flink metrics, and I see a lot of metrics being reported. Is
there a way to select only the subset of metrics that we need to monitor the
application?

Also, is there a way to specify a custom metrics scope? Basically I register
metrics like below: add a custom metric group and then add a meter per
user. I would like this to be reported as the measurement "Users" tagged with the
user id. This way I can easily visualize the data in Grafana or any other
tool by selecting the measurement and grouping by tag. Is there a way to
report like that instead of host, process_type, tm_id, job_name, task_name
& subtask_index?

metricGroup.addGroup("Users")
    .meter(userId, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
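For context, a sketch of how such a per-user meter might be registered lazily
inside a rich function (assumes the DropwizardMeterWrapper from
flink-metrics-dropwizard; the field and method names are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;

// Fields and methods inside a Rich*Function:
private transient MetricGroup userGroup;
private transient Map<String, Meter> userMeters;

@Override
public void open(Configuration parameters) {
    userGroup = getRuntimeContext().getMetricGroup().addGroup("Users");
    userMeters = new HashMap<>();
}

private Meter meterFor(String userId) {
    // Register each user's meter only once, the first time that user is seen.
    return userMeters.computeIfAbsent(userId,
            id -> userGroup.meter(id, new DropwizardMeterWrapper(new com.codahale.metrics.Meter())));
}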

Thanks a bunch.

On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski 
wrote:

> Hi,
>
> Reporting once per 10 seconds shouldn’t create problems. Best to try it
> out. Let us know if you get into some troubles :)
>
> Piotrek
>
> On 11 Dec 2017, at 18:23, Navneeth Krishnan 
> wrote:
>
> Thanks Piotr.
>
> Yes, passing the metric group should be sufficient. The subcomponents will
> not be able to provide the list of metrics to register since the metrics
> are created based on incoming data by tenant. Also I am planning to have
> the metrics reported every 10 seconds and hope it shouldn't be a problem.
> We use influx and grafana to plot the metrics.
>
> The option 2 that I had in mind was to collect all metrics and use influx
> db sink to report it directly inside the pipeline. But it seems reporting
> per node might not be possible.
>
>
> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure if I completely understand your issue.
>>
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the
>> MetricGroup or ask your components/subclasses “what metrics do you want to
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for
>> Flink, as long as you have a reasonable reporting interval. However keep in
>> mind that Flink only reports your metrics and you still need something to
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric
>> system. For that you would need some other solution, like report your
>> metrics using JMX (directly register MBeans from your code)
>>
>> Piotrek
>>
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan 
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect
>> metrics to identify how my algorithm is performing. The entire pipeline is
>> multi-tenanted and I also need metrics per tenant. Lets say there would be
>> around 20 metrics to be captured per tenant. I have the following ideas for
>> implemention but any suggestions on which one might be better will help.
>> >
>> > 1. Use flink metric group and register a group per tenant at the
>> operator level. The disadvantage of this approach for me is I need the
>> runtimecontext parameter to register a metric and I have various subclasses
>> to which I need to pass this object to limit the metric scope within the
>> operator. Also there will be too many metrics reported if there are higher
>> number of subtasks.
>> > How is everyone accessing flink state/ metrics from other classes where
>> you don't have access to runtimecontext?
>> >
>> > 2. Use a custom singleton metric registry to add and send these metrics
>> using custom sink. Instead of using flink metric group to collect metrics
>> per operatior - subtask, collect per jvm and use influx sink to send the
>> metric data. What i'm not sure in this case is how to collect only once per
>> node/jvm.
>> >
>> > Thanks a bunch in advance.
>>
>>
>
>


Flink Kafka Producer Exception

2017-12-12 Thread Navneeth Krishnan
Hi,

I have a kafka source and sink in my pipeline and when I start my job I get
this error and the job goes to failed state. I checked the kafka node and
everything looks good. Any suggestion on what is happening here? Thanks.

java.lang.Exception: Failed to send data to Kafka: The server
disconnected before a response was received.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:421)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:443)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:420)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:394)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:598)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at com.transformations.MyProcessor.flatMap(MyProcessor.java:115)
at com.transformations.MyProcessor.flatMap(MyProcessor.java:47)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)


Re: Custom Metrics

2017-12-11 Thread Navneeth Krishnan
Thanks Piotr.

Yes, passing the metric group should be sufficient. The subcomponents will
not be able to provide the list of metrics to register since the metrics
are created based on incoming data by tenant. Also I am planning to have
the metrics reported every 10 seconds and hope it shouldn't be a problem.
We use influx and grafana to plot the metrics.

The option 2 that I had in mind was to collect all metrics and use influx
db sink to report it directly inside the pipeline. But it seems reporting
per node might not be possible.


On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski 
wrote:

> Hi,
>
> I’m not sure if I completely understand your issue.
>
> 1.
> - You don’t have to pass RuntimeContext, you can always pass just the
> MetricGroup or ask your components/subclasses “what metrics do you want to
> register” and register them at the top level.
> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for
> Flink, as long as you have a reasonable reporting interval. However keep in
> mind that Flink only reports your metrics and you still need something to
> read/handle/process/aggregate your metrics
> 2.
> I don’t think that reporting per node/jvm is possible with Flink’s metric
> system. For that you would need some other solution, like report your
> metrics using JMX (directly register MBeans from your code)
>
> Piotrek
>
> > On 10 Dec 2017, at 18:51, Navneeth Krishnan 
> wrote:
> >
> > Hi,
> >
> > I have a streaming pipeline running on flink and I need to collect
> metrics to identify how my algorithm is performing. The entire pipeline is
> multi-tenanted and I also need metrics per tenant. Lets say there would be
> around 20 metrics to be captured per tenant. I have the following ideas for
> implemention but any suggestions on which one might be better will help.
> >
> > 1. Use flink metric group and register a group per tenant at the
> operator level. The disadvantage of this approach for me is I need the
> runtimecontext parameter to register a metric and I have various subclasses
> to which I need to pass this object to limit the metric scope within the
> operator. Also there will be too many metrics reported if there are higher
> number of subtasks.
> > How is everyone accessing flink state/ metrics from other classes where
> you don't have access to runtimecontext?
> >
> > 2. Use a custom singleton metric registry to add and send these metrics
> using a custom sink. Instead of using the Flink metric group to collect metrics
> per operator subtask, collect per JVM and use an InfluxDB sink to send the
> metric data. What I'm not sure about in this case is how to collect only once per
> node/JVM.
> >
> > Thanks a bunch in advance.
>
>


Custom Metrics

2017-12-10 Thread Navneeth Krishnan
Hi,

I have a streaming pipeline running on Flink and I need to collect metrics
to identify how my algorithm is performing. The entire pipeline is
multi-tenanted and I also need metrics per tenant. Let's say there would be
around 20 metrics to be captured per tenant. I have the following ideas for
implementation, but any suggestions on which one might be better will help.

1. Use the Flink metric group and register a group per tenant at the operator
level. The disadvantage of this approach for me is that I need the
RuntimeContext to register a metric, and I have various subclasses to which I
need to pass this object in order to limit the metric scope within the
operator. Also, too many metrics will be reported if there is a higher number
of subtasks.
How is everyone accessing Flink state/metrics from other classes where you
don't have access to the RuntimeContext?

2. Use a custom singleton metric registry to add and send these metrics
using a custom sink. Instead of using the Flink metric group to collect
metrics per operator subtask, collect per JVM and use an InfluxDB sink to
send the metric data. What I'm not sure about in this case is how to collect
only once per node/JVM.

Thanks a bunch in advance.


Passing Configuration & State

2017-10-26 Thread Navneeth Krishnan
Hi All,

I have developed a streaming pipeline in java and I need to pass some of
the configuration parameters that are passed during program startup to user
functions. I used the below link as reference.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html

I have tried withParameters & setGlobalJobParameters but that doesn't seem
to work: the parameters are blank inside my user function when deployed in
a cluster. I have also tried passing the parameters through the constructor
of my user function, and this seems to work locally but not in cluster mode;
again, the parameters are blank.

Is there a recommended way to pass the program parameters to user function
classes?
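
A minimal sketch of the pattern from the best-practices page linked above:
parse the startup arguments with ParameterTool, register them as global job
parameters, and read them back in the rich function's open() method. The
"prefix" key, the socket source, and the class names are placeholders.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParameterPassingJob {

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // makes the parameters visible to every rich user function on the cluster
        env.getConfig().setGlobalJobParameters(params);

        env.socketTextStream("localhost", 9999)
           .map(new PrefixMapper())
           .print();

        env.execute("parameter passing example");
    }

    /** Reads a startup parameter back inside open() instead of relying on constructor fields. */
    public static class PrefixMapper extends RichMapFunction<String, String> {

        private transient String prefix;

        @Override
        public void open(Configuration parameters) {
            ParameterTool params = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            prefix = params.getRequired("prefix");
        }

        @Override
        public String map(String value) {
            return prefix + value;
        }
    }
}

If the constructor approach is used instead, one common cause of parameters
showing up blank on a cluster is that the fields holding them are transient or
not serializable, so they are lost when the function is serialized and shipped
to the task managers.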

Also, I have a scenario where the state created inside a user function has to
be passed around to multiple classes. Is there a state registry or something
from which I can retrieve a registered state and use it, or should I implement
my own?

Thanks in advance.

Regards,
Navneeth


Reading Yarn Application Name in flink

2017-10-24 Thread Navneeth Krishnan
Hi All,

Is there a way to read the YARN application id/name within Flink so that
the logs can be sent to an external logging stack like ELK or CloudWatch,
grouped by application?
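
I am not aware of a dedicated Flink API for this, but since the job runs
inside YARN containers, one workaround is to read the container id that YARN
exports into each container's environment and derive the application id from
it. A minimal sketch; the id format handled below is an assumption about the
YARN version in use.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class YarnAppId {

    // e.g. container_1505848894978_0007_01_000001 (newer YARN versions may add an "e<epoch>_" part)
    private static final Pattern CONTAINER_ID_PATTERN =
            Pattern.compile("container_(?:e\\d+_)?(\\d+)_(\\d+)_.*");

    /** Returns the YARN application id, or null when not running inside a YARN container. */
    public static String fromEnvironment() {
        String containerId = System.getenv("CONTAINER_ID");
        if (containerId == null) {
            return null;
        }
        Matcher m = CONTAINER_ID_PATTERN.matcher(containerId);
        return m.matches() ? "application_" + m.group(1) + "_" + m.group(2) : null;
    }
}

The resulting id can then be attached as a field on every log line sent to ELK
or CloudWatch.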

Thanks,
Navneeth


[no subject]

2017-10-19 Thread Navneeth Krishnan
Hello All,

I have an in-memory cache created inside a user function and I need to
assign a max capacity for it. Since the program can be run on any
hardware, I'm wondering if I could size it based on Flink's allocated managed
memory.

Is there a way to get the Flink managed memory size inside a user function?
If not, are there any other options?
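
As far as I know, Flink's managed memory is not exposed to user functions
through a public API, so a common fallback is to size the cache against the
JVM heap of the task manager instead. A minimal sketch with Guava; the byte[]
value type and the fraction passed in are placeholders.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public final class BoundedCacheFactory {

    /** Builds a cache whose total weight is capped at the given fraction of the JVM max heap. */
    public static Cache<String, byte[]> heapFractionCache(double fraction) {
        long maxWeightBytes = (long) (Runtime.getRuntime().maxMemory() * fraction);
        return CacheBuilder.newBuilder()
                .maximumWeight(maxWeightBytes)
                // weigh each entry by its payload size; adapt to your value type
                .weigher((Weigher<String, byte[]>) (key, value) -> value.length)
                .build();
    }
}

// inside the user function's open():
// cache = BoundedCacheFactory.heapFractionCache(0.10);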

Thanks,
Navneeth


Running flink on YARN

2017-10-12 Thread Navneeth Krishnan
Hello,

I'm running Flink on AWS EMR and I would like to know how I can pass a
custom log4j properties file. I changed the log4j.properties file in the Flink
conf directory, but the changes don't seem to be picked up. Thanks.

I'm using the below command to start my flink job.
> flink run -m yarn-cluster

Regards,
Navneeth


Re: Flink on EMR

2017-09-26 Thread Navneeth Krishnan
Hi All,

Any suggestions?

Thanks.

On Mon, Sep 25, 2017 at 10:14 PM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hello All,
>
> I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running
> into multiple issues and need some help.
>
> *Issue1:*
>
> How did others resolve this multiple bindings issue?
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>
>
> *Issue2:*
>
> Running the below command runs the pipeline but the task manager is allocated 
> with only 5GB memory instead of 8GB memory. Any reason why?
> flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar
>
>
> *Issue3:*
>
> How to provide the checkpoint directory? By just providing this 
> "hdfs:///checkpoints/" will it work or should I provide any master node host 
> name?
>
>
> *Issue 4:*
>
> How can I get the task manager logs? Should I use log aggregation in hadoop 
> yarn or send it to cloud watch?
>
>
> Also, if there are any best practices to be used while running Flink on YARN, 
> please let me know.
>
>
> Thanks a lot.
>
>
> Regards,
>
> Navneeth
>
>


Re: Flink on EMR

2017-09-25 Thread Navneeth Krishnan
Hi,

I’m using the default Flink package that comes with EMR. I’m facing these
issues while running my pipeline. Thanks.

On Mon, Sep 25, 2017 at 11:09 PM Jörn Franke  wrote:

> Amazon EMR already has a Flink package. You just need to check the
> checkbox. I would not install it on your own.
> I think you can find it in the advanced options.
>
> On 26. Sep 2017, at 07:14, Navneeth Krishnan 
> wrote:
>
> Hello All,
>
> I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running
> into multiple issues and need some help.
>
> *Issue1:*
>
> How did others resolve this multiple bindings issue?
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>
>
> *Issue2:*
>
> Running the below command runs the pipeline but the task manager is allocated 
> with only 5GB memory instead of 8GB memory. Any reason why?
> flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar
>
>
> *Issue3:*
>
> How to provide the checkpoint directory? By just providing this 
> "hdfs:///checkpoints/" will it work or should I provide any master node host 
> name?
>
>
> *Issue 4:*
>
> How can I get the task manager logs? Should I use log aggregation in hadoop 
> yarn or send it to cloud watch?
>
>
> Also, if there are any best practices to be used while running Flink on YARN, 
> please let me know.
>
>
> Thanks a lot.
>
>
> Regards,
>
> Navneeth
>
>


Re: Broadcast Config through Connected Stream

2017-09-25 Thread Navneeth Krishnan
Thanks a lot Aljoscha. That helps.

On Mon, Sep 25, 2017 at 4:47 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think this is a valid approach, you can even use "operator state" in
> your map function to make the broadcast config state stateful.
>
> Another approach would be to use internal APIs to hack an operator that
> has a keyed stream on one input and a broadcast stream on the second input.
> You can see that approach in action in the Beam Flink Runner [1] but I
> would strongly recommend against doing that because it is using internal
> APIs and if the other approach works for you I would stay with that.
>
> Best,
> Aljoscha
>
> [1] https://github.com/apache/beam/blob/be9fb29901cf4a1ae7b4a9d8e9f25f
> 4ea78359fd/runners/flink/src/main/java/org/apache/beam/runners/flink/
> FlinkStreamingTransformTranslators.java#L488
>
> On 15. Sep 2017, at 07:04, Navneeth Krishnan 
> wrote:
>
> Hi,
>
> Any suggestions on this could be achieved?
>
> Thanks
>
> On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> Any suggestions on this would really help.
>>
>> Thanks.
>>
>> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <
>> reachnavnee...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I looked into an earlier email about the topic broadcast config through
>>> connected stream and I couldn't find the conclusion.
>>>
>>> I can't do the below approach since I need the config to be published to
>>> all operator instances but I need keyed state for external querying.
>>>
>>> streamToBeConfigured.connect(configMessageStream)
>>> .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>>> .flatMap(new FunctionWithConfigurableState())
>>> .addSink(...);
>>>
>>> One of the resolution I found in that mail chain was below. I can use
>>> this to solve my issue but is this still the recommended approach?
>>>
>>> stream1.connect(stream2)
>>> .map(new MergeStreamsMapFunction()) // Holds transient state
>>> of the last ConfigMessage and maps Stream1's data to a Tuple2 of (data, ConfigMessage)
>>> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
>>> for ValueStateDescriptors and semantically correct partitioning according
>>> to business logic
>>> .flatMap(new StatefulFlatMapFunction()) // Save latest
>>> received ConfigMessage-Value in ValueStateDescriptor here
>>> .addSink(...);
>>>
>>> Thanks,
>>> Navneeth
>>>
>>
>>
>
>


Flink on EMR

2017-09-25 Thread Navneeth Krishnan
Hello All,

I'm trying to deploy flink on AWS EMR and I'm very new to EMR. I'm running
into multiple issues and need some help.

*Issue1:*

How did others resolve this multiple bindings issue?


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/11/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/hadoop/appcache/application_1505848894978_0007/filecache/12/location-compute-1.0-SNAPSHOT-all.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]


*Issue2:*

Running the command below starts the pipeline, but the task manager is
allocated only 5 GB of memory instead of 8 GB. Any reason why?
flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 8192 ./my-pipeline.jar


*Issue3:*

How should I provide the checkpoint directory? Will just providing
"hdfs:///checkpoints/" work, or should I provide the host name of a
master node?
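
For Issue 3, a minimal sketch of the programmatic route; an hdfs:/// URI
without a host name is generally resolved against the default file system
(fs.defaultFS) of the cluster's Hadoop configuration, so an explicit master
host name is usually not needed. The checkpoint interval is a placeholder.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds (placeholder)
env.setStateBackend(new FsStateBackend("hdfs:///checkpoints/"));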


*Issue 4:*

How can I get the task manager logs? Should I use log aggregation in
Hadoop YARN or send them to CloudWatch?


Also, if there are any best practices to be used while running Flink on
YARN, please let me know.


Thanks a lot.


Regards,

Navneeth


Re: Queryable State

2017-09-17 Thread Navneeth Krishnan
No, it doesn't work even if I increase the timeout.

The state being fetched is a Map of data and has around 100 entries in it.
I have a single job manager and 3 task managers with 16 slots each running
on AWS EC2.

final TypeSerializer keySerializer =
TypeInformation.of(new TypeHint()
{}).createSerializer(new ExecutionConfig());
final TypeSerializer> valueSerializer =
TypeInformation.of(new TypeHint>()
{}).createSerializer(new ExecutionConfig());

final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(
key, keySerializer,
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);
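
To complete the lookup, a minimal sketch of the remaining steps, following the
Flink 1.3 queryable state documentation. The generic type parameters in the
snippet above were stripped by the archive, so the String key, the
Map<String, Long> value (i.e. a ValueState holding a Map), the jobId variable,
and the "user-state" name below are all assumptions.

// serializers with the assumed types filled back in
final TypeSerializer<String> keySerializer =
        TypeInformation.of(new TypeHint<String>() {}).createSerializer(new ExecutionConfig());
final TypeSerializer<Map<String, Long>> valueSerializer =
        TypeInformation.of(new TypeHint<Map<String, Long>>() {}).createSerializer(new ExecutionConfig());

final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
        key, keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

// jobId and "user-state" are placeholders for the running job and the registered state name
Future<byte[]> serializedResult =
        client.getKvState(jobId, "user-state", key.hashCode(), serializedKey);

byte[] serializedValue = Await.result(serializedResult, new FiniteDuration(5, TimeUnit.MINUTES));
Map<String, Long> value = KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);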


On Fri, Sep 15, 2017 at 6:44 AM, Kostas Kloudas  wrote:

> Hi Navneeth,
>
> If you increase the timeout, everything works ok?
> I suppose from your config that you are running in standalone mode, right?
>
> Any other information about the job (e.g. code and/or size of state being
> fetched) and
> the cluster setup that can help us pin down the problem, would be
> appreciated.
>
> Thanks,
> Kostas
>
> On Sep 13, 2017, at 7:12 PM, Navneeth Krishnan 
> wrote:
>
> Hi,
>
> I am sure I have provided the right job manager details because the
> connection timeout ip is the task manager where the state is kept. I guess
> the client is able to reach the job manager and figure out where the state
> is. Also if I provide a wrong state name, I'm receiving unknown state
> exception. I couldn't find why there is a timeout and a warning message is
> logged in the job manager.
>
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
> wrote:
>
>> Hi,
>>
>>
>> are you sure your jobmanager is running and is accessible from the
>> supplied
>> hostname and port? If you can start up the FLink UI of the job which
>> creates
>> your queryable state, it should have the details of the job manager and
>> the
>> port to be used in this queryable client job.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
>


Re: Broadcast Config through Connected Stream

2017-09-14 Thread Navneeth Krishnan
Hi,

Any suggestions on this could be achieved?

Thanks

On Thu, Sep 7, 2017 at 8:02 AM, Navneeth Krishnan 
wrote:

> Hi All,
>
> Any suggestions on this would really help.
>
> Thanks.
>
> On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan <
> reachnavnee...@gmail.com> wrote:
>
>> Hi All,
>>
>> I looked into an earlier email about the topic broadcast config through
>> connected stream and I couldn't find the conclusion.
>>
>> I can't do the below approach since I need the config to be published to
>> all operator instances but I need keyed state for external querying.
>>
>> streamToBeConfigured.connect(configMessageStream)
>> .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
>> .flatMap(new FunctionWithConfigurableState())
>> .addSink(...);
>>
>> One of the resolution I found in that mail chain was below. I can use
>> this to solve my issue but is this still the recommended approach?
>>
>> stream1.connect(stream2)
>> .map(new MergeStreamsMapFunction()) // Holds transient state
>> of the last ConfigMessage and maps Stream1's data to a Tuple2 of (data, ConfigMessage)
>> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
>> for ValueStateDescriptors and semantically correct partitioning according
>> to business logic
>> .flatMap(new StatefulFlatMapFunction()) // Save latest
>> received ConfigMessage-Value in ValueStateDescriptor here
>> .addSink(...);
>>
>> Thanks,
>> Navneeth
>>
>
>


Re: Queryable State

2017-09-14 Thread Navneeth Krishnan
Hi,

Any idea on how to solve this issue?

Thanks

On Wed, Sep 13, 2017 at 10:12 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi,
>
> I am sure I have provided the right job manager details because the
> connection timeout ip is the task manager where the state is kept. I guess
> the client is able to reach the job manager and figure out where the state
> is. Also if I provide a wrong state name, I'm receiving unknown state
> exception. I couldn't find why there is a timeout and a warning message is
> logged in the job manager.
>
> On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
> wrote:
>
>> Hi,
>>
>>
>> are you sure your jobmanager is running and is accessible from the
>> supplied
>> hostname and port? If you can start up the FLink UI of the job which
>> creates
>> your queryable state, it should have the details of the job manager and
>> the
>> port to be used in this queryable client job.
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>


Re: Queryable State

2017-09-13 Thread Navneeth Krishnan
Hi,

I am sure I have provided the right job manager details because the IP in the
connection timeout is the task manager where the state is kept. I guess
the client is able to reach the job manager and figure out where the state
is. Also, if I provide a wrong state name, I receive an unknown-state
exception. I couldn't find out why there is a timeout and why a warning message
is logged in the job manager.

On Wed, Sep 13, 2017 at 4:20 AM, Biplob Biswas 
wrote:

> Hi,
>
>
> are you sure your jobmanager is running and is accessible from the supplied
> hostname and port? If you can start up the FLink UI of the job which
> creates
> your queryable state, it should have the details of the job manager and the
> port to be used in this queryable client job.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Queryable State

2017-09-11 Thread Navneeth Krishnan
Hi All,

Any suggestions would really be helpful. Thanks

On Sun, Sep 10, 2017 at 12:04 AM, Navneeth Krishnan <
reachnavnee...@gmail.com> wrote:

> Hi All,
>
> I'm running a streaming job on flink 1.3.2 with few queryable states.
> There are 3 task managers and a job manager. I'm getting timeout exception
> when trying to query a state and also a warning message in the job manager
> log.
>
> *Client:*
> final Configuration config = new Configuration();
>
> config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
> config.setInteger(JobManagerOptions.PORT, 
> JobManagerOptions.PORT.defaultValue());
>
> final HighAvailabilityServices highAvailabilityServices =
> HighAvailabilityServicesUtils.createHighAvailabilityServices(
> config,
> Executors.newSingleThreadScheduledExecutor(),
> 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
> QueryableStateClient client = new QueryableStateClient(config, 
> highAvailabilityServices);
>
>
> *Exception:*
> Exception in thread "main" io.netty.channel.ConnectTimeoutException:
> connection timed out: /172.31.18.170:43537
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(
> AbstractNioChannel.java:212)
> at io.netty.util.concurrent.PromiseTask$RunnableAdapter.
> call(PromiseTask.java:38)
> at io.netty.util.concurrent.ScheduledFutureTask.run(
> ScheduledFutureTask.java:120)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(
> SingleThreadEventExecutor.java:357)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:111)
> at java.lang.Thread.run(Thread.java:745)
>
> *Job Manager:*
> 2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor
>- Association with remote system [akka.tcp://
> flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms.
> Reason: [Disassociated]
>
> Thanks,
> Navneeth
>
>
>


Queryable State

2017-09-10 Thread Navneeth Krishnan
Hi All,

I'm running a streaming job on Flink 1.3.2 with a few queryable states. There
are 3 task managers and a job manager. I'm getting a timeout exception when
trying to query a state, and also a warning message in the job manager log.

*Client:*
final Configuration config = new Configuration();

config.setString(JobManagerOptions.ADDRESS, jobMgrHost);
config.setInteger(JobManagerOptions.PORT,
JobManagerOptions.PORT.defaultValue());

final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.newSingleThreadScheduledExecutor(),

HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

QueryableStateClient client = new QueryableStateClient(config,
highAvailabilityServices);


*Exception:*
Exception in thread "main" io.netty.channel.ConnectTimeoutException:
connection timed out: /172.31.18.170:43537
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
at
io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at
io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

*Job Manager:*
2017-09-10 06:55:41,599 WARN  akka.remote.ReliableDeliverySupervisor
 - Association with remote system [akka.tcp://
flink@127.0.0.1:64344] has failed, address is now gated for [5000] ms.
Reason: [Disassociated]

Thanks,
Navneeth


Re: State Issue

2017-09-09 Thread Navneeth Krishnan
Sorry, my bad. I figured out it was a change done at our end which created
different keys. Thanks.

On Fri, Sep 8, 2017 at 5:32 PM, Navneeth Krishnan 
wrote:

> Hi,
>
> I'm experiencing a weird issue where any data put into map state, when
> retrieved with the same key, is returned as null, and hence it puts the same
> value again and again. I used the RocksDB state backend, but tried with the Memory
> state backend too, and the issue still exists.
>
> Each time when I set the key and value into MapState it creates a new map
> I couldn't access the previous value. But when I iterate over the MapState
> keys and values, I can see the same key added multiple times.
>
> Each put operation goes through the code lines marked in red.
>
> *NestedMapsStateTable.java*
>
> S get(K key, int keyGroupIndex, N namespace) {
>
>checkKeyNamespacePreconditions(key, namespace);
>
>Map> namespaceMap = getMapForKeyGroup(keyGroupIndex);
>
>
>
> * if (namespaceMap == null) {  return null;   }*
>
>Map keyedMap = namespaceMap.get(namespace);
>
>if (keyedMap == null) {
>   return null;
>}
>
>return keyedMap.get(key);
> }
>
>
> *HeapMapState.java*
>
> @Override
> public void put(UK userKey, UV userValue) {
>
>HashMap userMap = stateTable.get(currentNamespace);
>
>
>
> * if (userMap == null) {  userMap = new HashMap<>();  
> stateTable.put(currentNamespace, userMap);   }*
>
>userMap.put(userKey, userValue);
> }
>
>
> *My Code:*
>
> *open()*
>
> MapStateDescriptor testStateDescriptor = new 
> MapStateDescriptor<>("test-state",
> TypeInformation.of(new TypeHint() {}), TypeInformation.of(new 
> TypeHint() {}));
>
> testState = getRuntimeContext().getMapState(testStateDescriptor);
>
>
> *flatMap:*
>
> if(testState.contains(user)){
> *// DO Something*
> } else {
> testState.put(user, userInfo);
> }
>
>
> streamEnv.setStateBackend(new MemoryStateBackend());
>
> streamEnv.setParallelism(1);
>
>
> Thanks
>
>


State Issue

2017-09-08 Thread Navneeth Krishnan
Hi,

I'm experiencing a weird issue where any data put into map state, when
retrieved with the same key, is returned as null, and hence it puts the same
value again and again. I used the RocksDB state backend, but tried with the
Memory state backend too, and the issue still exists.

Each time I set a key and value into the MapState it creates a new map, and I
can't access the previous value. But when I iterate over the MapState
keys and values, I can see the same key added multiple times.

Each put operation goes through the code lines marked in red.

*NestedMapsStateTable.java*

S get(K key, int keyGroupIndex, N namespace) {

   checkKeyNamespacePreconditions(key, namespace);

   Map> namespaceMap = getMapForKeyGroup(keyGroupIndex);



* if (namespaceMap == null) {  return null;   }*

   Map keyedMap = namespaceMap.get(namespace);

   if (keyedMap == null) {
  return null;
   }

   return keyedMap.get(key);
}


*HeapMapState.java*

@Override
public void put(UK userKey, UV userValue) {

   HashMap userMap = stateTable.get(currentNamespace);



* if (userMap == null) {  userMap = new HashMap<>();
stateTable.put(currentNamespace, userMap);   }*

   userMap.put(userKey, userValue);
}


*My Code:*

*open()*

MapStateDescriptor testStateDescriptor = new
MapStateDescriptor<>("test-state",
TypeInformation.of(new TypeHint() {}),
TypeInformation.of(new TypeHint() {}));

testState = getRuntimeContext().getMapState(testStateDescriptor);


*flatMap:*

if(testState.contains(user)){
*// DO Something*
} else {
testState.put(user, userInfo);
}


streamEnv.setStateBackend(new MemoryStateBackend());

streamEnv.setParallelism(1);


Thanks


Re: Broadcast Config through Connected Stream

2017-09-07 Thread Navneeth Krishnan
Hi All,

Any suggestions on this would really help.

Thanks.

On Tue, Sep 5, 2017 at 2:42 PM, Navneeth Krishnan 
wrote:

> Hi All,
>
> I looked into an earlier email about the topic broadcast config through
> connected stream and I couldn't find the conclusion.
>
> I can't do the below approach since I need the config to be published to
> all operator instances but I need keyed state for external querying.
>
> streamToBeConfigured.connect(configMessageStream)
> .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> .flatMap(new FunctionWithConfigurableState())
> .addSink(...);
>
> One of the resolution I found in that mail chain was below. I can use this
> to solve my issue but is this still the recommended approach?
>
> stream1.connect(stream2)
> .map(new MergeStreamsMapFunction()) // Holds transient state
> of the last ConfigMessage and maps Stream1's data to a Tuple2 ConfigMessage>
> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
> for ValueStateDescriptors and semantically correct partitioning according
> to business logic
> .flatMap(new StatefulFlatMapFunction()) // Save latest
> received ConfigMessage-Value in ValueStateDescriptor here
> .addSink(...);
>
> Thanks,
> Navneeth
>


Re: State Maintenance

2017-09-07 Thread Navneeth Krishnan
Will I be able to use both queryable MapState and union list state while
implementing the CheckpointedFunction interface? One of my major
requirements on that operator is to provide queryable state, and in order
to compute that state we need the common static state across all parallel
operator instances.

Thanks.

On Thu, Sep 7, 2017 at 12:44 AM, Fabian Hueske  wrote:

> Hi Navneeth,
>
> there's a lower level state interface that should address your
> requirements: OperatorStateStore.getUnionListState()
>
> This union list state is similar to the regular operator list state, but
> instead of splitting the list for recovery and giving out splits to each
> operator instance, it restores the complete list on each operator instance.
> So it basically does a broadcast restore. If all operators have the same
> state, only one instance needs to checkpoint its state, and this state is restored
> to all other instances in case of a failure. This should also work with
> rescaling.
> The operator instance to checkpoint can be identified by 
> (RuntimeContext.getIndexOfThisSubtask
> == 0).
>
> The OperatorStateStore is a bit hidden. You have to implement the
> CheckpointedFunction interface. When 
> CheckpointedFunction.initializeState(FunctionInitializationContext
> context) is called context has a method getOperatorStateStore().
>
> I'd recommend to have a look at the detailed JavaDocs of all involved
> classes and methods.
>
> Hope this helps,
> Fabian
>
>
> 2017-09-05 19:35 GMT+02:00 Navneeth Krishnan :
>
>> Thanks Gordon for your response. I have around 80 parallel flatmap
>> operator instances and each instance requires 3 states. Out of which one is
>> user state in which each operator will have unique user's data and I need
>> this data to be queryable. The other two states are kind of static states
>> which are only modified when there an update message in config stream. This
>> static data could easily be around 2GB and in my previous approach I used
>> operator state where the data is retrieved inside open method across all
>> operator instances whereas checkpointed only inside one of the operator
>> instance.
>>
>> One of the issue that I have is if I change the operator parallelism how
>> would it affect the internal state?
>>
>>
>> On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Navneeth,
>>>
>>> Answering your three questions separately:
>>>
>>> 1. Yes. Your MapState will be backed by RocksDB, so when removing an
>>> entry
>>> from the map state, the state will be removed from the local RocksDB as
>>> well.
>>>
>>> 2. If state classes are not POJOs, they will be serialized by Kryo,
>>> unless a
>>> custom serializer is specifically specified otherwise. You can take a
>>> look
>>> at this document on how to do that [1].
>>>
>>> 3. I might need to know more information to be able to suggest properly
>>> for
>>> this one. How are you using the "huge state values"? From what you
>>> described, it seems like you only need it on one of the parallel
>>> instances,
>>> so I'm a bit curious on what they are actually used for. Are they needed
>>> when processing your records?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
>>> dev/stream/state.html#custom-serialization-for-managed-state
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/
>>>
>>
>>
>
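
A minimal sketch of the union-list-state approach Fabian describes above: the
function keeps its regular (queryable) keyed MapState and additionally
implements CheckpointedFunction so that the shared "static" data is written by
subtask 0 only and restored on every subtask via getUnionListState(). The
String/Long and (String, String) types and the state names are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

public class SharedConfigFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, String>>
        implements CheckpointedFunction {

    // keyed, queryable per-user state (types are placeholders)
    private transient MapState<String, Long> userState;

    // "static" config, identical on every parallel instance; kept on the heap
    private transient Map<String, String> sharedConfig;
    private transient ListState<Tuple2<String, String>> sharedConfigState;

    @Override
    public void open(Configuration parameters) {
        MapStateDescriptor<String, Long> descriptor =
                new MapStateDescriptor<>("user-state", String.class, Long.class);
        descriptor.setQueryable("user-state");
        userState = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        sharedConfig = new HashMap<>();
        sharedConfigState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("shared-config",
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
        // on restore, the union list state gives every subtask the complete list
        for (Tuple2<String, String> entry : sharedConfigState.get()) {
            sharedConfig.put(entry.f0, entry.f1);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        sharedConfigState.clear();
        // only subtask 0 writes the shared data, so it is checkpointed once, not once per instance
        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
            for (Map.Entry<String, String> e : sharedConfig.entrySet()) {
                sharedConfigState.add(Tuple2.of(e.getKey(), e.getValue()));
            }
        }
    }

    @Override
    public void flatMap(Tuple2<String, String> record, Collector<Tuple2<String, String>> out) throws Exception {
        // ... update userState and read sharedConfig here ...
        out.collect(record);
    }
}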


MapState Default Value

2017-09-06 Thread Navneeth Krishnan
Hi,

Is there a reason behind removing the default value option in
MapStateDescriptor? I was using it in the earlier version to initialize a
Guava cache with a loader, etc., and in the new version an empty map is
returned by default.
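
For what it's worth, a minimal sketch of one way to emulate a per-key default
now that the descriptor no longer accepts one: check for absence on access and
initialize lazily. The String/Long types and the 0L default are placeholders.

// in open():
MapStateDescriptor<String, Long> descriptor =
        new MapStateDescriptor<>("counts", String.class, Long.class);
counts = getRuntimeContext().getMapState(descriptor);

// in flatMap(): lazily apply the "default value" the old descriptor used to provide
Long current = counts.get(key);
if (current == null) {
    current = 0L;
}
counts.put(key, current + 1);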

Thanks


Broadcast Config through Connected Stream

2017-09-05 Thread Navneeth Krishnan
Hi All,

I looked into an earlier email on the topic of broadcasting config through a
connected stream, and I couldn't find the conclusion.

I can't use the approach below since I need the config to be published to
all operator instances, but I also need keyed state for external querying.

streamToBeConfigured.connect(configMessageStream)
.keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
.flatMap(new FunctionWithConfigurableState())
.addSink(...);

One of the resolutions I found in that mail chain is below. I can use this
to solve my issue, but is this still the recommended approach?

stream1.connect(stream2)
.map(new MergeStreamsMapFunction()) // Holds transient state of
the last ConfigMessage and maps Stream1's data to a Tuple2 of (data, ConfigMessage)
.keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
for ValueStateDescriptors and semantically correct partitioning according
to business logic
.flatMap(new StatefulFlatMapFunction()) // Save latest received
ConfigMessage-Value in ValueStateDescriptor here
.addSink(...);
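
For concreteness, a minimal sketch of the merge step from the second pattern
above: a CoFlatMapFunction that keeps the last ConfigMessage in a plain field
and attaches it to every data element before the keyBy. Data and ConfigMessage
stand for your own event and config classes.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MergeStreamsFunction
        implements CoFlatMapFunction<Data, ConfigMessage, Tuple2<Data, ConfigMessage>> {

    // per-subtask copy of the last config seen on the control stream (not checkpointed in this sketch)
    private ConfigMessage lastConfig;

    @Override
    public void flatMap1(Data value, Collector<Tuple2<Data, ConfigMessage>> out) {
        if (lastConfig != null) {
            out.collect(Tuple2.of(value, lastConfig));
        }
        // else: drop or buffer until the first config arrives
    }

    @Override
    public void flatMap2(ConfigMessage config, Collector<Tuple2<Data, ConfigMessage>> out) {
        lastConfig = config;
    }
}

For the config to reach every parallel instance of this merge step, the
control stream would normally be broadcast() before the connect.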

Thanks,
Navneeth


Re: State Maintenance

2017-09-05 Thread Navneeth Krishnan
Thanks Gordon for your response. I have around 80 parallel flatmap operator
instances and each instance requires 3 states. One of them is user
state, in which each operator has its own users' data, and I need this
data to be queryable. The other two states are kind of static states which
are only modified when there is an update message in the config stream. This
static data could easily be around 2 GB; in my previous approach I used
operator state, where the data is retrieved inside the open method on all
operator instances but checkpointed only inside one of the operator
instances.

One of the issues I have is: if I change the operator parallelism, how
would it affect the internal state?


On Tue, Sep 5, 2017 at 5:36 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Navneeth,
>
> Answering your three questions separately:
>
> 1. Yes. Your MapState will be backed by RocksDB, so when removing an entry
> from the map state, the state will be removed from the local RocksDB as
> well.
>
> 2. If state classes are not POJOs, they will be serialized by Kryo, unless
> a
> custom serializer is specifically specified otherwise. You can take a look
> at this document on how to do that [1].
>
> 3. I might need to know more information to be able to suggest properly for
> this one. How are you using the "huge state values"? From what you
> described, it seems like you only need it on one of the parallel instances,
> so I'm a bit curious on what they are actually used for. Are they needed
> when processing your records?
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/dev/stream/state.html#custom-serialization-for-managed-state
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Process Function

2017-09-05 Thread Navneeth Krishnan
Thanks a lot everyone. I have the user data ingested from Kafka and it is
keyed by userid. There are around 80 parallel flatmap operator instances
after the keyBy and there are around a few million users. The map state includes
userid as the key and some value. I guess I will try the approach that
Aljoscha has mentioned and see how it works.

On Tue, Sep 5, 2017 at 8:17 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> This is mostly correct, but you cannot register a timer in open() because
> we don't have an active key there. Only in process() and onTimer() can you
> register a timer.
>
> In your case, I would suggest to somehow clamp the timestamp to the
> nearest 2 minute (or whatever) interval or to keep an extra ValueState that
> tells you whether you already registered a timer.
>
> Best,
> Aljoscha
>
> On 5. Sep 2017, at 16:55, Kien Truong  wrote:
>
> Hi,
>
> You can register a processing time timer inside the onTimer and the open
> functions to have a timer that runs periodically.
>
> Pseudo-code example:
>
> ValueState<Long> lastRuntime;
>
> void open() {
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
>
> void onTimer() {
>   // Run the periodic task
>   if (lastRuntime.get() + 60000 == timeStamp) {
>     periodicTask();
>   }
>   // Re-register the processing time timer
>   lastRuntime.setValue(timeStamp);
>   ctx.timerService().registerProcessingTimeTimer(current.timestamp + 60000);
> }
>
> void periodicTask()
>
>
> For the second question, timers are already scoped by key, so you can keep
> a lastModified variable as a ValueState,
> then compare it to the timestamp provided by the timer to see if the
> current key should be evicted.
> Checkout the example on the ProcessFunction page.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/
> process_function.html
>
> Best regards,
> Kien
>
> On 9/5/2017 11:49 AM, Navneeth Krishnan wrote:
>
> Hi All,
>
> I have a streaming pipeline which is keyed by userid and then to a flatmap
> function. I need to clear the state after sometime and I was looking at
> process function for it.
>
> Inside the process element function if I register a timer wouldn't it
> create a timer for each incoming message?
>
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
>
> How can I get something like a clean up task that runs every 2 mins and
> evicts all stale data? Also is there a way to get the key inside onTimer
> function so that I know which key has to be evicted?
>
> Thanks,
> Navneeth
>
>
>
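
A minimal sketch combining the two suggestions above: keep a per-key
lastModified ValueState, register only one processing-time timer per key
rather than one per element, and clear the key's state in onTimer() once it
has been idle for a full interval. The (userid, value) record shape and the
2-minute interval are placeholders.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/** Runs on a stream keyed by userid; records are assumed to be (userid, value) pairs. */
public class IdleStateCleanupFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, String>> {

    private static final long IDLE_TIMEOUT = 2 * 60 * 1000; // 2 minutes (placeholder)

    // scoped to the current key, so onTimer() implicitly knows which key it fired for
    private transient ValueState<Long> lastModified;

    @Override
    public void open(Configuration parameters) {
        lastModified = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-modified", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, String> record, Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        // register a timer only when the key is first seen (or was cleaned up earlier),
        // not once per incoming element
        if (lastModified.value() == null) {
            ctx.timerService().registerProcessingTimeTimer(now + IDLE_TIMEOUT);
        }
        lastModified.update(now);

        // ... actual per-user processing and state updates go here ...
        out.collect(record);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Tuple2<String, String>> out) throws Exception {
        Long last = lastModified.value();
        if (last == null) {
            return;
        }
        if (timestamp >= last + IDLE_TIMEOUT) {
            // the key has been idle for a full interval: drop its state
            lastModified.clear();
            // clear any other per-user state here as well
        } else {
            // still active: check again one interval after the most recent update
            ctx.timerService().registerProcessingTimeTimer(last + IDLE_TIMEOUT);
        }
    }
}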


Process Function

2017-09-04 Thread Navneeth Krishnan
Hi All,

I have a streaming pipeline which is keyed by userid and then to a flatmap
function. I need to clear the state after sometime and I was looking at
process function for it.

Inside the processElement function, if I register a timer, wouldn't it
create a timer for each incoming message?

// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);

How can I get something like a cleanup task that runs every 2 minutes and
evicts all stale data? Also, is there a way to get the key inside the onTimer
function so that I know which key has to be evicted?

Thanks,
Navneeth


State Maintenance

2017-09-04 Thread Navneeth Krishnan
Hi All,

I have a couple of questions regarding state maintenance in Flink.

- I have a connected stream and then a keyBy operator followed by a flatmap
function. I use MapState; keys get added by data from stream1 and
removed by messages from stream2. Stream2 acts as a control stream in my
pipeline. My question is: when the keys are removed, will the state in
RocksDB also be removed? How does RocksDB get the most recent state?

- Can I use a Guava cache as the value type of a MapState? Do I have to write
a serializer to persist the data from the Guava cache?

- One of my downstream operators requires keyed state because I need to
query the state value, but it also has two huge state values that are
basically the same across all parallel operator instances. Initially I used
operator state and checkpointed only in the operator instance with index 0, so
the other instances would not checkpoint the same data. How can I achieve this
with keyed state? Each operator will have around 10 GB of the same data. Not
sure if this will be a problem in the future.

Thanks,
Navneeth