Hello Ayush,

I am interested in knowing about your “really simple” implementation.

So, assuming the streaming Parquet output goes to an S3 bucket called “Initial”
(partitioned by event time):

Do you write another Flink batch application (step 2) that repartitions the data
from Initial into larger “event time” chunks and writes it out to another S3
bucket?
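To make the two-step layout concrete, here is a minimal sketch of how one event timestamp could map to a fine-grained partition in the streaming output and to a coarser one after the batch roll-up. The bucket names (`s3://initial`, `s3://compacted`), the `dt=`/`hour=` path scheme and the hourly-to-daily granularity are all hypothetical choices for illustration, not anything described in this thread.

```python
from datetime import datetime, timezone

# Hypothetical layout: the streaming job writes hourly event-time partitions
# into the "Initial" bucket; a batch job later rewrites them into daily ones.
INITIAL_PREFIX = "s3://initial"        # assumed bucket name
COMPACTED_PREFIX = "s3://compacted"    # assumed bucket name


def hourly_partition(event_time: datetime) -> str:
    """Fine-grained event-time partition used by the streaming job."""
    t = event_time.astimezone(timezone.utc)
    return f"{INITIAL_PREFIX}/dt={t:%Y-%m-%d}/hour={t:%H}"


def daily_partition(event_time: datetime) -> str:
    """Coarser event-time partition produced by the batch roll-up."""
    t = event_time.astimezone(timezone.utc)
    return f"{COMPACTED_PREFIX}/dt={t:%Y-%m-%d}"


ts = datetime(2020, 9, 11, 8, 14, tzinfo=timezone.utc)
print(hourly_partition(ts))  # s3://initial/dt=2020-09-11/hour=08
print(daily_partition(ts))   # s3://compacted/dt=2020-09-11
```

All 24 hourly partitions of a day collapse into one daily partition, which is what reduces the file count seen by the query engine.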

In our case, we are getting straggling records with event times that can be
up to 1 week old.
One approach is simply to run the batch job only after a week has passed, but
then we lose the ability to query the recent data efficiently.
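The “wait one week” approach boils down to a single rule: a daily event-time partition is safe to finalize only once its end plus the lateness bound lies in the past. A minimal sketch, assuming a hard 7-day bound (taken from the stragglers mentioned above) and UTC day boundaries; `MAX_LATENESS` and `is_closed` are hypothetical names.

```python
from datetime import date, datetime, timedelta, timezone

# Assumption: no record arrives more than MAX_LATENESS after its event time.
MAX_LATENESS = timedelta(days=7)


def is_closed(partition_day: date, now: datetime) -> bool:
    """True once a daily event-time partition can no longer receive late records."""
    day_start = datetime(partition_day.year, partition_day.month,
                         partition_day.day, tzinfo=timezone.utc)
    partition_end = day_start + timedelta(days=1)
    return now >= partition_end + MAX_LATENESS


now = datetime(2020, 9, 11, 8, 0, tzinfo=timezone.utc)
print(is_closed(date(2020, 9, 3), now))  # True: ended 09-04, bound passed 09-11 00:00
print(is_closed(date(2020, 9, 4), now))  # False: still inside the lateness window
```

This makes the trade-off explicit: with a 7-day bound, the newest partition that can be finalized always ended at least a week ago.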

I would appreciate any ideas.

Cheers
Kumar

From: Ayush Verma <ayushver...@gmail.com>
Date: Friday, September 11, 2020 at 8:14 AM
To: Robert Metzger <rmetz...@apache.org>
Cc: Marek Maj <marekm...@gmail.com>, user <user@flink.apache.org>
Subject: Re: Streaming data to parquet

Hi,

Looking at the problem broadly, file size is directly tied to how often you
commit. No matter which system you use, this variable will always be there.
If you commit frequently, you will be close to real time, but you will have
numerous small files. If you commit after long intervals, you will have larger
files, but this is effectively a "batch world". We solved this problem at my
company by having 2 systems: one that commits files at short intervals, thus
bringing data into durable storage reliably, and one that rolls up these small
files. It's actually really simple to implement this if you don't try to do it
in a single job.
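The roll-up side can be sketched as a planner that groups the small files of each partition into batches of roughly a target size; each batch would then be read and rewritten as one larger file. This is a hypothetical illustration under stated assumptions (file listing as `(partition, path, size)` tuples, a 128 MiB target, greedy grouping in path order), not Ayush's actual implementation; the read/rewrite step is not shown.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical input: (partition, file_path, size_in_bytes) per small file
# committed by the streaming job.
TARGET_BYTES = 128 * 1024 * 1024  # assumed target output size (128 MiB)

FileInfo = Tuple[str, str, int]


def plan_rollup(files: List[FileInfo],
                target: int = TARGET_BYTES) -> Dict[str, List[List[str]]]:
    """Return, per partition, groups of input files to merge into one output file."""
    by_partition: Dict[str, List[Tuple[str, int]]] = defaultdict(list)
    for partition, path, size in files:
        by_partition[partition].append((path, size))

    plan: Dict[str, List[List[str]]] = {}
    for partition, entries in by_partition.items():
        groups: List[List[str]] = []
        current: List[str] = []
        current_size = 0
        # Walk files in path order so the plan is deterministic.
        for path, size in sorted(entries):
            # Start a new group if adding this file would exceed the target.
            if current and current_size + size > target:
                groups.append(current)
                current, current_size = [], 0
            current.append(path)
            current_size += size
        if current:
            groups.append(current)
        plan[partition] = groups
    return plan


MiB = 1024 * 1024
example = [
    ("dt=2020-09-11", "part-0-0", 60 * MiB),
    ("dt=2020-09-11", "part-0-1", 60 * MiB),
    ("dt=2020-09-11", "part-1-0", 60 * MiB),
    ("dt=2020-09-10", "part-0-0", 10 * MiB),
]
print(plan_rollup(example))
```

Keeping the planner separate from the streaming job matches the point above: the streaming job only has to commit reliably, and the roll-up can be rerun or tuned independently.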

Best
Ayush

On Fri, Sep 11, 2020 at 2:22 PM Robert Metzger
<rmetz...@apache.org> wrote:
Hi Marek,

what you are describing is a known problem in Flink. There are some thoughts on 
how to address this in 
https://issues.apache.org/jira/browse/FLINK-11499
and
https://issues.apache.org/jira/browse/FLINK-17505
Maybe some of the ideas there already help with your current problem (e.g.
using long checkpoint intervals).
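Why long checkpoint intervals help can be shown with back-of-the-envelope arithmetic. With bulk formats such as Parquet, StreamingFileSink rolls a new part file on every checkpoint, and files only become visible once the checkpoint completes. The sketch below additionally assumes (hypothetically) that every sink subtask writes to every active event-time bucket between two checkpoints; the volumes and parallelism in the example are made up.

```python
# Estimate, assuming a bulk format that rolls a part file per checkpoint and
# that each sink subtask writes to every active bucket between checkpoints.
SECONDS_PER_DAY = 24 * 60 * 60


def files_per_day(checkpoint_interval_s: int, parallelism: int,
                  active_buckets: int) -> int:
    """Number of part files committed per day."""
    checkpoints = SECONDS_PER_DAY // checkpoint_interval_s
    return checkpoints * parallelism * active_buckets


def avg_file_mib(daily_mib: float, checkpoint_interval_s: int,
                 parallelism: int, active_buckets: int) -> float:
    """Average part-file size for a given daily data volume."""
    return daily_mib / files_per_day(checkpoint_interval_s, parallelism,
                                     active_buckets)


# Hypothetical job: 100 GiB/day, parallelism 10, 2 active buckets.
for interval in (60, 600, 3600):
    n = files_per_day(interval, parallelism=10, active_buckets=2)
    size = avg_file_mib(100 * 1024, interval, parallelism=10, active_buckets=2)
    print(f"checkpoint every {interval:>4}s: {n:>6} files/day, ~{size:.1f} MiB each")
```

A 60x longer checkpoint interval gives 60x fewer and 60x larger files, at the cost of data becoming queryable up to one interval later, which is exactly the commit-frequency trade-off described earlier in the thread.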

A related idea to (2) is to write your data in Avro format and then regularly
run a batch job that converts it from Avro to Parquet.

I hope these are some helpful pointers. I don't have a good overview of other
potential open-source solutions.

Best,
Robert


On Thu, Sep 10, 2020 at 5:10 PM Marek Maj
<marekm...@gmail.com> wrote:
Hello Flink Community,

When designing our data pipelines, we very often encounter the requirement to
stream traffic (usually from Kafka) to an external distributed file system
(usually HDFS or S3). This data is typically meant to be queried from
Hive/Presto or similar tools. Preferably, the data is stored in a columnar
format such as Parquet.

Currently, using Flink, it is possible to leverage StreamingFileSink to achieve
what we want to some extent. It satisfies our requirements to partition data by
event time, ensure exactly-once semantics and provide fault tolerance with
checkpointing. Unfortunately, with a bulk writer such as ParquetWriter, this
comes at the price of producing a large number of small files (bulk formats roll
a new part file on every checkpoint), which degrades query performance.

I believe that many companies struggle with similar use cases, and I know that
some of them have already approached the problem. Solutions such as Alibaba
Hologres, or Netflix's Iceberg-based solution presented at Flink Forward 2019,
have emerged. Given that a full transition to a real-time data warehouse may
take a significant amount of time and effort, I would like to focus primarily on
solutions for tools like Hive/Presto backed by a distributed file system, since
those are usually the systems we integrate with.

So what options do we have? Have I missed an existing open-source tool?

Currently, I can come up with two approaches using Flink exclusively:
1. Cache incoming traffic in Flink state until a trigger fires according to a
rolling strategy (probably with a special strategy for late events), and then
output the data with StreamingFileSink. This solution is not perfect: it may
introduce additional latency, and queries will still be less performant than on
fully compacted files (because of late events). The biggest issue I am afraid of
is actually a performance drop while releasing data from Flink state, given the
bursty, peak-like character of those releases.
2. Focus on implementing a batch rewrite job that compacts data offline. The
source for the job could be either Kafka or the small files produced by another
job that uses a plain StreamingFileSink. The drawback is that the whole system
gets more complex and needs additional maintenance and, perhaps more troubling,
we enter the batch world again (how can we know that no more late data will come
and that we can safely run the job?).
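The “peak character” concern in option 1 can be illustrated with a toy model: records are buffered per event-time bucket (standing in for Flink state) and released only when a time-based trigger fires. This is plain Python, not Flink API code; `simulate`, the 15-minute trigger and the input rate are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Toy model of option 1: buffer records per bucket, flush everything on a
# processing-time trigger. Each flush writes one file per non-empty bucket.


def simulate(records: List[Tuple[int, str]],
             trigger_every_s: int) -> Dict[int, Tuple[int, int]]:
    """records: (processing_time_s, bucket).
    Returns {trigger_time: (records_flushed, files_written)}."""
    buffer: Dict[str, int] = defaultdict(int)
    flushes: Dict[int, Tuple[int, int]] = {}
    next_trigger = trigger_every_s
    for t, bucket in sorted(records):
        # Fire every trigger that is due before this record is buffered.
        while t >= next_trigger:
            flushes[next_trigger] = (sum(buffer.values()), len(buffer))
            buffer.clear()
            next_trigger += trigger_every_s
        buffer[bucket] += 1
    flushes[next_trigger] = (sum(buffer.values()), len(buffer))  # final flush
    return flushes


# Steady input: 10 records/s for one hour into 2 buckets, trigger every 15 min.
records = [(t, f"bucket-{t % 2}") for t in range(3600) for _ in range(10)]
print(simulate(records, trigger_every_s=900))
```

A steady 10 records/s input turns into 9,000 records written in a single burst every 15 minutes: the files are few and large, but the write load arrives in peaks, which is the trade-off option 1 describes.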

I would really love to hear what the community thinks about this.

Kind regards
Marek
