Hi, I'm working with Flink for the first time and I'm trying to create a
solution that stores events from Kafka into Parquet files on S3. It should
also support re-injecting events from the Parquet files back into a Kafka
topic.

Here <https://gist.github.com/speeddragon/18fbd570557da59d7f6a2c5822cc7ad4>
is the code for a simple usage of StreamingFileSink with a bulk format,
which takes the events and stores them in Parquet files. The files are
partitioned by account_id and by year and month (yyyyMM). The issue with
this approach is that when running a backfill from a certain point in time,
it is hard to avoid generating duplicated events, since we never overwrite
the same files: the filename is generated as
"*part-<subtask_id>-<sequential_number>*".
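For context, a minimal sketch of that kind of sink (the field names
"account_id" and "timestamp", the bucket path and the schema variable are
just placeholders, not the exact code from the gist):

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("s3://my-bucket/events"),              // placeholder bucket
            ParquetAvroWriters.forGenericRecord(schema))    // schema: the Avro Schema of the events
        .withBucketAssigner(new BucketAssigner<GenericRecord, String>() {
            @Override
            public String getBucketId(GenericRecord record, Context context) {
                // bucket (directory) per account_id and yyyyMM of the event timestamp
                String yyyyMM = new SimpleDateFormat("yyyyMM")
                    .format(new Date((Long) record.get("timestamp")));
                return "account_id=" + record.get("account_id") + "/" + yyyyMM;
            }

            @Override
            public SimpleVersionedSerializer<String> getSerializer() {
                return SimpleVersionedStringSerializer.INSTANCE;
            }
        })
        .build();

The bucketing gives predictable directories, but the part files inside each
bucket still get the "part-<subtask_id>-<counter>" names, so a backfill adds
new files next to the old ones instead of replacing them.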

To add predictability, I've used a tumbling window to aggregate multiple
GenericRecords, in order to write each Parquet file from a list of them. For
that I've created a custom file sink, but I'm not sure which properties I
lose compared to the StreamingFileSink. Here
<https://gist.github.com/speeddragon/6a98805d7f4aacff729f3d60b6a57ff8> is
the code. Still, this solution is missing a way to close a window after a
given timeout, so that the last events are written to the sink even if no
more events arrive; something like the trigger sketch below is what I have
in mind.
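Rough sketch of such a trigger (the class name and timeout handling are
mine, and the pending processing-time timer would still need to be tracked
in state to be cleaned up properly):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.Window;

    public class FlushTimeoutTrigger<W extends Window> extends Trigger<Object, W> {

        private final long timeoutMillis;

        public FlushTimeoutTrigger(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            // fire at the normal end of the window, or after a processing-time
            // timeout, whichever comes first
            ctx.registerEventTimeTimer(window.maxTimestamp());
            ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) {
            // timeout reached: flush whatever is in the window
            return TriggerResult.FIRE_AND_PURGE;
        }

        @Override
        public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
            return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
        }

        @Override
        public void clear(W window, TriggerContext ctx) {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
            // the processing-time timer timestamp would need to be stored to delete it here
        }
    }

It would then be plugged in via .trigger(new FlushTimeoutTrigger<>(60_000))
on the windowed stream.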

Another workaround would be to create a StreamingFileSink with the row
format (forRowFormat), receive a List of GenericRecords, and write a custom
Encoder that uses *AvroParquetWriter* to write to the file. This way I have
access to a custom rolling policy, but it looks really inefficient. Here
<https://gist.github.com/speeddragon/ea19cb07569a52cd78fad8d4af8c9e68> is
the code.
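To illustrate what I mean, a minimal sketch of such an Encoder, assuming the
elements arrive as List<GenericRecord> and buffering each batch in a local
temp file before copying it into the part file (which is part of why it
feels so wasteful):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.Encoder;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    import java.io.File;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.util.List;

    public class ParquetListEncoder implements Encoder<List<GenericRecord>> {

        private final String schemaString;   // Avro Schema is not serializable, so keep it as a string

        public ParquetListEncoder(Schema schema) {
            this.schemaString = schema.toString();
        }

        @Override
        public void encode(List<GenericRecord> records, OutputStream stream) throws IOException {
            Schema schema = new Schema.Parser().parse(schemaString);

            // AvroParquetWriter wants to manage its own file, so buffer the batch locally ...
            File tmp = File.createTempFile("batch-", ".parquet");
            tmp.delete();   // the writer refuses to overwrite an existing file

            try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                    .<GenericRecord>builder(new org.apache.hadoop.fs.Path(tmp.getAbsolutePath()))
                    .withSchema(schema)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .build()) {
                for (GenericRecord record : records) {
                    writer.write(record);
                }
            }

            // ... and then copy the bytes into the part file stream
            Files.copy(tmp.toPath(), stream);
            tmp.delete();
        }
    }

Note that each encode() call produces a complete Parquet file (header and
footer), so I think the rolling policy would have to roll after every
element for the part files to stay readable.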

Am I overthinking this? I know there are some recently closed issues about
letting the StreamingFileSink support more custom rolling policies for bulk
formats, like https://issues.apache.org/jira/browse/FLINK-13027, but I only
noticed that now.