Is the data keyed by shardNumber? To have a unique final pane for a
filename prefix, you will need to include the key in the prefix.
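
A minimal sketch of that idea in plain Java (no Beam dependency), assuming the key is the shard number. Each shard writes its own marker file, and the window counts as complete only once a marker exists for every shard. The names ShardTriggerNames, triggerFileName and windowComplete are hypothetical, as is the "-of-" naming scheme. In a real pipeline the completeness check would have to run wherever the output directory is visible, since the final panes may be written by different workers.

```java
import java.util.HashSet;
import java.util.Set;

public class ShardTriggerNames {

    // Hypothetical naming scheme: one marker per shard, so the final pane of
    // each key (shard) maps to a unique file name under the window prefix.
    public static String triggerFileName(String windowPrefix, int shardNumber, int numShards) {
        return String.format("%s-%d-of-%d.trigger", windowPrefix, shardNumber, numShards);
    }

    // Returns true only when a marker file exists for every shard of the window.
    public static boolean windowComplete(Set<String> existingFiles, String windowPrefix, int numShards) {
        for (int shard = 0; shard < numShards; shard++) {
            if (!existingFiles.contains(triggerFileName(windowPrefix, shard, numShards))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stand-in for a directory listing; the prefix value is made up.
        Set<String> files = new HashSet<>();
        String prefix = "20200501/source/source-20200501";

        files.add(triggerFileName(prefix, 0, 2));
        System.out.println(windowComplete(files, prefix, 2)); // prints "false"

        files.add(triggerFileName(prefix, 1, 2));
        System.out.println(windowComplete(files, prefix, 2)); // prints "true"
    }
}
```

The single .trigger file that the downstream consumer expects could then be created by whatever step observes windowComplete returning true.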

Can you also provide the triggering configuration you are working with?

Kenn

On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle <truebo...@dnb.com> wrote:

> Hi Kenn,
>
>
>
> Thanks for the response…
>
> Not sure if I understand this correctly: ‘affected by the fact that windows
> are processed independently for each key’.
>
> I put a high level example below, hope it clarifies what I am trying to
> ask.
>
> Is there a more precise way to be informed that the final pane of a
> window has been written completely?
>
> Due to the nature of the coordination setup for downstream consumers, the
> .trigger file must be delivered only on completion of the absolute last
> pane.
>
>
>
> ```
> public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy {
>
>     private final ResourceId prefix;
>     private final String dataSource;
>
>     /**
>      * File names - file source name
>      *            - timestamp (processing timestamp / event timestamp),
>      *              based on the current time window
>      *            - optional: - shard number
>      *                        - window start ts
>      *
>      * @param prefix
>      */
>     public WindowFileNamePolicy(ResourceId prefix, String dataSource) {
>         this.prefix = prefix;
>         this.dataSource = dataSource;
>     }
>
>     public String filenamePrefixForWindow(IntervalWindow window) {
>         String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
>
>         DateTimeFormatter formatter = DateTimeFormat.forPattern(Utilities.lngDateFormat);
>         DateTime windowStart = formatter.parseDateTime(window.start().toString());
>
>         DateTimeFormatter resultformat = DateTimeFormat.forPattern(Utilities.shtDateFormat);
>
>         return String.format(
>                 "%s/%s/%s-%s",
>                 resultformat.print(windowStart), dataSource, dataSource,
>                 resultformat.print(windowStart));
>     }
>
>     @Override
>     public ResourceId windowedFilename(int shardNumber, int numShards,
>             BoundedWindow window, PaneInfo paneInfo,
>             FileBasedSink.OutputFileHints outputFileHints) {
>         IntervalWindow intervalWindow = (IntervalWindow) window;
>         String filename =
>                 String.format(
>                         "%s-%s-%s",
>                         filenamePrefixForWindow(intervalWindow),
>                         shardNumber,
>                         numShards);
>
>         // Writes to the same directory as the current window. This fires
>         // multiple times, depending on the number of panes for which
>         // isLast() is true, or on the number of write operators (not sure
>         // exactly).
>         if (paneInfo.isLast())
>             createTriggerFile(/* trigger file name */ ".trigger");
>
>         return prefix.getCurrentDirectory().resolve(filename,
>                 ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
>     }
> }
> ```
>
>
>
> Thanks,
>
> Kyle
>
>
>
> *From:* Kenneth Knowles <k...@apache.org>
> *Sent:* Friday, May 1, 2020 2:25 PM
> *To:* user <user@beam.apache.org>
> *Subject:* Re: Notifying the closure of a Window Period
>
>
>
> I am guessing you will be affected by the fact that windows are processed
> independently for each key. Is that what you are referring to when you
> mention multiple isLast() windows?
>
>
>
> Kenn
>
>
>
> On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle <truebo...@dnb.com> wrote:
>
> Hi all,
>
>
>
> We are working on a streaming pipeline that needs to be compatible with our
> legacy platform while we make the move over to Beam Streaming.
>
> Our legacy platform uses a coordination framework (Oozie). Each step in the
> coordination pipeline is activated by the creation of a trigger file.
>
>
>
> I am looking for a Beam construct or flag that will notify the context/
> driver of the closure of a time window. We need to be able to create a
> trigger flag only when all the files for a given window period have been
> emitted.
>
>
>
> We have tried creating the trigger flag using PaneInfo.isLast() in a
> custom WindowFileNamePolicy. We noticed that a window has multiple
> panes for which isLast() is true.
>
>
>
> Thanks,
>
> Kyle
>
>
