Hello Julius,

Well, I do something similar using FileIO.Write.FileNaming, see:
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html

You can do something like the following:
.apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(getOutput_schema()))
        .to(outputPath)
        .withNumShards(1)
        .withNaming(new DatePartitionedFileName(subpath)));

Here DatePartitionedFileName implements FileIO.Write.FileNaming and
overrides its getFilename method, e.g. something like the following:

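import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    // Sub-directory under the .to(...) directory; FileIO resolves the
    // returned name relative to that directory.
    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                              int shardIndex, Compression compression) {
        // Assumes windowed input (e.g. FixedWindows), so each window is an IntervalWindow.
        IntervalWindow intervalWindow = (IntervalWindow) window;
        // Take the hour from the window start (event time; UTC is an assumption),
        // not from the wall clock, so each window lands in its own hour folder.
        DateTime start = intervalWindow.start().toDateTime(DateTimeZone.UTC);
        // Putting the window start in the file name keeps several windows per hour
        // (e.g. six 10-minute windows) from overwriting the same file.
        return String.format(
            "%s/hour-%s/window-%s-shard-%d-of-%d.parquet",
            subpath,
            start.toString("HH"),
            start.toString("yyyy-MM-dd-HHmm"),
            shardIndex,
            numShards);
    }
}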
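One way to sanity-check an hour-partitioned naming scheme before wiring it into FileIO is a stdlib-only sketch of the path logic. It assumes window starts are epoch milliseconds interpreted in UTC, and `HourlyPathDemo` / `hourlyPath` are hypothetical names for illustration, not Beam API. With 10-minute FixedWindows, six windows fall into each hour folder, so the file name also carries the window start to keep them from overwriting one another.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyPathDemo {
    // Hour of the window start, in UTC (assumption), used as the folder name.
    private static final DateTimeFormatter HOUR =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);
    // Full window start, used to make file names unique within an hour folder.
    private static final DateTimeFormatter WINDOW =
        DateTimeFormatter.ofPattern("yyyy-MM-dd-HHmm").withZone(ZoneOffset.UTC);

    // Hypothetical helper: builds "<subpath>/hour-HH/window-<start>-shard-i-of-n.parquet".
    public static String hourlyPath(String subpath, long windowStartMillis,
                                    int shardIndex, int numShards) {
        Instant start = Instant.ofEpochMilli(windowStartMillis);
        return String.format("%s/hour-%s/window-%s-shard-%d-of-%d.parquet",
            subpath, HOUR.format(start), WINDOW.format(start), shardIndex, numShards);
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        long oneAm = 3_600_000L; // 1970-01-01T01:00Z
        // Seven consecutive 10-minute windows: 01:00 .. 01:50, then 02:00.
        for (int i = 0; i < 7; i++) {
            System.out.println(hourlyPath("events", oneAm + i * tenMinutes, 0, 1));
        }
    }
}
```

Running main prints seven paths: the six windows starting 01:00 through 01:50 land under events/hour-01/ with distinct names, and the 02:00 window goes to events/hour-02/.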

Hope this helps.

Regards
Mohil


On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]> wrote:

> Hi Team,
>
> I am trying to write files using FileIO, but currently all files fall
> under the same folder.
>
> I need to write to a new folder every hour, e.g.:
>
>        /output/hour-01/files.*    -> events coming in at hour 1
>        /output/hour-02/files.*    -> events coming in at hour 2
>
> My code:
>
> parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
>         FixedWindows.of(Duration.standardMinutes(10)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(0)))
>         .apply(Distinct.create())
>         .apply(FileIO.<GenericRecord>write()
>                 .via(ParquetIO.sink(getOutput_schema()))
>                 .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
>                 .withNumShards(1)
>                 .withSuffix(".snappy.parquet"));
>
> Thanks,
> Julius
