Hi Team,

Thanks Mohil, that seems to be working.
I would also like to control the size of the files I write. Is there a way to achieve this in Beam? I am currently using the following, but I am not sure whether it works:

    pipeline.getOptions().as(S3Options.class).setS3UploadBufferSizeBytes(134217728);

Thanks,
Julius

From: Mohil Khare <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, July 10, 2020 at 3:43 PM
To: "[email protected]" <[email protected]>
Subject: Re: FileIO write to new folder every hour.

BTW, if you want the current hour based on your interval window, you can also do:

    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    class DatePartitionedFileName implements FileIO.Write.FileNaming {
        // Joda-Time formatter for the two-digit hour of day.
        private static final DateTimeFormatter DATE_FORMAT = DateTimeFormat.forPattern("HH");

        private final String subpath;

        DatePartitionedFileName(String subpath) {
            this.subpath = subpath;
        }

        @Override
        public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                                  int shardIndex, Compression compression) {
            IntervalWindow intervalWindow = (IntervalWindow) window;
            // The folder is derived from the start of the window: <subpath>/<HH>/file.parquet
            return String.format("%s/%s/file.parquet", subpath,
                    DATE_FORMAT.print(intervalWindow.start()));
        }
    }

Regards
Mohil

On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare <[email protected]> wrote:

Hello Julius,

Well, I do something similar using FileIO.Write.FileNaming, i.e.
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html

You can do something like the following:

    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath)
        .withNumShards(1)
        .withNaming(new DatePartitionedFileName(subpath)))

where your DatePartitionedFileName implements FileIO.Write.FileNaming and overrides its getFilename method, i.e. something like the following:

    class DatePartitionedFileName implements FileIO.Write.FileNaming {
        private final String subpath;

        DatePartitionedFileName(String subpath) {
            this.subpath = subpath;
        }

        @Override
        public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                                  int shardIndex, Compression compression) {
            IntervalWindow intervalWindow = (IntervalWindow) window;
            // getCurrentHour() stands in for some function that returns the current hour.
            return String.format("%s/%s/file.parquet", subpath, getCurrentHour());
        }
    }

Hope this helps.

Regards
Mohil

On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]> wrote:

Hi Team,

I am trying to write files using FileIO, but currently all files fall under the same folder. I need to write to a new folder every hour, e.g.:

    /output/hour-01/files.*  -> events coming in at hour 1
    /output/hour-02/files.*  -> events coming in at hour 2

My code:

    parquetRecord.apply("Batch Events",
            Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(10)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardMinutes(0)))
        .apply(Distinct.create())
        .apply(FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(getOutput_schema()))
            .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
            .withNumShards(1)
            .withSuffix(".snappy.parquet"));

Thanks,
Julius
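
For reference, the window-to-folder mapping discussed in this thread can be tried out without Beam on the classpath. The sketch below is a standalone illustration and not code from either author: it uses java.time in place of Joda-Time, and the class name HourlyFolder, the method folderFor, the fixed UTC zone, and the hour=HH layout are all assumptions made for the example. In a real pipeline the same logic would presumably sit inside getFilename, with the instant taken from intervalWindow.start().

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Beam: maps a window start to an hourly folder.
final class HourlyFolder {
    // Two-digit hour of day (00-23), evaluated in UTC (an assumption) so the
    // folder name does not depend on the worker's default time zone.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Builds "<basePath>/hour=<HH>/" from the start of a window.
    public static String folderFor(String basePath, Instant windowStart) {
        // Drop one trailing slash, if present, to avoid "//" in the result.
        String base = basePath.endsWith("/")
            ? basePath.substring(0, basePath.length() - 1)
            : basePath;
        return base + "/hour=" + HOUR.format(windowStart) + "/";
    }

    public static void main(String[] args) {
        // A 10-minute window that starts at 15:40 UTC lands in the hour=15 folder.
        Instant start = Instant.parse("2020-07-10T15:40:00Z");
        System.out.println(folderFor("/output", start)); // prints /output/hour=15/
    }
}
```

The point of deriving the hour from the window start, rather than calling new DateTime() while the pipeline is being built, is that the value is computed per window at write time instead of once at construction.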
