Hi Team,

Thanks Mohil, that seems to be working.

I would also like to control the size of the files I write. Is there a way to 
achieve this in Beam?

I am currently using this, but I am not sure whether it works:

pipeline.getOptions().as(S3Options.class).setS3UploadBufferSizeBytes(134217728);

Thanks,
Julius

From: Mohil Khare <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Friday, July 10, 2020 at 3:43 PM
To: "[email protected]" <[email protected]>
Subject: Re: FileIO write to new folder every hour.

BTW, if you want the hour based on your interval window, you can also do:


class DatePartitionedFileName implements FileIO.Write.FileNaming {

    // Joda-Time formatter for the two-digit hour of day
    private static final DateTimeFormatter DATE_FORMAT =
        DateTimeFormat.forPattern("HH");

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane,
            int numShards, int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        // One folder per hour, taken from the start of the window
        return String.format(
            "%s/%s/file.parquet",
            subpath,
            DATE_FORMAT.print(intervalWindow.start()));
    }
}
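
The hour-from-window idea can be checked outside a pipeline. Below is a minimal standalone sketch, not Beam code: it assumes java.time instead of Joda-Time, takes the window start as epoch milliseconds, formats in UTC, and uses hypothetical names (HourFolderDemo, hourFolder) and a hypothetical "hour-" folder prefix.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class HourFolderDemo {
    // Two-digit hour of day, fixed to UTC so the result does not depend
    // on the time zone of the machine running the code.
    private static final DateTimeFormatter HOUR_FORMAT =
        DateTimeFormatter.ofPattern("HH").withZone(ZoneOffset.UTC);

    // Hypothetical helper: builds "<subpath>/hour-<HH>/file.parquet"
    // from the start timestamp of a window.
    public static String hourFolder(String subpath, long windowStartMillis) {
        String hour = HOUR_FORMAT.format(Instant.ofEpochMilli(windowStartMillis));
        return String.format("%s/hour-%s/file.parquet", subpath, hour);
    }

    public static void main(String[] args) {
        long start = Instant.parse("2020-07-10T15:43:00Z").toEpochMilli();
        // prints "/output/hour-15/file.parquet"
        System.out.println(hourFolder("/output", start));
    }
}
```

Because the hour comes from the window start rather than the wall clock, every file from the same window lands in the same folder, regardless of when the write actually runs.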

Regards
Mohil

On Fri, Jul 10, 2020 at 3:39 PM Mohil Khare <[email protected]> wrote:
Hello Julius,

Well, I do something similar using FileIO.Write.FileNaming, i.e. 
https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.FileNaming.html

You can do something like the following:
.apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath)
                .withNumShards(1)
                .withNaming(new DatePartitionedFileName(subpath)))

Where your DatePartitionedFileName will implement FileIO.Write.FileNaming and 
override its getFilename method, i.e. something like the following:


class DatePartitionedFileName implements FileIO.Write.FileNaming {

    private final String subpath;

    DatePartitionedFileName(String subpath) {
        this.subpath = subpath;
    }

    @Override
    public String getFilename(BoundedWindow window, PaneInfo pane,
            int numShards, int shardIndex, Compression compression) {
        IntervalWindow intervalWindow = (IntervalWindow) window;
        return String.format(
            "%s/%s/file.parquet",
            subpath,
            // placeholder: some function that returns the current hour
            getCurrentHour());
    }
}

Hope this helps.

Regards
Mohil


On Fri, Jul 10, 2020 at 1:51 PM Almeida, Julius <[email protected]> wrote:
Hi Team,

I am trying to write files using FileIO, but currently all files fall under 
the same folder.

I need to write to a new folder every hour,
e.g.: /output/hour-01/files.*.    -> events coming in at hour 1
      /output/hour-02/files.*.    -> events coming in at hour 2

My code:

parquetRecord.apply("Batch Events", Window.<GenericRecord>into(
        FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(0)))
        .apply(Distinct.create())

        .apply(FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(getOutput_schema()))
                .to(outputPath + "hour=" + new DateTime().toString("HH") + "/")
                .withNumShards(1)
                .withSuffix(".snappy.parquet"));


Thanks,
Julius


