Re: Streaming data to parquet

2020-09-14 Thread Senthil Kumar
Arvid, Jan and Ayush,

Thanks for the ideas! -Kumar

From: Jan Lukavský 
Date: Monday, September 14, 2020 at 6:23 AM
To: "user@flink.apache.org" 
Subject: Re: Streaming data to parquet


Hi,

I'd like to mention another approach, which might not be as "flinkish", but 
removes the source of the issues that arise when writing bulk files. The actual 
cause of the issues here is that when creating bulk output, the most efficient 
option is to have a _reversed flow of commit_. That is to say - contrary to 
Flink's checkpoint barrier flowing from sources to sinks - the optimal 
performance in the bulk case comes from letting the sink commit the source once 
it finishes the bulk write (with whatever period). This is currently impossible 
to achieve with Flink, but what works best for me is to use Flink sinks to write 
a streaming commit log (e.g. Kafka) and then have independent processes (Kafka 
consumers or equivalent) read the output topics, pack them and push them to the 
bulk store; once the write is finished, the Kafka topic is committed. It requires 
deployment of an additional application, but that is low overhead in deployments 
like k8s.
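
A minimal sketch of such a compactor process, using a plain Kafka consumer with 
manual offset commits (the topic name, broker address, bulk-store path and the 
plain-text record format are illustrative assumptions, not part of the platform 
referenced below):

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitLogCompactor {

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092");             // assumed broker address
        props.put("group.id", "bulk-compactor");
        props.put("enable.auto.commit", "false");                 // offsets are committed manually
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("output-commit-log"));
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMinutes(1));
                if (batch.isEmpty()) {
                    continue;
                }
                // Pack the whole batch into one bulk file before acknowledging anything.
                Path bulkFile = Paths.get("/bulk-store/part-" + System.currentTimeMillis());
                try (BufferedWriter writer = Files.newBufferedWriter(
                        bulkFile, StandardCharsets.UTF_8, StandardOpenOption.CREATE_NEW)) {
                    for (ConsumerRecord<String, String> record : batch) {
                        writer.write(record.value());
                        writer.newLine();
                    }
                }
                // The "reversed commit": acknowledge the source only after the bulk
                // write has finished; a crash before this line just replays the batch.
                consumer.commitSync();
            }
        }
    }
}

In practice the packing would be per event-time partition and into a columnar 
format rather than a plain text file, but the write-then-commit ordering is the 
point.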

Moreover, this solves the dilemma between quick commits (for real-time data) 
and large files, because one can read data from both the streaming (truly 
real-time) source and union it with the batch data stored in the bulk store. 
Both these techniques are implemented in [1] (disclaimer: I'm one of the core 
developers of that platform).

Jan

[1] 
https://github.com/O2-Czech-Republic/proxima-platform
On 9/14/20 2:03 PM, Arvid Heise wrote:
Hi Kumar,

for late events, I have seen two approaches:

* Initial compaction every day, repeated compaction after two days, and again 
after 1 week (a rough compaction sketch follows below).
* Using something like delta lake [1], which is a set of specially structured 
parquet files. Usually you also compact them after some time (e.g. 1 week in 
your case), but you can query them efficiently in the meantime.
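
For the first option, a rough sketch of such an offline compaction step, written 
against plain parquet-avro rather than any particular framework (the paths, the 
file listing, and the assumption that all inputs share one schema are mine):

import java.io.IOException;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetCompaction {

    // Merge many small Parquet files for one partition into a single larger file.
    public static void compact(List<Path> smallFiles, Path compactedFile) throws IOException {
        ParquetWriter<GenericRecord> writer = null;
        try {
            for (Path input : smallFiles) {
                try (ParquetReader<GenericRecord> reader =
                             AvroParquetReader.<GenericRecord>builder(input).build()) {
                    GenericRecord record;
                    while ((record = reader.read()) != null) {
                        if (writer == null) {
                            // Create the writer lazily, once the schema is known.
                            writer = AvroParquetWriter.<GenericRecord>builder(compactedFile)
                                    .withSchema(record.getSchema())
                                    .build();
                        }
                        writer.write(record);
                    }
                }
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
    }
}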

However, I'm not aware of any out-of-the-box delta lake solution for Flink. 
This might be something that we could put on the community agenda if there is 
general interest.

[1] 
https://slacker.ro/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log/

On Fri, Sep 11, 2020 at 5:16 PM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
Hello Ayush,

I am interested in knowing about your “really simple” implementation.

So assuming the streaming parquet output goes to an S3 bucket, Initial 
(partitioned by event time):

Do you write another Flink batch application (step 2) which repartitions the data 
from Initial into larger “event time” chunks
and writes it out to another S3 bucket?

In our case, we are getting straggling records with event times which might be 
up to 1 week old.
One approach is to simply write the batch job after 1 week, but then we lose 
the ability to query the recent data in an efficient fashion.

I would appreciate any ideas etc.

Cheers
Kumar

From: Ayush Verma mailto:ayushver...@gmail.com>>
Date: Friday, September 11, 2020 at 8:14 AM
To: Robert Metzger mailto:rmetz...@apache.org>>
Cc: Marek Maj mailto:marekm...@gmail.com>>, user 
mailto:user@flink.apache.org>>
Subject: Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied up with how often 
you commit. No matter which system you use, this variable will always be there. 
If you commit frequently, you will be close to realtime, but you will have 
numerous small files. If you commit after long intervals, you will have larger 
files, but this is as good as a "batch world". We solved this problem at my 
company by having 2 systems. One to commit the files at small intervals, thus 
bringing data into durable storage reliably, and one to roll up these small 
files. It's actually really simple to implement this if you don't try to do it 
in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger 
mailto:rmetz...@apache.org>> wrote:
Hi Marek,

what you are describing is a known problem in Flink. There are some thoughts on 
how to address this in 
https://issues.apache.org/jira/browse/FLINK-11499

Re: Streaming data to parquet

2020-09-11 Thread Senthil Kumar
Hello Ayush,

I am interested in knowing about your “really simple” implementation.

So assuming the streaming parquet output goes to an S3 bucket, Initial 
(partitioned by event time):

Do you write another Flink batch application (step 2) which repartitions the data 
from Initial into larger “event time” chunks
and writes it out to another S3 bucket?

In our case, we are getting straggling records with event times which might be 
up to 1 week old.
One approach is to simply write the batch job after 1 week, but then we lose 
the ability to query the recent data in an efficient fashion.

I would appreciate any ideas etc.

Cheers
Kumar

From: Ayush Verma 
Date: Friday, September 11, 2020 at 8:14 AM
To: Robert Metzger 
Cc: Marek Maj , user 
Subject: Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied up with how often 
you commit. No matter which system you use, this variable will always be there. 
If you commit frequently, you will be close to realtime, but you will have 
numerous small files. If you commit after long intervals, you will have larger 
files, but this is as good as a "batch world". We solved this problem at my 
company by having 2 systems. One to commit the files at small intervals, thus 
bringing data into durable storage reliably, and one to roll up these small 
files. It's actually really simple to implement this if you don't try to do it 
in a single job.

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger 
mailto:rmetz...@apache.org>> wrote:
Hi Marek,

what you are describing is a known problem in Flink. There are some thoughts on 
how to address this in 
https://issues.apache.org/jira/browse/FLINK-11499
 and 
https://issues.apache.org/jira/browse/FLINK-17505
Maybe some ideas there help you already for your current problem (use long 
checkpoint intervals).

A related idea to (2) is to write your data with the Avro format, and then 
regularly use a batch job to transform your data from Avro to Parquet.
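
A minimal sketch of that Avro-to-Parquet rewrite, using avro and parquet-avro 
directly (the local file paths are placeholders; a real batch job would list a 
whole partition and write back to the compacted location):

import java.io.File;
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroToParquet {

    public static void main(String[] args) throws IOException {
        File avroInput = new File("part-0000.avro");                   // assumed input file
        Path parquetOutput = new Path("compacted/part-0000.parquet");  // assumed output path

        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(avroInput, new GenericDatumReader<GenericRecord>())) {
            // One larger Parquet file per input; the schema is taken from the Avro container.
            try (ParquetWriter<GenericRecord> writer =
                         AvroParquetWriter.<GenericRecord>builder(parquetOutput)
                                 .withSchema(reader.getSchema())
                                 .withCompressionCodec(CompressionCodecName.SNAPPY)
                                 .build()) {
                for (GenericRecord record : reader) {
                    writer.write(record);
                }
            }
        }
    }
}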

I hope these are some helpful pointers. I don't have a good overview over other 
potential open source solutions.

Best,
Robert


On Thu, Sep 10, 2020 at 5:10 PM Marek Maj 
mailto:marekm...@gmail.com>> wrote:
Hello Flink Community,

When designing our data pipelines, we very often encounter the requirement to 
stream traffic (usually from kafka) to external distributed file system 
(usually HDFS or S3). This data is typically meant to be queried from 
hive/presto or similar tools. Preferably data sits in columnar format like 
parquet.

Currently, using flink, it is possible to leverage StreamingFileSink to achieve 
what we want to some extent. It satisfies our requirements to partition data by 
event time, ensure exactly-once semantics and fault-tolerance with 
checkpointing. Unfortunately, when using a bulk writer like ParquetWriter, that 
comes with a price of producing a big number of files which degrades the 
performance of queries.
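
For reference, a minimal sketch of the setup described above, using 
StreamingFileSink.forBulkFormat with the Parquet/Avro writer (the S3 path and 
bucketing format are placeholders, and how the GenericRecord stream is produced 
and serialized upstream is left out). With bulk formats the sink rolls a new 
part file on every checkpoint, so the checkpoint interval is effectively the 
knob that trades file size against latency:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class ParquetSinkSketch {

    public static void attachSink(StreamExecutionEnvironment env,
                                  DataStream<GenericRecord> events,
                                  Schema schema) {
        // Bulk formats roll on every checkpoint, so this interval controls file size vs. latency.
        env.enableCheckpointing(5 * 60 * 1000);

        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/events"),        // assumed target bucket
                        ParquetAvroWriters.forGenericRecord(schema))
                .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
                .build();

        events.addSink(sink);
    }
}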

I believe that many companies struggle with similar use cases. I know that some 
of them have already approached that problem. Solutions like Alibaba Hologres 
or Netflix solution with Iceberg described during FF 2019 emerged. Given that 
full transition to real-time data warehouse may take a significant amount of 
time and effort, I would like to primarily focus on solutions for tools like 
hive/presto backed up by a distributed file system. Usually those are the 
systems that we are integrating with.

So what options do we have? Maybe I missed some existing open source tool?

Currently, I can come up with two approaches using flink exclusively:
1. Cache incoming traffic in flink state until a trigger fires according to the 
rolling strategy, probably with some special handling for late events, and then 
output the data with StreamingFileSink. The solution is not perfect, as it may 
introduce additional latency, and queries will still be less performant compared 
to fully compacted files (the late events problem). And the biggest issue I am 
afraid of is actually the performance drop while releasing data from flink state, 
and its bursty (peak) character.
2. Focus on implementing a batch rewrite job that will compact data offline. 
The source for the job could be either kafka or the small files produced by 
another job that uses plain StreamingFileSink. The drawback is that the whole 
system gets more complex, additional maintenance is needed and, maybe what is 
more troubling, 

Printing effective config for flink 1.11 cli

2020-07-24 Thread Senthil Kumar
Hello,

My understanding is that flink consumes the config from the config file as well 
as the values specified via the -D option.
I assume that the -D values will override the values from the config file?

Is there a way to somehow see what the effective config is?
i.e. print all of the config values that flink is going to be invoked with?

We ran into an issue with the flink stop  command.
It was exiting (after about a minute) with a 
java.util.concurrent.TimeoutException
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:493)

Following the docs, I tried to issue the command with -D “client.timeout=10 
min”, but it seemed to have made no difference.
That made me wonder just what config values were actually being used.
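
One way to at least see what the client would pick up from flink-conf.yaml is to 
load it the same way Flink does (a hedged sketch; the config directory path is a 
placeholder, and values passed with -D on the command line are layered on top by 
the CLI itself, so they will not show up here):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class PrintEffectiveConfig {

    public static void main(String[] args) {
        // Load flink-conf.yaml from the given directory, as the CLI would.
        Configuration conf = GlobalConfiguration.loadConfiguration("/etc/flink/conf");
        for (String key : conf.keySet()) {
            System.out.println(key + " = " + conf.getString(key, ""));
        }
    }
}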

Cheers
Kumar


Re: Age old stop vs cancel debate

2020-06-09 Thread Senthil Kumar
OK, will do and report back.

We are on 1.9.1,

1.10 will take some time __

On 6/9/20, 2:06 AM, "Kostas Kloudas"  wrote:

Hi Senthil,

From a quick look at the code, it seems that the cancel() of your
source function should be called, and the thread that it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (-ID) SYNC_SAVEPOINT on task X" or sth
similar with synchronous savepoint in it

and any other message afterwards with -ID in it to see if the
savepoint is completed successfully.

2) could you see if this behavior persists in Flink 1.10?

Thanks,
Kostas

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar  wrote:
>
> Robert,
>
>
>
> Thank you once again! We are currently doing the “short” Thread.sleep() 
approach. Seems to be working fine.
>
>
>
> Cheers
>
> Kumar
>
>
>
> From: Robert Metzger 
> Date: Tuesday, June 2, 2020 at 2:40 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
> this is more a Java question than a Flink question now :) If it is easily 
possible from your code, then I would regularly check the isRunning flag (by 
having short Thread.sleeps()) to have a proper cancellation behavior.
>
> If this makes your code very complicated, then you could work with 
manually interrupting your worker thread. I would only use this method if you 
are sure your code (and the libraries you are using) are properly handling 
interrupts.
>
> Sorry that I can not give you a more actionable response. It depends a 
lot on the structure of your code and the libraries you are calling into.
    >
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Fri, May 29, 2020 at 10:48 PM Senthil Kumar  
wrote:
>
> Hi Robert,
>
>
>
> Would appreciate more insights please.
>
>
>
> What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException
>
>
>
> It certainly receives the exception when the flink job is issued a cancel 
command.
>
>
>
> In both cases (cancel and stop) the cancel() method is getting called (to 
set the isRunning variable to false)
>
>
>
> However, given that the thread does not get interrupted in stop, we are 
not in a position to check the isRunning variable.
>
>
>
>
>
> For now, we are doing a Thread.sleep  every 5 minutes (instead of the 
normal interval which is in hours).
>
> Sleeping for 5 minutes gives us a chance to check the isRunning variable.
>
>
>
> Another approach would be to save the currentThread 
(Thread.currentThread()) before doing a Thread.sleep())
>
> and manually calling Thread.interrupt() from the cancel function.
>
>
>
> What is your recommendation?
>
>
>
> Cheers
>
> Kumar
>
>
>
>
>
> From: Robert Metzger 
> Date: Friday, May 29, 2020 at 4:38 AM
> To: Senthil Kumar 
> Cc: "user@flink.apache.org" 
> Subject: Re: Age old stop vs cancel debate
>
>
>
> Hi Kumar,
>
>
>
> The way you've implemented your custom source sounds like the right way: 
Having a "running" flag checked by the run() method and changing it in cancel().
>
> Also, it is good that you are properly handling the interrupt set by 
Flink (some people ignore InterruptedExceptions, which makes it difficult 
(basically impossible) for Flink to stop the job)
>
>
>
> Best,
>
> Robert
>
>
>
>
>
> On Wed, May 27, 2020 at 7:38 PM Senthil Kumar  
wrote:
>
> We are on flink 1.9.0
>
>
>
> I have a custom SourceFunction, where I rely on isRunning set to false 
via the cancel() function to exit out of the run loop.
>
> My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.
>
>
>
> When a job gets cancelled, the SourceFunction.cancel() is called, which 
sets the isRunning to false.
>
> In addition, the Thread.sleep 

Re: Stopping a job

2020-06-08 Thread Senthil Kumar
I am just stating this for completeness.

When a job is cancelled, Flink sends an Interrupt signal to the Thread running 
the Source.run method

For some reason (unknown to me), this does not happen when a Stop command is 
issued.

We ran into some minor issues because of said behavior.

From: Kostas Kloudas 
Date: Monday, June 8, 2020 at 2:35 AM
To: Arvid Heise 
Cc: M Singh , User-Flink 
Subject: Re: Stopping a job

What Arvid said is correct.
The only thing I have to add is that "stop" also allows exactly-once sinks to 
push out their buffered data to their final destination (e.g. Filesystem). In 
other words, it takes side-effects into account, so it guarantees exactly-once 
end-to-end, assuming that you are using exactly-once sources and sinks. Cancel 
with savepoint, on the other hand, did not necessarily do that; committing 
side-effects followed a "best-effort" approach.

For more information you can check [1].

Cheers,
Kostas

[1] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212

On Mon, Jun 8, 2020 at 10:23 AM Arvid Heise 
mailto:ar...@ververica.com>> wrote:
It was before I joined the dev team, so the following are kind of speculative:

The concept of stoppable functions never really took off as it was a bit of a 
clumsy approach. There is no fundamental difference between stopping and 
cancelling on (sub)task level. Indeed if you look in the twitter source of 1.6 
[1], cancel() and stop() are doing the exact same thing. I'd assume that this 
is probably true for all sources.

So what is the difference between cancel and stop then? It's more about the way 
you terminate the whole DAG. On cancelling, you call cancel() on all tasks more 
or less simultaneously. If you want to stop, it's more a fine-grained cancel, 
where you first stop the sources and then let the tasks close themselves when 
all upstream tasks are done. Just before closing the tasks, you also take a 
snapshot. Thus, the difference should not be visible in user code but only in 
the Flink code itself (task/checkpoint coordinator)

So for your question:
1. No, as on task level stop() and cancel() are the same thing on UDF level.
2. Yes, stop will be more graceful and creates a snapshot. [2]
3. Not that I am aware of. In the whole flink code base, there are no more (see 
javadoc). You could of course check if there are some in Bahir. But it 
shouldn't really matter. There is no huge difference between stopping and 
cancelling if you wait for a checkpoint to finish.
4. Okay you answered your second question ;) Yes cancel with savepoint = stop 
now to make it easier for new users.

[1] 
https://github.com/apache/flink/blob/release-1.6/flink-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java#L180-L190
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html

On Sun, Jun 7, 2020 at 1:04 AM M Singh 
mailto:mans2si...@yahoo.com>> wrote:

Hi Arvid:

Thanks for the links.

A few questions:

1. Is there any particular interface in 1.9+ that identifies the source as 
stoppable ?
2. Is there any distinction b/w stop and cancel  in 1.9+ ?
3. Is there any list of sources which are documented as stoppable besides the 
one listed in your SO link ?
4. In 1.9+ there is flink stop command and a flink cancel command. 
(https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#stop).
  So it appears that flink stop will take a savepoint and then call cancel, and 
cancel will just cancel the job (looks like cancel with savepoint is 

Re: Age old stop vs cancel debate

2020-06-02 Thread Senthil Kumar
Robert,

Thank you once again! We are currently doing the “short” Thread.sleep() 
approach. Seems to be working fine.

Cheers
Kumar

From: Robert Metzger 
Date: Tuesday, June 2, 2020 at 2:40 AM
To: Senthil Kumar 
Cc: "user@flink.apache.org" 
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

this is more a Java question than a Flink question now :) If it is easily 
possible from your code, then I would regularly check the isRunning flag (by 
having short Thread.sleeps()) to have a proper cancellation behavior.
If this makes your code very complicated, then you could work with manually 
interrupting your worker thread. I would only use this method if you are sure 
your code (and the libraries you are using) are properly handling interrupts.
Sorry that I can not give you a more actionable response. It depends a lot on 
the structure of your code and the libraries you are calling into.

Best,
Robert


On Fri, May 29, 2020 at 10:48 PM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
Hi Robert,

Would appreciate more insights please.

What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException

It certainly receives the exception when the flink job is issued a cancel 
command.

In both cases (cancel and stop) the cancel() method is getting called (to set 
the isRunning variable to false)

However, given that the thread does not get interrupted in stop, we are not in 
a position to check the isRunning variable.


For now, we are doing a Thread.sleep  every 5 minutes (instead of the normal 
interval which is in hours).
Sleeping for 5 minutes gives us a chance to check the isRunning variable.

Another approach would be to save the current thread (Thread.currentThread()) 
before doing the Thread.sleep()
and manually calling Thread.interrupt() from the cancel function.
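
A minimal sketch of that second option, in case it helps: cancel() interrupts the 
thread that is parked in Thread.sleep(), so the run() loop wakes up immediately 
and re-checks isRunning (the element type, the emitted value, and the one-hour 
interval are placeholders, not the actual implementation):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class InterruptibleSleepSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;
    private transient volatile Thread workerThread;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        workerThread = Thread.currentThread();        // remember the thread that sleeps
        while (isRunning) {
            ctx.collect("tick");                      // placeholder for the real work
            try {
                Thread.sleep(60L * 60L * 1000L);      // assumed 1-hour interval
            } catch (InterruptedException e) {
                // Woken up by cancel(); the loop condition decides whether to exit.
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        Thread t = workerThread;
        if (t != null) {
            t.interrupt();                            // wake the sleeping run() loop
        }
    }
}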

What is your recommendation?

Cheers
Kumar


From: Robert Metzger mailto:rmetz...@apache.org>>
Date: Friday, May 29, 2020 at 4:38 AM
To: Senthil Kumar mailto:senthi...@vmware.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
mailto:user@flink.apache.org>>
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

The way you've implemented your custom source sounds like the right way: 
Having a "running" flag checked by the run() method and changing it in cancel().
Also, it is good that you are properly handling the interrupt set by Flink 
(some people ignore InterruptedExceptions, which makes it difficult (basically 
impossible) for Flink to stop the job)

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
the isRunning to false.
In addition, the Thread.sleep gets interrupted, a check is made on the 
isRunning variable (set to false now) and the run loop is exited.

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SourceFunction.cancel() is not getting called (which seems 
like the correct behavior for “stop”)

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar


Re: Age old stop vs cancel debate

2020-05-29 Thread Senthil Kumar
Hi Robert,

Would appreciate more insights please.

What we are noticing: When the flink job is issued a stop command, the 
Thread.sleep is not receiving the InterruptedException

It certainly receives the exception when the flink job is issued a cancel 
command.

In both cases (cancel and stop) the cancel() method is getting called (to set 
the isRunning variable to false)

However, given that the thread does not get interrupted in stop, we are not in 
a position to check the isRunning variable.


For now, we are doing a Thread.sleep  every 5 minutes (instead of the normal 
interval which is in hours).
Sleeping for 5 minutes gives us a chance to check the isRunning variable.

Another approach would be to save the current thread (Thread.currentThread()) 
before doing the Thread.sleep()
and manually calling Thread.interrupt() from the cancel function.

What is your recommendation?

Cheers
Kumar


From: Robert Metzger 
Date: Friday, May 29, 2020 at 4:38 AM
To: Senthil Kumar 
Cc: "user@flink.apache.org" 
Subject: Re: Age old stop vs cancel debate

Hi Kumar,

The way you've implemented your custom source sounds like the right way: 
Having a "running" flag checked by the run() method and changing it in cancel().
Also, it is good that you are properly handling the interrupt set by Flink 
(some people ignore InterruptedExceptions, which makes it difficult (basically 
impossible) for Flink to stop the job)

Best,
Robert


On Wed, May 27, 2020 at 7:38 PM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
the isRunning to false.
In addition, the Thread.sleep gets interrupted, a check is made on the 
isRunning variable (set to false now) and the run loop is exited.

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SourceFunction.cancel() is not getting called (which seems 
like the correct behavior for “stop”)

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar


Age old stop vs cancel debate

2020-05-27 Thread Senthil Kumar
We are on flink 1.9.0

I have a custom SourceFunction, where I rely on isRunning set to false via the 
cancel() function to exit out of the run loop.
My run loop essentially gets the data from S3, and then simply sleeps 
(Thread.sleep) for a specified time interval.

When a job gets cancelled, the SourceFunction.cancel() is called, which sets 
the isRunning to false.
In addition, the Thread.sleep gets interrupted, a check is made on the 
isRunning variable (set to false now) and the run loop is exited.

We noticed that when we “stop” the flink job, the Thread.sleep does not get 
interrupted.
It also appears that SourceFunction.cancel() is not getting called (which seems 
like the correct behavior for “stop”)

My question: what’s the “right” way to exit the run() loop when the flink job 
receives a stop command?

My understanding was that there was a Stoppable interface (which got removed in 
1.9.0)

Would appreciate any insights.

Cheers
Kumar


Re: Flink Streaming Job Tuning help

2020-05-13 Thread Senthil Kumar
Zhijiang,

Thanks for your suggestions. We will keep it in mind!

Kumar

From: Zhijiang 
Reply-To: Zhijiang 
Date: Tuesday, May 12, 2020 at 10:10 PM
To: Senthil Kumar , "user@flink.apache.org" 

Subject: Re: Flink Streaming Job Tuning help

Hi Kumar,


I can give some general ideas for further analysis.

> We are finding that flink lags seriously behind when we introduce the keyBy 
> (presumably because of shuffle across the network)
The `keyBy` would break the chained operators, so it might bring an obvious 
performance cost in practice. I guess if your previous way without keyBy can 
make use of the chaining mechanism,
the follow-up operator can consume the emitted records from the preceding 
operator directly, with no need for the buffer serialization -> network shuffle 
-> buffer deserialization steps,
especially since your record size of 10K is a bit large.

If the keyBy is necessary in your case, then you can further check for the 
current bottleneck, e.g. whether there is back pressure, which you can monitor 
from the web UI. If so, check which task is the
bottleneck causing the back pressure, and you can trace it via network-related 
metrics.

Also check whether there is data skew in your case, meaning some tasks process 
more records than others. If so, maybe you can increase the parallelism to 
balance the load.

Best,
Zhijiang
--
From:Senthil Kumar 
Send Time: 2020-05-13 (Wednesday) 00:49
To:user@flink.apache.org 
Subject:Re: Flink Streaming Job Tuning help

I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers 
etc), but before we spend way too much time on
this; would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar



Re: Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
I forgot to mention, we are consuming said records from AWS kinesis and writing 
out to S3.

From: Senthil Kumar 
Date: Tuesday, May 12, 2020 at 10:47 AM
To: "user@flink.apache.org" 
Subject: Flink Streaming Job Tuning help

Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers 
etc), but before we spend way too much time on
this; would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar


Flink Streaming Job Tuning help

2020-05-12 Thread Senthil Kumar
Hello Flink Community!

We have a fairly intensive flink streaming application, processing 8-9 million 
records a minute, with each record being 10k.
One of our steps is a keyBy operation. We are finding that flink lags seriously 
behind when we introduce the keyBy (presumably because of shuffle across the 
network).

We are trying to tune it ourselves (size of nodes, memory, network buffers 
etc), but before we spend way too much time on
this; would it be better to hire some “flink tuning expert” to get us through?

If so what resources are recommended on this list?

Cheers
Kumar


Re: Correctly implementing of SourceFunction.run()

2020-05-08 Thread Senthil Kumar
OK, thank you. Much appreciated.

Yes, I don’t want the job to fail. The source has very little data that is 
being pumped into a Broadcast stream.

From: Robert Metzger 
Date: Friday, May 8, 2020 at 9:51 AM
To: Jingsong Li 
Cc: Senthil Kumar , "user@flink.apache.org" 

Subject: Re: Correctly implementing of SourceFunction.run()

Hey Kumar,

if you are swallowing any and all exceptions, your Flink job will not fail 
because of issues arising from your custom source. It might make sense to stop 
the source if you are catching an InterruptedException.

Throwing exceptions out of the run method basically signals the Flink framework 
that the source has failed, and thus the job will fail / go into recovery.
The way you are using the cancel() method + isRunning variable is correct for 
having a proper cancellation behavior of the source.



On Fri, May 8, 2020 at 3:31 AM Jingsong Li 
mailto:jingsongl...@gmail.com>> wrote:
Hi,

Some suggestions from my side:
- Use synchronized (checkpointLock) around the work and ctx.collect?
- Put Thread.sleep(interval) outside the try/catch? Maybe you should not swallow 
the interrupt exception (like when the job is cancelled).

Best,
Jingsong Lee

On Fri, May 8, 2020 at 2:52 AM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
I am implementing a source function which periodically wakes up and consumes 
data from S3.


My current implementation is like so.

Following: 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

Is it safe to simply swallow any and all exceptions in the run method and just 
rely on this.isRunning variable to quit the run() method?

Cheers
Kumar

---


@Override
public void cancel() {
    this.isRunning = false;   // Set volatile state variable, initially set to true on the class
}

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */;
            ctx.collect(out);
            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milliseconds
        } catch (Throwable t) {
            // Simply swallow
        }
    }
}



--
Best, Jingsong Lee


Correctly implementing of SourceFunction.run()

2020-05-07 Thread Senthil Kumar
I am implementing a source function which periodically wakes up and consumes 
data from S3.


My current implementation is like so.

Following: 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction

Is it safe to simply swallow any and all exceptions in the run method and just 
rely on this.isRunning variable to quit the run() method?

Cheers
Kumar

---


@Override
public void cancel() {
    this.isRunning = false;   // Set volatile state variable, initially set to true on the class
}

@Override
public void run(SourceFunction.SourceContext<OUT> ctx) {
    while (this.isRunning) {
        try {
            OUT out = /* Do some work */;
            ctx.collect(out);
            Thread.sleep(this.sleepIntervalHours * 60 * 60 * 1000); // Hours to milliseconds
        } catch (Throwable t) {
            // Simply swallow
        }
    }
}
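
Based on the replies above, a hedged sketch of how the loop could look with the 
interrupt handled explicitly, the sleep kept outside the work's try/catch, and 
emission done under the checkpoint lock (the String element type, the interval, 
and the fetchFromS3() helper are placeholders, not the actual implementation):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PeriodicS3Source implements SourceFunction<String> {

    private volatile boolean isRunning = true;
    private final long sleepIntervalHours = 6;           // assumed interval

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            String out = fetchFromS3();                   // hypothetical helper
            // Emit under the checkpoint lock so emission is atomic w.r.t. checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(out);
            }
            try {
                // Real failures above simply propagate and let Flink fail/recover
                // the job instead of being swallowed.
                Thread.sleep(sleepIntervalHours * 60L * 60L * 1000L);
            } catch (InterruptedException e) {
                // An interrupt means cancellation: restore the flag and exit the loop.
                Thread.currentThread().interrupt();
                isRunning = false;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    private String fetchFromS3() {
        return "...";                                     // placeholder for the S3 read
    }
}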



Updating Closure Variables

2020-04-27 Thread Senthil Kumar
Hello Flink Community!

We have a flink streaming application with a particular use case where a 
closure variable Set is used in a filter function.

Currently, the variable is set at startup time.

It’s populated from an S3 location, where several files exist (we consume the 
one with the last updated timestamp).

Is it possible to periodically update (say once every 24 hours) this closure 
variable?

My initial research indicates that we cannot update closure variables and 
expect them to show up at the workers.

There seems to be something called BroadcastStream in Flink. 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Is that the right approach? I would like some kind of a confirmation before I 
go deeper into it.
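
A minimal sketch of the broadcast state pattern referenced above, assuming the 
set is a set of strings and that a separate custom source re-reads S3 
periodically and emits the updated entries (the names and types are 
illustrative, not a confirmed design):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastFilterSketch {

    // One map entry per allowed key; the value is unused.
    private static final MapStateDescriptor<String, Boolean> ALLOWED =
            new MapStateDescriptor<>("allowed-set", Types.STRING, Types.BOOLEAN);

    public static DataStream<String> filterWithBroadcast(DataStream<String> events,
                                                         DataStream<String> setUpdates) {
        // setUpdates would come from a custom source that re-reads S3 every 24h.
        BroadcastStream<String> broadcastSet = setUpdates.broadcast(ALLOWED);

        return events
                .connect(broadcastSet)
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Filter against the latest broadcast copy of the set.
                        if (ctx.getBroadcastState(ALLOWED).contains(value)) {
                            out.collect(value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String update, Context ctx,
                                                        Collector<String> out) throws Exception {
                        ctx.getBroadcastState(ALLOWED).put(update, Boolean.TRUE);
                    }
                });
    }
}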

cheers
Kumar


Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-24 Thread Senthil Kumar
Thanks, here’s the debug output. It looks like we need to set up the hdfs-config 
file in the flink config.
Could you advise us further?

--


2020-01-23 22:07:44,014 DEBUG org.apache.flink.core.fs.FileSystem   
- Loading extension file systems via services

2020-01-23 22:07:44,016 DEBUG org.apache.flink.core.fs.FileSystem   
- Added file system 
maprfs:org.apache.flink.runtime.fs.maprfs.MapRFsFactory

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils 
- Cannot find hdfs-default configuration-file path in Flink config.

2020-01-23 22:07:44,045 DEBUG org.apache.flink.runtime.util.HadoopUtils 
- Cannot find hdfs-site configuration-file path in Flink config.


From: Aaron Langford 
Date: Thursday, January 23, 2020 at 12:22 PM
To: Senthil Kumar 
Cc: Yang Wang , "user@flink.apache.org" 

Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

When creating your cluster, you can provide configurations that EMR will find 
the right home for. Example for the aws cli:

aws emr create-cluster ... --configurations '[{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }]'

If you can't take down your existing EMR cluster for some reason, you can ask 
AWS to modify these configurations for you on the cluster. They should take 
effect when you start a new Flink job (new job manager as well as a new job in 
that job manager). It is my understanding that configuration changes require a 
restart of a flink jobmanager + topology in order to take effect. Here's an 
example of how to modify an existing cluster (I just threw this together, so 
beware malformed JSON):

aws emr modify-instance-groups --cli-input-json '{
  "ClusterId": "",
  "InstanceGroups": [{
    "InstanceGroupId": "",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  },{
    "InstanceGroupId": "",
    "Configurations": [{
      "Classification": "flink-log4j",
      "Properties": {
        "log4j.rootLogger": "DEBUG,file"
      }
    },{
      "Classification": "flink-log4j-yarn-session",
      "Properties": {
        "log4j.rootLogger": "DEBUG,stdout"
      }
    }]
  }]
}'

On Thu, Jan 23, 2020 at 11:03 AM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
Could you tell us how to turn on debug level logs?

We attempted this (on driver)

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager<https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fstackoverflow.com%2Fquestions%2F27853974%2Fhow-to-set-debug-log-level-for-resourcemanager=02%7C01%7Csenthilku%40vmware.com%7Cc967d9972dc4448eab4e08d7a039a09f%7Cb39138ca3cee4b4aa4d6cd83d9dd62f0%7C0%7C0%7C637154041690780390=VqB7Aeb7dNJSFBgePjKeHzigxdBSzPykFZ4YqFexb1I%3D=0>

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford 
mailto:aaron.langfor...@gmail.com>>
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar mailto:senthi...@vmware.com>>
Cc: Yang Wang mailto:danrtsey...@gmail.com>>, 
"user@flink.apache.org<mailto:user@flink.apache.org>" 
mailto:user@flink.apache.org>>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on 
my cluster, and then looking at the logs in the resource manager. The failure 
you are after happens before the exceptions you have reported here. When your 
Flink application is starting, it will attempt to load various file system 
implementations. You can see which ones it successfully loaded when you have 
the debug level of logs configured. You will have to do some digging, but this 
is a good place to start. Try to discover if your application is indeed loading 
the s3 file system, or if that is not happening. You should be able to find the 
file system implem

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-23 Thread Senthil Kumar
Could you tell us how to turn on debug level logs?

We attempted this (on driver)

sudo stop hadoop-yarn-resourcemanager

followed the instructions here
https://stackoverflow.com/questions/27853974/how-to-set-debug-log-level-for-resourcemanager

and

sudo start hadoop-yarn-resourcemanager

but we still don’t see any debug level logs

Any further info is much appreciated!


From: Aaron Langford 
Date: Tuesday, January 21, 2020 at 10:54 AM
To: Senthil Kumar 
Cc: Yang Wang , "user@flink.apache.org" 

Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

Senthil,

One of the key steps in debugging this for me was enabling debug level logs on 
my cluster, and then looking at the logs in the resource manager. The failure 
you are after happens before the exceptions you have reported here. When your 
Flink application is starting, it will attempt to load various file system 
implementations. You can see which ones it successfully loaded when you have 
the debug level of logs configured. You will have to do some digging, but this 
is a good place to start. Try to discover if your application is indeed loading 
the s3 file system, or if that is not happening. You should be able to find the 
file system implementations that were loaded by searching for the string "Added 
file system".

Also, do you mind sharing the bootstrap action script that you are using to get 
the s3 file system in place?

Aaron

On Tue, Jan 21, 2020 at 8:39 AM Senthil Kumar 
mailto:senthi...@vmware.com>> wrote:
Yang, I appreciate your help! Please let me know if I can provide any other info.

I resubmitted my executable jar file as a step to the flink EMR and here are all 
the exceptions. I see two of them.

I fished them out of /var/log/Hadoop//syslog


2020-01-21 16:31:37,587 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File 
Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split 
Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File 
Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from 
RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer

at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)


From: Yang Wang mailto:danrtsey...@gmail.com>>
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar mailto:senthi...@vmware.com>>
Cc: "user@flink.apache.org<mailto:user@flink.apache.org>" 
mailto:user@flink.apache.org>>
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I think this exception is not because the hadoop version isn't h

Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-21 Thread Senthil Kumar
Yang, I appreciate your help! Please let me know if I can provide any other info.

I resubmitted my executable jar file as a step to the flink EMR and here are all 
the exceptions. I see two of them.

I fished them out of /var/log/Hadoop//syslog


2020-01-21 16:31:37,587 ERROR 
org.apache.flink.streaming.runtime.tasks.StreamTask (Split Reader: Custom File 
Source -> Sink: Unnamed (11/16)): Error during disposal of stream operator.

java.lang.NullPointerException

at 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:165)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:582)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)



2020-01-21 16:31:37,591 INFO org.apache.flink.runtime.taskmanager.Task (Split 
Reader: Custom File Source -> Sink: Unnamed (8/16)): Split Reader: Custom File 
Source -> Sink: Unnamed (8/16) (865a5a078c3e40cbe2583afeaa0c601e) switched from 
RUNNING to FAILED.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer

at 
org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)

at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)

at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)

at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)

at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)

at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:281)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)


From: Yang Wang 
Date: Saturday, January 18, 2020 at 7:58 PM
To: Senthil Kumar 
Cc: "user@flink.apache.org" 
Subject: Re: Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

I think this exception is not because the hadoop version isn't high enough.
It seems that the "s3" URI scheme could not be recognized by 
`S3FileSystemFactory`, so it falls back to 
the `HadoopFsFactory`.

Could you share the debug level jobmanager/taskmanager logs so that we could 
confirm whether the
classpath and FileSystem are loaded correctly.



Best,
Yang

Senthil Kumar mailto:senthi...@vmware.com>> wrote on Friday, January 17, 2020 
at 10:57 PM:

Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software 
Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a Streaming job from one S3 bucket into another S3 bucket 
using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on 
Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in 
/usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed, for Flink 1.9.0 and above (according to this):

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/

Location of flink-s3-fs-hadoop plugin (Flink 1.9.0 on EMR)

2020-01-17 Thread Senthil Kumar
Hello all,



Newbie here!



We are running in Amazon EMR with the following installed in the EMR Software 
Configuration

Hadoop 2.8.5

JupyterHub 1.0.0

Ganglia 3.7.2

Hive 2.3.6

Flink 1.9.0



I am trying to get a Streaming job from one S3 bucket into another S3 bucket 
using the StreamingFileSink



I got the infamous exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on 
Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer



According to this, I needed to install flink-s3-fs-hadoop-1.9.0.jar in 
/usr/lib/flink/lib

https://stackoverflow.com/questions/55517566/amazon-emr-while-submitting-job-for-apache-flink-getting-error-with-hadoop-recov



That did not work.



Further googling revealed, for Flink 1.9.0 and above (according to this):

https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/



it seems that I need to install the jar file in the plugins directory 
(/usr/lib/flink/plugins/s3-fs-hadoop)



That did not work either.



At this point, I am not sure what to do and would appreciate some help!



Cheers

Kumar