BTW, if you want the hour derived from the start of your interval window (rather than from wall-clock time), you can also do:
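import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

class DatePartitionedFileName implements FileIO.Write.FileNaming {
  // Joda-Time formatter: two-digit hour of day (00-23).
  private static final DateTimeFormatter DATE_FORMAT =
      DateTimeFormat.forPattern("HH");

  private final String subpath;

  DatePartitionedFileName(String subpath) {
    this.subpath = subpath;
  }

  @Override
  public String getFilename(BoundedWindow window, PaneInfo pane, int
      numShards, int shardIndex, Compression compression) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    // The hour comes from the window start, not from the current time.
    return String.format(
        "%s/%s/file.parquet",
        subpath,
        DATE_FORMAT.print(intervalWindow.start()));
  }
}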
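If it helps, the naming logic can be tried out without a Beam pipeline. The sketch below is not Beam code: it uses java.time instead of Joda-Time, and HourlyPathSketch, fileNameFor and the "hour=" / "file-" layout are hypothetical names made up for illustration. It also puts the window start (HHmm) and the shard index into the file name, on the assumption that several 10-minute windows landing in the same hour folder would otherwise map to the same file name.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyPathSketch {
    // Two-digit hour of day (00-23), rendered in UTC so the result does not
    // depend on the default time zone of the machine running the code.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Hour and minute of the window start, used to keep file names from
    // different windows within the same hour distinct.
    private static final DateTimeFormatter HOUR_MINUTE =
        DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

    // Hypothetical helper standing in for getFilename(): the window start is
    // passed in directly instead of being read from an IntervalWindow.
    static String fileNameFor(String subpath, Instant windowStart,
                              int numShards, int shardIndex) {
        return String.format(
            "%s/hour=%s/file-%s-%05d-of-%05d.parquet",
            subpath,
            HOUR.format(windowStart),
            HOUR_MINUTE.format(windowStart),
            shardIndex,
            numShards);
    }

    public static void main(String[] args) {
        Instant windowStart = Instant.parse("2020-07-10T13:50:00Z");
        // prints "output/hour=13/file-1350-00000-of-00001.parquet"
        System.out.println(fileNameFor("output", windowStart, 1, 0));
    }
}
```

In a real FileNaming implementation the same String.format call would go inside getFilename, with intervalWindow.start() in place of windowStart.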
Regards
Mohil
On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare <[email protected]> wrote:
> Hello Julius,
>
> Well, I do something similar using FileIO.Write.FileNaming, i.e.,
> https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
>
> You can do something like the following:
> .apply(FileIO.<GenericRecord>write()
>     .via(ParquetIO.sink(getOutput_schema()))
>     .to(outputPath)
>     .withNumShards(1)
>     .withNaming(new DatePartitionedFileName(subpath)))
>
> Where your DatePartitionedFileName will implement FileIO.Write.FileNaming
> and override its getFilename method, i.e. something like the following:
>
> class DatePartitionedFileName implements FileIO.Write.FileNaming {
>   private final String subpath;
>
>   DatePartitionedFileName(String subpath) {
>     this.subpath = subpath;
>   }
>
>   @Override
>   public String getFilename(BoundedWindow window, PaneInfo pane, int
>       numShards, int shardIndex, Compression compression) {
>     IntervalWindow intervalWindow = (IntervalWindow) window;
>     return String.format(
>         "%s/%s/file.parquet",
>         subpath,
>         getCurrentHour()); // placeholder: some function that returns the hour
>   }
> }
>
> Hope this helps.
>
> Regards
> Mohil
>
>
> On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]>
> wrote:
>
>> Hi Team,
>>
>> I am trying to write files using FileIO, but currently all files fall
>> under the same folder.
>>
>> I need to write to a new folder every hour,
>> e.g.: /output/hour-01/files.* -> events coming in at hour 1
>>       /output/hour-02/files.* -> events coming in at hour 2
>>
>> My code:
>>
>> parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
>>         FixedWindows.of(Duration.standardMinutes(10)))
>>     .triggering(AfterWatermark.pastEndOfWindow())
>>     .discardingFiredPanes()
>>     .withAllowedLateness(Duration.standardMinutes(0)))
>>     .apply(Distinct.create())
>>     .apply(FileIO.<GenericRecord>write()
>>         .via(ParquetIO.sink(getOutput_schema()))
>>         .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
>>         .withNumShards(1)
>>         .withSuffix(".snappy.parquet"));
>>
>> Thanks,
>> Julius
>>
>