BTW, if you want the hour to come from your interval window (rather than the wall clock), you can also do:

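import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

class DatePartitionedFileName implements FileIO.Write.FileNaming {

    // Joda-Time formatters (Beam windows expose Joda Instants), in UTC so every
    // worker agrees on the hour; switch the zone if you need local time.
    private static final DateTimeFormatter HOUR_FORMAT =
        DateTimeFormat.forPattern("HH").withZoneUTC();
    private static final DateTimeFormatter WINDOW_FORMAT =
        DateTimeFormat.forPattern("HHmm").withZoneUTC();

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
                              int shardIndex, Compression compression) {
        // Assumes interval windows (e.g. FixedWindows); other window types fail the cast.
        IntervalWindow intervalWindow = (IntervalWindow) window;
        // Folder = hour of the window start; the window's HHmm and the shard index
        // keep file names unique when several 10-minute windows share one hour.
        return String.format(
            "%s/hour-%s/file-%s-%d-of-%d.parquet",
            subpath,
            HOUR_FORMAT.print(intervalWindow.start()),
            WINDOW_FORMAT.print(intervalWindow.start()),
            shardIndex,
            numShards);
    }
}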
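To sanity-check a naming scheme like this without running a pipeline, here is a plain-Java sketch (java.time only, no Beam dependency). It assumes zero-offset fixed windows, whose start is the element timestamp rounded down to a multiple of the window size (roughly what FixedWindows does), and a hypothetical path layout of hour-HH/file-HHmm-<shard>-of-<numShards>.parquet in UTC. The class and method names are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourlyPathSketch {

    // Formatters in UTC (an assumption; use whatever zone your folders should follow).
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter HOUR_MINUTE =
            DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);

    // Start of the zero-offset fixed window containing the timestamp:
    // the timestamp rounded down to a multiple of the window size.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
    }

    // Hypothetical relative path: hour folder from the window start, plus the
    // window's HHmm and the shard index so each window gets its own file name.
    static String relativePath(Instant windowStart, int shardIndex, int numShards) {
        return String.format("hour-%s/file-%s-%d-of-%d.parquet",
                HOUR.format(windowStart), HOUR_MINUTE.format(windowStart),
                shardIndex, numShards);
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2020-07-10T01:37:12Z");
        Instant start = windowStart(event, Duration.ofMinutes(10));
        System.out.println(start);                     // 2020-07-10T01:30:00Z
        System.out.println(relativePath(start, 0, 1)); // hour-01/file-0130-0-of-1.parquet
    }
}
```

With 10-minute windows, the windows starting at 01:00, 01:10, ..., 01:50 all land in hour-01/ but get distinct file names. A fixed name such as file.parquet would not be enough: six windows per hour would all target the same path.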

Regards
Mohil

On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare <[email protected]> wrote:

> Hello Julius,
>
> Well, I do something similar using FileIO.Write.FileNaming, i.e.
> https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html
>
> You can do something like the following:
>
> .apply(FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(getOutput_schema()))
>         .to(outputPath)
>         .withNumShards(1)
>         .withNaming(new DatePartitionedFileName(subpath)));
>
> where your DatePartitionedFileName implements FileIO.Write.FileNaming
> and overrides its getFilename method, i.e. something like the following:
>
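> class DatePartitionedFileName implements FileIO.Write.FileNaming {
>
>     private final String subpath;
>
>     DatePartitionedFileName(String subpath) {
>         this.subpath = subpath;
>     }
>
>     @Override
>     public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
>                               int shardIndex, Compression compression) {
>         IntervalWindow intervalWindow = (IntervalWindow) window;
>         // Current wall-clock hour (Joda-Time), taken when the file is named.
>         String currentHour = new DateTime().toString("HH");
>         // Window start millis and shard index keep file names unique per window.
>         return String.format(
>             "%s/hour-%s/file-%d-%d.parquet",
>             subpath,
>             currentHour,
>             intervalWindow.start().getMillis(),
>             shardIndex);
>     }
> }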
>
> Hope this helps.
>
> Regards
> Mohil
>
>
> On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]>
> wrote:
>
>> Hi Team,
>>
>>
>>
>> I am trying to write files using FileIO, but currently all files fall
>> under the same folder.
>>
>>
>>
>> I need to write to a new folder every hour, e.g.:
>>
>>     /output/hour-01/files.*    -> events coming in at hour 1
>>
>>     /output/hour-02/files.*    -> events coming in at hour 2
>>
>>
>>
>> My code:
>>
>> parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
>>         FixedWindows.of(Duration.standardMinutes(10)))
>>         .triggering(AfterWatermark.pastEndOfWindow())
>>         .discardingFiredPanes()
>>         .withAllowedLateness(Duration.standardMinutes(0)))
>>         .apply(Distinct.create())
>>         .apply(FileIO.<GenericRecord>write()
>>                 .via(ParquetIO.sink(getOutput_schema()))
>>                 .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
>>                 .withNumShards(1)
>>                 .withSuffix(".snappy.parquet"));
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Julius
>>
>>
>>
>>
>>
>>
>>
>
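
For context on why everything ended up in one folder: in the original code, new DateTime().toString("HH") inside .to(...) runs once, in main, while the pipeline is being built, so the directory string handed to FileIO is fixed before any data arrives. The plain-Java sketch below (hypothetical class name, UTC assumed, no Beam dependency) contrasts that with deriving the folder from each window's start, which is what a FileNaming that reads the IntervalWindow does.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FolderTimingSketch {

    // Hour formatter in UTC (an assumption for this sketch).
    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Like building the path in .to(...): the folder is computed once,
    // at pipeline construction time, and reused for every window.
    static List<String> foldersFixedAtBuildTime(Instant buildTime, List<Instant> windowStarts) {
        String folder = "hour=" + HOUR.format(buildTime) + "/";
        return windowStarts.stream().map(w -> folder).collect(Collectors.toList());
    }

    // Like a FileNaming that reads the window: each window picks its own folder.
    static List<String> foldersPerWindow(List<Instant> windowStarts) {
        return windowStarts.stream()
                .map(w -> "hour=" + HOUR.format(w) + "/")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant buildTime = Instant.parse("2020-07-10T00:55:00Z");
        List<Instant> windows = Arrays.asList(
                Instant.parse("2020-07-10T01:00:00Z"),
                Instant.parse("2020-07-10T02:00:00Z"));
        System.out.println(foldersFixedAtBuildTime(buildTime, windows)); // [hour=00/, hour=00/]
        System.out.println(foldersPerWindow(windows));                   // [hour=01/, hour=02/]
    }
}
```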
