On Tue, Jul 14, 2020 at 10:38 PM Almeida, Julius <[email protected]> wrote:
> Hi Team,
>
> Thanks Mohil, that seems to be working.
>
> I would also like to control the size of the files I write. Is there a way
> to achieve this in Beam?
>
> I am currently using this, but I am not sure whether it works:
>
> pipeline.getOptions().as(S3Options.class).setS3UploadBufferSizeBytes(134217728);
>

You cannot control the exact size of files, since it depends on how elements
are assigned to windows and bundles and on how the runner splits the source
that precedes your sink. But you can control the number of shards per window:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1178

Note that this might have performance implications, since a fixed shard count
limits how many workers can write each window in parallel.

Thanks,
Cham

>
> Thanks,
>
> Julius
>
> From: Mohil Khare <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Friday, July 10, 2020 at 3:43 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: FileIO write to new folder every hour.
>
> BTW, if you want the current hour based on your interval window, you can
> also do:
>
> import org.apache.beam.sdk.io.Compression;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
> import org.apache.beam.sdk.transforms.windowing.PaneInfo;
> import org.joda.time.format.DateTimeFormat;
> import org.joda.time.format.DateTimeFormatter;
>
> class DatePartitionedFileName implements FileIO.Write.FileNaming {
>   // Joda-Time formatter (Beam windows use Joda Instants); UTC so all workers agree.
>   private static final DateTimeFormatter DATE_FORMAT =
>       DateTimeFormat.forPattern("HH").withZoneUTC();
>
>   private final String subpath;
>
>   DatePartitionedFileName(String subpath) {
>     this.subpath = subpath;
>   }
>
>   @Override
>   public String getFilename(BoundedWindow window, PaneInfo pane,
>       int numShards, int shardIndex, Compression compression) {
>     IntervalWindow intervalWindow = (IntervalWindow) window;
>     // <subpath>/<HH>/file.parquet, where HH is the hour the window starts in.
>     // With several windows per hour or more than one shard, add the window
>     // start and shardIndex to the name so files do not share a path.
>     return String.format(
>         "%s/%s/file.parquet",
>         subpath,
>         DATE_FORMAT.print(intervalWindow.start()));
>   }
> }
>
> Regards
>
> Mohil
>
> On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare <[email protected]> wrote:
>
> Hello Julius,
>
> Well, I do something similar using FileIO.Write.FileNaming, i.e.
> https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
>
> You can do something like the following:
>
> .apply(FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(getOutput_schema()))
>     .to(outputPath)
>     .withNumShards(1)
>     .withNaming(new DatePartitionedFileName(subpath)))
>
> where your DatePartitionedFileName implements FileIO.Write.FileNaming
> and overrides its getFilename method, i.e. something like the following:
>
> class DatePartitionedFileName implements FileIO.Write.FileNaming {
>   private final String subpath;
>
>   DatePartitionedFileName(String subpath) {
>     this.subpath = subpath;
>   }
>
>   @Override
>   public String getFilename(BoundedWindow window, PaneInfo pane,
>       int numShards, int shardIndex, Compression compression) {
>     IntervalWindow intervalWindow = (IntervalWindow) window;
>     return String.format(
>         "%s/%s/file.parquet",
>         subpath,
>         getCurrentHour()); // placeholder: a helper you write that returns the hour
>   }
> }
>
> Hope this helps.
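A caveat on naming every file `file.parquet`: with 10-minute windows, six windows fall into each hour folder, and the same hour recurs every day, so those windows would all map to the same path (as would several shards of one window). Below is a minimal sketch of a collision-free naming scheme, assuming UTC and written against plain java.time so it runs without Beam; `HourlyFileNames` and `fileName` are hypothetical names. Inside a real `FileIO.Write.FileNaming#getFilename` you would pass `intervalWindow.start().getMillis()`, `intervalWindow.end().getMillis()`, `shardIndex` and `numShards`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a per-hour, collision-free file name from the
// values Beam hands to FileIO.Write.FileNaming#getFilename.
final class HourlyFileNames {
  // Hour-of-day folder, formatted in UTC (assumption) so every worker agrees.
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);
  // Full window timestamp, which keeps names unique across windows and days.
  private static final DateTimeFormatter STAMP =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmm").withZone(ZoneOffset.UTC);

  private HourlyFileNames() {}

  static String fileName(long windowStartMillis, long windowEndMillis,
                         int shardIndex, int numShards, String suffix) {
    Instant start = Instant.ofEpochMilli(windowStartMillis);
    Instant end = Instant.ofEpochMilli(windowEndMillis);
    // hour=HH/<windowStart>-<windowEnd>-<shard>-of-<numShards><suffix>
    return String.format("hour=%s/%s-%s-%05d-of-%05d%s",
        HOUR.format(start), STAMP.format(start), STAMP.format(end),
        shardIndex, numShards, suffix);
  }

  public static void main(String[] args) {
    // A 10-minute window starting at 01:10 UTC, first of two shards.
    long start = Instant.parse("2020-07-10T01:10:00Z").toEpochMilli();
    long end = start + 10 * 60 * 1000;
    System.out.println(fileName(start, end, 0, 2, ".snappy.parquet"));
  }
}
```

Running `main` prints `hour=01/20200710T0110-20200710T0120-00000-of-00002.snappy.parquet`; the folder still groups files by hour, while the window stamp and shard index keep the individual names apart.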
>
> Regards
>
> Mohil
>
> On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]>
> wrote:
>
> Hi Team,
>
> I am trying to write files using FileIO, but currently all files fall
> under the same folder.
>
> I need to write to a new folder every hour, e.g.:
> /output/hour-01/files.*  -> events coming in at hour 1
> /output/hour-02/files.*  -> events coming in at hour 2
>
> My code:
>
> parquetRecord
>     .apply("Batch Events", Window.<GenericRecord>into(
>             FixedWindows.of(Duration.standardMinutes(10)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(0)))
>     .apply(Distinct.create())
>     .apply(FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(getOutput_schema()))
>         .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
>         .withNumShards(1)
>         .withSuffix(".snappy.parquet"));
>
> Thanks,
>
> Julius
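The reason everything lands in one folder in the original code is that `new DateTime().toString("HH")` inside `.to(...)` runs once, while the pipeline is being constructed, so the output path is a constant baked into the job. The hour has to come from each window instead, which is what the `FileNaming` approach does. As a rough illustration (plain Java, UTC assumed, hypothetical class name `EventTimeBuckets`), this is how an event timestamp maps to a 10-minute fixed window and then to an hour folder; the window arithmetic approximates `FixedWindows.of(size)` with its default zero offset.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of event-time bucketing, independent of Beam.
final class EventTimeBuckets {
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

  private EventTimeBuckets() {}

  // Start of the fixed window containing the timestamp (zero offset),
  // roughly what FixedWindows.of(size) assigns.
  static long windowStart(long timestampMillis, Duration size) {
    long sizeMillis = size.toMillis();
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  // Folder derived from the element's window, not from the wall clock at
  // the moment the pipeline was constructed.
  static String hourFolder(long timestampMillis, Duration size) {
    Instant start = Instant.ofEpochMilli(windowStart(timestampMillis, size));
    return "hour=" + HOUR.format(start) + "/";
  }

  public static void main(String[] args) {
    Duration tenMinutes = Duration.ofMinutes(10);
    String[] events = {"2020-07-10T01:05:00Z", "2020-07-10T01:55:00Z", "2020-07-10T02:03:00Z"};
    for (String ts : events) {
      System.out.println(ts + " -> " + hourFolder(Instant.parse(ts).toEpochMilli(), tenMinutes));
    }
  }
}
```

The first two events go to `hour=01/` and the third to `hour=02/`. This only gives a clean per-hour layout when the window size divides an hour; a window that straddles two hours is filed under the hour it starts in.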

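On sizing: as Cham says, FileIO does not take a target file size, but the number of shards per window is a lever. If you can estimate how many bytes one window produces, a fixed shard count gives files of roughly that estimate divided by the shard count. A minimal sketch, assuming such an estimate exists (the 1 GiB figure below is made up for illustration) and using a hypothetical `ShardSizing` helper whose result would be passed to `.withNumShards(...)`. Real files will only approximate the target. Separately, as far as I know `setS3UploadBufferSizeBytes` sets the chunk size used for S3 multipart uploads, not the size of the files FileIO writes.

```java
// Hypothetical helper: picks a fixed shard count so each file is roughly the
// target size, given an estimate of how many bytes one window produces.
final class ShardSizing {
  private ShardSizing() {}

  static int shardsFor(long estimatedBytesPerWindow, long targetBytesPerFile) {
    if (estimatedBytesPerWindow <= 0 || targetBytesPerFile <= 0) {
      throw new IllegalArgumentException("sizes must be positive");
    }
    // Ceiling division in integer arithmetic; positive inputs give at least 1.
    long shards = (estimatedBytesPerWindow + targetBytesPerFile - 1) / targetBytesPerFile;
    return (int) Math.min(shards, Integer.MAX_VALUE);
  }

  public static void main(String[] args) {
    long target = 134_217_728L; // 128 MiB, the value used in the question
    long perWindow = 1L << 30;  // assumed ~1 GiB written per 10-minute window
    System.out.println(shardsFor(perWindow, target)); // 8
  }
}
```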