Hi David, I'm pulling in Kostas who worked on the StreamingFileSink and might be able to answer some of your questions.
Cheers,
Till

On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, David
>
> For your first description, I'm a little confused about the duplicated
> records when backfilling. Could you describe your usage scenario/code in
> more detail?
>
> I remember a backfill solution from Pinterest which is very similar to
> yours and also uses Flink [1]; I hope it can help you.
>
> Best,
> Leonard
>
> [1] https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz&index=64
>
> On Jan 10, 2020, at 12:14, David Magalhães <speeddra...@gmail.com> wrote:
>
> Hi, I'm working with Flink for the first time and I'm trying to create a
> solution that will store events from Kafka into Parquet files in S3. It
> should also support re-injection of events from the Parquet files into a
> Kafka topic.
>
> Here
> <https://gist.github.com/speeddragon/18fbd570557da59d7f6a2c5822cc7ad4>
> is the code with a simple usage of StreamingFileSink with a bulk encoder
> that takes the events and stores them in Parquet files. The files are
> partitioned by account_id and by year and month (yyyyMM). The issue with
> this approach is that when running a backfill from a certain point in
> time, it is hard to avoid generating duplicated events: we never
> overwrite the same files, because each filename is generated as
> "*part-<sub_task_id>-<sequential_number>*".
>
> To add predictability, I've used a tumbling window to aggregate multiple
> GenericRecords, in order to write each Parquet file from a list of them.
> For that I've created a custom file sink, but I'm not sure which
> guarantees I lose compared to the StreamingFileSink. Here
> <https://gist.github.com/speeddragon/6a98805d7f4aacff729f3d60b6a57ff8>
> is the code. Still, this solution is missing a way to close a window
> after a given timeout, so that the last events are written to the sink
> when no more events arrive.
>
> Another workaround would be to create a StreamingFileSink with a row
> encoder that receives a List of GenericRecords, plus a custom Encoder
> that uses *AvroParquetWriter* to write to a file. This way I have access
> to a custom rolling policy, but it looks truly inefficient. Here
> <https://gist.github.com/speeddragon/ea19cb07569a52cd78fad8d4af8c9e68>
> is the code.
>
> Am I overthinking this solution? I know there are some (recently closed)
> issues about letting the StreamingFileSink support custom rolling
> policies with bulk encoders, like
> https://issues.apache.org/jira/browse/FLINK-13027, but I only noticed
> them now.
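
For reference, a minimal sketch of the first approach (not the code from the gist; the "account_id" and epoch-millis "timestamp" fields on the Avro records, the S3 path, and the "schema"/"stream" variables are assumptions):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/**
 * Buckets each record under "<account_id>/<yyyyMM>", matching the
 * partitioning described above. Assumes an "account_id" string field
 * and an epoch-millis "timestamp" field (hypothetical names).
 */
public class AccountMonthBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter MONTH =
            DateTimeFormatter.ofPattern("yyyyMM").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        long ts = (Long) record.get("timestamp");
        return record.get("account_id") + "/" + MONTH.format(Instant.ofEpochMilli(ts));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

// Wiring it up ("schema" is the events' Avro schema):
StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("s3://my-bucket/events"),
                       ParquetAvroWriters.forGenericRecord(schema))
        .withBucketAssigner(new AccountMonthBucketAssigner())
        .build();
stream.addSink(sink);

Note that with a bulk format the sink can only roll on checkpoints, which is exactly the limitation FLINK-13027 is about.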
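
On the windowing variant, one way to get the "close the window after a given timeout" behaviour is a custom trigger that fires on event time as usual but also registers a processing-time timer as a fallback. A rough sketch (simplified: it does not track the processing-time timers in state, so stale timeout timers can remain after a window is purged):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Event-time trigger with a processing-time timeout, so windows are still
 * flushed when the watermark stops advancing (e.g. when a backfill runs
 * out of input).
 */
public class EventTimeTriggerWithTimeout extends Trigger<Object, TimeWindow> {

    private final long timeoutMillis;

    public EventTimeTriggerWithTimeout(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Fallback: flush the window if nothing fires within the timeout.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // The timeout hit: flush whatever the window has collected so far.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would plug into the existing tumbling window as .window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(new EventTimeTriggerWithTimeout(60_000)), with the window size and timeout picked for illustration only.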
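
For the third approach, the awkward part is that AvroParquetWriter wants a Parquet OutputFile while a row-format Encoder only gets an OutputStream, so a small adapter is needed. A sketch, assuming exactly one List per part file, i.e. the custom rolling policy rolls after every element (otherwise several self-contained Parquet files get concatenated into one part file, which readers will reject):

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

/**
 * Row-format Encoder that writes one complete Parquet file (header,
 * pages and footer) per List<GenericRecord> element.
 */
public class ParquetListEncoder implements Encoder<List<GenericRecord>> {

    // Kept as a String because Avro Schema serializability varies by version.
    private final String schemaString;
    private transient Schema schema;

    public ParquetListEncoder(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public void encode(List<GenericRecord> records, OutputStream stream) throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaString);
        }
        try (ParquetWriter<GenericRecord> writer =
                     AvroParquetWriter.<GenericRecord>builder(new StreamOutputFile(stream))
                             .withSchema(schema)
                             .build()) {
            for (GenericRecord record : records) {
                writer.write(record);
            }
        } // close() writes the Parquet footer into the stream
    }

    /** Adapts the sink-provided OutputStream to Parquet's OutputFile. */
    private static final class StreamOutputFile implements OutputFile {

        private final OutputStream out;

        StreamOutputFile(OutputStream out) {
            this.out = out;
        }

        @Override
        public PositionOutputStream create(long blockSizeHint) {
            return new PositionOutputStream() {
                private long pos;

                @Override
                public long getPos() {
                    return pos;
                }

                @Override
                public void write(int b) throws IOException {
                    out.write(b);
                    pos++;
                }

                @Override
                public void write(byte[] b, int off, int len) throws IOException {
                    out.write(b, off, len);
                    pos += len;
                }

                @Override
                public void close() {
                    // Never close the underlying stream; the sink owns it.
                }
            };
        }

        @Override
        public PositionOutputStream createOrOverwrite(long blockSizeHint) {
            return create(blockSizeHint);
        }

        @Override
        public boolean supportsBlockSize() {
            return false;
        }

        @Override
        public long defaultBlockSize() {
            return 0;
        }
    }
}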