Hello Julius,
Well, I do something similar using FileIO.Write.FileNaming, i.e.:
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
You can do something like the following:
.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath)
    .withNumShards(1)
    .withNaming(new DatePartitionedFileName(subpath)));
Here, DatePartitionedFileName is your own class that implements
FileIO.Write.FileNaming and overrides its getFilename method, something like
the following:
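import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTimeZone;

class DatePartitionedFileName implements FileIO.Write.FileNaming {
  private final String subpath;

  DatePartitionedFileName(String subpath) {
    this.subpath = subpath;
  }

  @Override
  public String getFilename(BoundedWindow window, PaneInfo pane,
      int numShards, int shardIndex, Compression compression) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    // Take the hour from the window start (UTC) instead of the current
    // wall-clock hour, and keep the window start and shard index in the
    // name so the six 10-minute windows of one hour don't overwrite each other.
    String start = intervalWindow.start().toDateTime(DateTimeZone.UTC).toString("HHmm");
    return String.format("%s/hour-%s/file-%s-%d-of-%d.parquet",
        subpath, start.substring(0, 2), start, shardIndex, numShards);
  }
}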
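If you want to sanity-check a naming scheme like this outside of a pipeline, here is a small plain-Java sketch with no Beam dependency. It assumes the folder is derived from the window start in UTC rather than the wall clock; the class name HourFolderNaming, the method filenameFor and the hour-HH/file-HHmm layout are only my illustration, not anything from Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: mirrors the kind of name a FileNaming could return,
// computed from a window's start instant (UTC).
public class HourFolderNaming {
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);
  private static final DateTimeFormatter HOUR_MINUTE =
      DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

  // Builds "subpath/hour-HH/file-HHmm-shard-of-numShards.parquet".
  public static String filenameFor(String subpath, Instant windowStart,
      int shardIndex, int numShards) {
    return String.format("%s/hour-%s/file-%s-%d-of-%d.parquet",
        subpath, HOUR.format(windowStart), HOUR_MINUTE.format(windowStart),
        shardIndex, numShards);
  }

  public static void main(String[] args) {
    Instant hourStart = Instant.parse("2020-07-10T01:00:00Z");
    // Six consecutive 10-minute windows: same hour folder, distinct file names.
    for (int i = 0; i < 6; i++) {
      System.out.println(filenameFor("output", hourStart.plusSeconds(600L * i), 0, 1));
    }
  }
}
```

Running main should list six files under output/hour-01/, one per 10-minute window, so nothing gets overwritten within the hour.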
Hope this helps.
Regards
Mohil
On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]> wrote:
> Hi Team,
>
> I am trying to write files using FileIO, but currently all files fall
> under the same folder.
>
> I need to write to a new folder every hour, e.g.:
>
> /output/hour-01/files.* -> events coming in at hour 1
>
> /output/hour-02/files.* -> events coming in at hour 2
>
> My code:
>
> parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
>         FixedWindows.of(Duration.standardMinutes(10)))
>     .triggering(AfterWatermark.pastEndOfWindow())
>     .discardingFiredPanes()
>     .withAllowedLateness(Duration.standardMinutes(0)))
>     .apply(Distinct.create())
>     .apply(FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(getOutput_schema()))
>         .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
>         .withNumShards(1)
>         .withSuffix(".snappy.parquet"));
>
> Thanks,
>
> Julius