One option, though it requires more code on your part: write smaller files
to directory_X using frequent triggers and, once the window has properly
closed, combine all of them into a single file in your own DoFn. It might
be worth the extra effort. You can use the Wait.on() transform to run your
finalizer DoFn right after the window that writes the smaller files closes.
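
For the finalizer step, a minimal sketch of the merge logic might look like
the following. It is plain Java against the local filesystem (java.nio) only
to keep it self-contained; in the pipeline the same logic would sit inside
your DoFn and go through an HDFS-capable API instead. PartFileMerger,
mergeParts and the one-prefix-per-window file naming are hypothetical names
of mine, not Beam API. It also assumes a format whose parts can be appended
byte for byte (e.g. newline-delimited records); Avro container files each
carry their own header, so they would need an Avro-aware merge instead.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper for the finalizer step; not part of Beam. */
final class PartFileMerger {

    /**
     * Appends every regular file in partsDir whose name starts with
     * windowPrefix to target, in file-name order.
     * Returns the number of part files that were merged.
     */
    public static int mergeParts(Path partsDir, String windowPrefix, Path target)
            throws IOException {
        List<Path> parts;
        try (Stream<Path> listing = Files.list(partsDir)) {
            parts = listing
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().startsWith(windowPrefix))
                    .sorted()
                    .collect(Collectors.toList());
        }

        // Write under a temporary name first so readers never see a partial file.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp,
                StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (Path part : parts) {
                // Streams the part into the output; the part is not loaded into memory.
                Files.copy(part, out);
            }
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        return parts.size();
    }
}
```

The DoFn would call something like mergeParts(...) once per closed window,
with Wait.on() holding it back until the small-file write for that window
is done. Since the parts are streamed one at a time, the merge itself does
not need the whole window's data in memory or in state.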


On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hey,
>
> I am looking for some advice.
>
> I am trying to do stream processing with Beam on the Flink runtime. I read
> data from Kafka, do some processing on it (which is not important here),
> and at the same time want to store the consumed data in history storage,
> which is HDFS, for archiving and reprocessing.
>
> Now, the part about writing batches to HDFS is giving me a hard time.
> Logically, I want to do:
>
> fileIO = FileIO.writeDynamic()
>         .by(destinationFn)
>         .via(AvroIO.sink(avroClass))
>         .to(path)
>         .withNaming(namingFn)
>         .withTempDirectory(tmp)
>         .withNumShards(shards)
>
> data
>    .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>    .saveTo(fileIO)
>
>
> This write generates 3 operators in the Flink execution graph, which I do
> not fully understand yet.
>
> Now, the problem is that I am not able to run this at scale.
>
> If I want to write files big enough to avoid having lots of files on HDFS,
> I keep running into OOM errors. With Flink, I use the RocksDB state backend
> and I was warned about this JIRA, which is probably related to my OOM:
> https://issues.apache.org/jira/browse/FLINK-8297
> Therefore, I need to trigger more often with smaller batches, which leads
> to too many files on HDFS.
>
> The question is whether there is some approach I am not seeing that would
> make this work (writing bulks of data of my choosing to HDFS without
> running into memory trouble). Also, keeping in state the whole window's
> data that is destined for the filesystem involves more IO.
>
> Thanks for any thoughts and guidelines,
> Jozef
>
>
