On Fri, Jul 20, 2018 at 2:58 AM Jozef Vilcek <[email protected]> wrote:
> Hm, that is an interesting idea to make the write composite and merge files
> later. I do not know Beam well yet.
> I will look into it and learn about the Wait.on() transform (I wonder how it
> will work with late firings). Thanks!
>
> But it keeps me thinking...
> Does it make sense to have support for this from the SDK?
> Is my use case that uncommon? Not a fit for Beam? How do others out there
> do a similar thing?
>

The SDK does allow it. It looks like you are running into scaling and memory
limits with the amount of state stored in large windows. This is something
that will improve. I am not familiar enough with the Flink runner to comment
on specifics. I was mainly thinking of a workaround.

Raghu.

>
> On Thu, Jul 19, 2018 at 11:21 PM Raghu Angadi <[email protected]> wrote:
>
>> One option (but requires more code): Write to smaller files with frequent
>> triggers to directory_X and, once the window properly closes, copy all the
>> files to a single file in your own DoFn. This is certainly more code on
>> your part, but might be worth it. You can use the Wait.on() transform to run
>> your finalizer DoFn right after the window that writes smaller files closes.
>>
>>
>> On Thu, Jul 19, 2018 at 2:43 AM Jozef Vilcek <[email protected]>
>> wrote:
>>
>>> Hey,
>>>
>>> I am looking for advice.
>>>
>>> I am trying to do stream processing with Beam on the Flink runtime.
>>> I am reading data from Kafka, doing some processing with it which is not
>>> important here, and at the same time I want to store the consumed data to
>>> history storage for archive and reprocessing, which is HDFS.
>>>
>>> Now, the part of writing batches to HDFS is giving me a hard time.
>>> Logically, I want to do:
>>>
>>> fileIO = FileIO.writeDynamic()
>>>     .by(destinationFn)
>>>     .via(AvroIO.sink(avroClass))
>>>     .to(path)
>>>     .withNaming(namingFn)
>>>     .withTempDirectory(tmp)
>>>     .withNumShards(shards)
>>>
>>> data
>>>     .withFixedWindow(1H, afterWatermarkTrigger, discardFiredPanes)
>>>     .saveTo(fileIO)
>>>
>>>
>>> This write generates 3 operators in the Flink execution graph, which I do
>>> not fully understand yet.
>>>
>>> Now, the problem is that I am not able to run this at scale.
>>>
>>> If I want to write files big enough to avoid having lots of files on
>>> HDFS, I keep running into OOM. With Flink, I use the RocksDB state backend
>>> and I was warned about this JIRA, which is probably related to my OOM:
>>> https://issues.apache.org/jira/browse/FLINK-8297
>>> Therefore, I need to trigger more often and with smaller batches, which
>>> leads to too many files on HDFS.
>>>
>>> The question here is whether there is some path I do not see to make this
>>> work (write bulks of data of my choosing to HDFS without running into
>>> memory troubles). Also, keeping the whole window's data, which is
>>> designated for write to the output filesystem, in state involves more IO.
>>>
>>> Thanks for any thoughts and guidelines,
>>> Jozef
>>>
>>>
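
For reference, the merge step of the workaround discussed in this thread (write many small files with frequent triggers, then copy them into a single file once the window closes) might look roughly like the sketch below. It is not taken from the thread and makes several assumptions: the class name SmallFileMerger and the method mergeParts are hypothetical, the local filesystem (java.nio.file) stands in for HDFS, and the part files are assumed to be plain byte-concatenable files such as line-delimited text. Avro container files cannot be merged by simple byte concatenation, so they would have to be re-read and re-written record by record with the Avro library. In a real pipeline this logic would sit inside the finalizer DoFn and go through an HDFS-capable filesystem API rather than java.nio.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: merges the small per-trigger files of one window
// into a single output file. Local filesystem only; HDFS is NOT handled here.
class SmallFileMerger {

    /**
     * Concatenates every regular file found directly in partsDir, in
     * file-name order, into target. Returns the number of files merged.
     * Assumes the parts are byte-concatenable (e.g. line-delimited text).
     */
    static int mergeParts(Path partsDir, Path target) throws IOException {
        // Collect the part files first, so the temp file created below
        // is never picked up even if target lives in the same directory.
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(partsDir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    parts.add(p);
                }
            }
        }
        Collections.sort(parts); // deterministic order by path name

        // Write to a temporary sibling, then rename, so readers never
        // observe a half-written merged file.
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(
                tmp,
                StandardOpenOption.CREATE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (Path p : parts) {
                Files.copy(p, out); // streams the file, no full buffering
            }
        }
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        return parts.size();
    }
}
```

Because each part is streamed straight into the output, memory use stays independent of the window size, which is the point of the workaround: the large hourly batch never has to live in runner state. Deleting the small files after a successful merge is left out of the sketch.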
