Hi Ankur,
FileIO.write() with FileNaming is the right combination. It should be
something like:
    .apply(FileIO.write()
        .via(...)
        .to("gs://bucket-name/prefixdir")
        .withNumShards(1)
        .withNaming((window, pane, numShardsIgnored, shardIndexIgnored, compressionIgnored) ->
            ...construct a string like "$year/$month/$day/$hour/filename-$pane.pb"...))
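To make the "construct a string" step concrete, here is a minimal sketch of just the string-building part. It assumes UTC hour directories and uses the window's start time to pick the hour. `HourlyPaths`, `filename(...)` and the `filename-` prefix are hypothetical names for illustration. The helper only uses `java.time`, so it can be tested without Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds a relative path such as
// "2018/04/25/17/filename-0.pb.gz" from a window start time and a pane index.
final class HourlyPaths {
    // Hour-level directory layout. UTC is an assumption; use the zone you need.
    private static final DateTimeFormatter HOUR_DIRS =
            DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    private HourlyPaths() {}

    /**
     * @param windowStartMillis window start in epoch milliseconds
     * @param paneIndex         index of the pane; keeps repeated firings apart
     * @param suffix            file suffix, e.g. ".pb" or ".pb.gz"
     */
    static String filename(long windowStartMillis, long paneIndex, String suffix) {
        String hourDirs = HOUR_DIRS.format(Instant.ofEpochMilli(windowStartMillis));
        return hourDirs + "/filename-" + paneIndex + suffix;
    }
}
```

Inside the naming lambda this could be called as `HourlyPaths.filename(((IntervalWindow) window).start().getMillis(), pane.getIndex(), ".pb" + compression.getSuggestedSuffix())`. The cast assumes FixedWindows, which produce IntervalWindows. Because the shard index is ignored, the names only stay unique with withNumShards(1). Names returned by the naming function are resolved relative to the directory given in .to(...).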

Please note that a given window may fire more than once (for example, early
or late firings), so data for the same date/time can be written out multiple
times. FileIO cannot append to existing files (and files on GCS cannot be
appended to anyway). That means your filename MUST include the pane
info (hence the "-$pane" in my example above).
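A toy model of why the pane index matters. This is not Beam or GCS code. It assumes that each firing of a window is written as a brand-new object and that writing to a name that already exists replaces the earlier object, since there is no append. `PaneCollisionDemo` and `Firing` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: an object store keyed by name, where put() replaces.
final class PaneCollisionDemo {
    // One firing (pane) of a window and the records it carried.
    static final class Firing {
        final String hourDir;   // e.g. "2018/04/25/17"
        final long paneIndex;   // 0 for the first firing, 1 for the next, ...
        final List<String> records;

        Firing(String hourDir, long paneIndex, List<String> records) {
            this.hourDir = hourDir;
            this.paneIndex = paneIndex;
            this.records = records;
        }
    }

    // Writes every firing as a new object and returns the final store contents.
    static Map<String, List<String>> writeAll(List<Firing> firings, boolean includePane) {
        Map<String, List<String>> store = new TreeMap<>();
        for (Firing f : firings) {
            String name = f.hourDir + "/filename"
                    + (includePane ? "-" + f.paneIndex : "") + ".pb";
            store.put(name, f.records); // no append: an existing object is replaced
        }
        return store;
    }

    public static void main(String[] args) {
        List<Firing> firings = Arrays.asList(
                new Firing("2018/04/25/17", 0, Arrays.asList("a", "b")), // first pane
                new Firing("2018/04/25/17", 1, Arrays.asList("c")));     // later pane
        // {2018/04/25/17/filename.pb=[c]}: the first pane's records are gone
        System.out.println(writeAll(firings, false));
        // {2018/04/25/17/filename-0.pb=[a, b], 2018/04/25/17/filename-1.pb=[c]}
        System.out.println(writeAll(firings, true));
    }
}
```

Without the pane index, both firings map to the same name and the second one silently wins. With it, each firing lands in its own file under the same hour directory.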

On Wed, Apr 25, 2018 at 5:02 PM Ankur Chauhan wrote:

> Hi all,
>
> I am working through a use case where I would like to write events from
> PubsubIO to GCS. The events are protobuf messages, so I used a custom
> FileIO.Sink, which is defined as:
>
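>     // Imports (at the top of the enclosing file):
>     import com.google.protobuf.CodedOutputStream;
>     import com.google.protobuf.Message;
>     import java.io.IOException;
>     import java.nio.channels.Channels;
>     import java.nio.channels.WritableByteChannel;
>     import javax.annotation.Nullable;
>     import org.apache.beam.sdk.io.FileIO;
>
>     // Delimited writer: each message is written as a varint length prefix
>     // followed by its bytes, so messages can be read back one at a time.
>     static class ProtobufFileIOSink<T extends Message> implements FileIO.Sink<T> {
>         @Nullable private transient CodedOutputStream cos;
>
>         @Override
>         public void open(WritableByteChannel channel) {
>             // FileIO owns (and later closes) the channel; the sink only wraps it.
>             this.cos = CodedOutputStream.newInstance(Channels.newOutputStream(channel));
>         }
>
>         @Override
>         public void write(T element) throws IOException {
>             if (element == null) {
>                 return;
>             }
>             if (cos == null) {
>                 throw new IllegalStateException("write() called before open()");
>             }
>             // Length-prefixed, unlike element.writeTo(cos), which writes raw bytes
>             // that cannot be split back into individual messages.
>             cos.writeMessageNoTag(element);
>         }
>
>         @Override
>         public void flush() throws IOException {
>             if (cos != null) {
>                 cos.flush();
>             }
>         }
>     }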
>
> The pipeline that I am using is this:
>
> p.apply(PubsubIO.readProtos(Billing.BillingMeasurement.class)
>         .fromSubscription(input + "-billing.proto")
>         .withIdAttribute("event_id"))
>  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1L))))
>  .apply(FileIO.<Billing.BillingMeasurement>write()
>         .via(new ProtobufFileIOSink<>())
>         .withCompression(Compression.GZIP)
>         .to(output + "/billing.proto/")
>         .withSuffix(".pb"));
>
> In order to make sifting through the output easier, I would like the
> resulting files to be organized by year/month/day/hour, so that the paths
> look like:
>
> gs://<bucket-name>/<prefixdir>/<year>/<month>/<day>/<hour>/[filename.pb.gz]
>
> I tried looking through FileIO.writeDynamic() and FileNaming, but I am not
> sure if that is the correct place. Is there an example or another
> implementation that someone can point me to?
>
> — Ankur Chauhan
>
