Thanks, Mohil. Will give it a try.
From: Mohil Khare
Date: Friday, July 10, 2020 at 3:43 PM
Subject: Re: FileIO write to new folder every hour.
BTW, if you want the hour taken from your interval window (rather than the current wall-clock hour), you can also do:
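import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
class DatePartitionedFileName implements FileIO.Write.FileNaming {
  // Beam windows use Joda-Time instants; format in UTC, not the worker's default zone.
  private static final DateTimeFormatter HOUR_FORMAT =
      DateTimeFormat.forPattern("HH").withZoneUTC();
  private static final DateTimeFormatter MINUTE_FORMAT =
      DateTimeFormat.forPattern("HHmm").withZoneUTC();
  private final String subpath;
  DatePartitionedFileName(String subpath) {
    this.subpath = subpath;
  }
  @Override
  public String getFilename(BoundedWindow window, PaneInfo pane,
      int numShards, int shardIndex, Compression compression) {
    // Assumes the input is windowed into IntervalWindows (e.g. FixedWindows).
    IntervalWindow intervalWindow = (IntervalWindow) window;
    // Window start and shard in the name keep several windows per hour from colliding.
    return String.format(
        "%s/hour-%s/file-%s-%d.parquet",
        subpath,
        HOUR_FORMAT.print(intervalWindow.start()),
        MINUTE_FORMAT.print(intervalWindow.start()),
        shardIndex);
  }
}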
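The window-based naming rule can be checked without the Beam SDK. The sketch below is a plain-Java approximation, not Beam code: it uses java.time instead of Joda-Time, and the class WindowFileNames, the "hour-" directory prefix and the file name layout are illustrative assumptions. It derives the directory from the window start and keeps the window start time and shard index in the file name, because with 10-minute windows six windows share each hour directory and a fixed name such as file.parquet would be reused.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper mirroring window-based file naming, without Beam types.
class WindowFileNames {
  // Hour of day of the window start, in UTC, e.g. "01".
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);
  // Hour and minute of the window start, e.g. "0110".
  private static final DateTimeFormatter HOUR_MINUTE =
      DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

  // Builds "<subpath>/hour-HH/file-HHmm-<shard>.parquet" for one window and shard.
  static String fileName(String subpath, Instant windowStart, int shardIndex) {
    return String.format("%s/hour-%s/file-%s-%d.parquet",
        subpath, HOUR.format(windowStart), HOUR_MINUTE.format(windowStart), shardIndex);
  }

  public static void main(String[] args) {
    // Two consecutive 10-minute windows inside hour 01.
    Instant first = Instant.parse("2020-07-10T01:00:00Z");
    Instant second = Instant.parse("2020-07-10T01:10:00Z");
    System.out.println(fileName("events", first, 0));  // events/hour-01/file-0100-0.parquet
    System.out.println(fileName("events", second, 0)); // events/hour-01/file-0110-0.parquet
  }
}
```

Both windows land in the same hour-01 directory but get distinct file names.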
Regards
Mohil
On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare wrote:
Hello Julius,
Well, I do something similar using FileIO.Write.FileNaming, i.e.:
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
You can do something like the following:
.apply(FileIO.<GenericRecord>write()
    .via(ParquetIO.sink(getOutput_schema()))
    .to(outputPath)
    .withNumShards(1)
    .withNaming(new DatePartitionedFileName(subpath)));
Your DatePartitionedFileName class would implement FileIO.Write.FileNaming and
override its getFilename method, i.e. something like the following:
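import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
class DatePartitionedFileName implements FileIO.Write.FileNaming {
  private final String subpath;
  DatePartitionedFileName(String subpath) {
    this.subpath = subpath;
  }
  @Override
  public String getFilename(BoundedWindow window, PaneInfo pane,
      int numShards, int shardIndex, Compression compression) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    // Wall-clock hour at the moment the file is named (processing time, UTC).
    String currentHour = DateTime.now(DateTimeZone.UTC).toString("HH");
    // Window start (millis) and shard index keep file names within an hour unique.
    return String.format(
        "%s/hour-%s/file-%d-%d.parquet",
        subpath, currentHour, intervalWindow.start().getMillis(), shardIndex);
  }
}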
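One consequence of the wall-clock approach, sketched in plain Java (java.time rather than Joda-Time; the class HourSource is a hypothetical name): the hour is taken when the file is named, not when the events happened. A window ending at 02:00 that fires via AfterWatermark.pastEndOfWindow() is usually written some time after 02:00, so its 01:50-02:00 events would go under hour 02, whereas formatting the window start, as in the interval-window variant, keeps them under hour 01.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical comparison: which instant decides the hour directory.
class HourSource {
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

  // Formats the hour of day (UTC) of any instant.
  static String hourOf(Instant t) {
    return HOUR.format(t);
  }

  public static void main(String[] args) {
    // Assumed timings: the 01:50-02:00 window, written shortly after 02:00.
    Instant windowStart = Instant.parse("2020-07-10T01:50:00Z");
    Instant writtenAt = Instant.parse("2020-07-10T02:00:30Z");
    System.out.println(hourOf(writtenAt));   // 02 (wall-clock choice)
    System.out.println(hourOf(windowStart)); // 01 (window-start choice)
  }
}
```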
Hope this helps.
Regards
Mohil
On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius wrote:
Hi Team,
I am trying to write files using FileIO, but currently all files end up in the
same folder.
I need to write to a new folder every hour, e.g.:
/output/hour-01/files.* -> events coming in at hour 1
/output/hour-02/files.* -> events coming in at hour 2
My code:
parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
        FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(0)))
    .apply(Distinct.create())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
        .withNumShards(1)
        .withSuffix(".snappy.parquet"));
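Most likely all files land in one folder because the argument to .to(...) is an ordinary Java string: new DateTime().toString("HH") runs once, in main, while the pipeline is being constructed, so every window written later reuses that single hour. A plain-Java sketch of the difference (java.time in place of Joda-Time; HourDirTiming and its methods are hypothetical names, not Beam API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration (no Beam) of *when* the hour directory is computed.
class HourDirTiming {
  private static final DateTimeFormatter HOUR =
      DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

  static String dirFor(Instant t) {
    return "/output/hour=" + HOUR.format(t) + "/";
  }

  // Path built once at pipeline construction: every window shares it.
  static List<String> fixedAtConstruction(Instant builtAt, List<Instant> windowStarts) {
    String dir = dirFor(builtAt); // evaluated exactly once
    return windowStarts.stream().map(w -> dir).collect(Collectors.toList());
  }

  // Path derived per window, as a custom FileNaming would do at write time.
  static List<String> perWindow(List<Instant> windowStarts) {
    return windowStarts.stream().map(HourDirTiming::dirFor).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant builtAt = Instant.parse("2020-07-10T00:55:00Z");
    List<Instant> windows = Arrays.asList(
        Instant.parse("2020-07-10T01:00:00Z"), Instant.parse("2020-07-10T02:00:00Z"));
    System.out.println(fixedAtConstruction(builtAt, windows)); // [/output/hour=00/, /output/hour=00/]
    System.out.println(perWindow(windows)); // [/output/hour=01/, /output/hour=02/]
  }
}
```

Moving the hour computation into a FileNaming implementation, as the replies suggest, is what lets the directory vary per window.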
Thanks,
Julius