Hi Team,

I am trying to write files using FileIO, but currently all files end up in 
the same folder.

I need to write to a new folder every hour, e.g.:
    /output/hour-01/files.*    -> events coming in at hour 1
    /output/hour-02/files.*    -> events coming in at hour 2

My code:

parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
        FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(0)))
        .apply(Distinct.create())

        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
                .withNumShards(1)
                .withSuffix(".snappy.parquet"));
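
A possible explanation for the behaviour above: new DateTime().toString("HH") is evaluated once, when the pipeline is constructed, so every window would get the same prefix. Below is a minimal sketch of deriving the folder from a timestamp instead, in plain Java with no Beam dependency. The class HourlyFolder, the method hourFolder, and the choice of UTC are hypothetical and are assumptions, not part of the pipeline above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyFolder {
    // Two-digit hour of day. Assumption: folders are keyed by UTC hour.
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Builds the folder for one timestamp (hypothetical helper),
    // e.g. outputPath + "hour=01/" for an event in hour 1.
    static String hourFolder(String outputPath, long epochMillis) {
        return outputPath + "hour=" + HOUR.format(Instant.ofEpochMilli(epochMillis)) + "/";
    }

    public static void main(String[] args) {
        long hour1 = Instant.parse("2020-01-01T01:15:00Z").toEpochMilli();
        long hour2 = Instant.parse("2020-01-01T02:05:00Z").toEpochMilli();
        System.out.println(hourFolder("/output/", hour1)); // prints /output/hour=01/
        System.out.println(hourFolder("/output/", hour2)); // prints /output/hour=02/
    }
}
```

In Beam, a function like this would presumably have to run per element or per window rather than at construction time, for example from the destination function passed to FileIO.writeDynamic().by(...). That wiring is an assumption and is not shown here.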


Thanks,
Julius


