Hi Team,
I am trying to write files using FileIO, but currently all files end up in the
same folder.
I need to write to a new folder every hour, e.g.:
/output/hour-01/files.* -> events arriving in hour 1
/output/hour-02/files.* -> events arriving in hour 2
My code:
parquetRecord
    .apply("Batch Events",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(10)))
            .triggering(AfterWatermark.pastEndOfWindow())
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardMinutes(0)))
    .apply(Distinct.create())
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()))
        // new DateTime() is evaluated once, when the pipeline is constructed,
        // so every file is written under the same "hour=HH/" directory.
        .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
        .withNumShards(1)
        .withSuffix(".snappy.parquet"));
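The folder name has to be computed per element (or per window) from the event time, not once from the wall clock. Below is a minimal, stdlib-only sketch of that per-element routing, assuming each event carries its own timestamp and that folders use the UTC hour of day. The class and method names (HourlyFolders, hourFolder, groupByFolder) are hypothetical and are not part of Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HourlyFolders {
    // Hour-of-day formatter; assumption: folders are keyed by the UTC hour.
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Destination derived from each event's own timestamp, not from "now".
    static String hourFolder(String outputPath, Instant eventTime) {
        return outputPath + "hour=" + HOUR.format(eventTime) + "/";
    }

    // Groups events by destination folder, the way a dynamic write would route them.
    static Map<String, List<Instant>> groupByFolder(String outputPath, List<Instant> events) {
        Map<String, List<Instant>> byFolder = new TreeMap<>();
        for (Instant e : events) {
            byFolder.computeIfAbsent(hourFolder(outputPath, e), k -> new ArrayList<>()).add(e);
        }
        return byFolder;
    }

    public static void main(String[] args) {
        List<Instant> events = List.of(
                Instant.parse("2020-01-01T01:15:00Z"),
                Instant.parse("2020-01-01T01:50:00Z"),
                Instant.parse("2020-01-01T02:05:00Z"));
        groupByFolder("/output/", events).forEach((folder, evs) ->
                System.out.println(folder + " -> " + evs.size() + " events"));
    }
}
```

Running main prints "/output/hour=01/ -> 2 events" and "/output/hour=02/ -> 1 events". In Beam, a per-element function like hourFolder could serve as the destination function of FileIO.writeDynamic().by(...), together with .withDestinationCoder(StringUtf8Coder.of()) and .withNaming(...), assuming the GenericRecord exposes an event-time field (its name depends on your schema). Note that hour-of-day alone repeats every day, so a date component may also be needed.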
Thanks,
Julius