Ah yes...

On Fri, May 1, 2020 at 8:30 AM Truebody, Kyle <truebo...@dnb.com> wrote:

> Yes data will be keyed by shard.
>
>
>
> This is the trigger config we used:
>
>
>
> WindowFileNamePolicy policy = new
> WindowFileNamePolicy(prefix,options.getDataSource());
>
>
>
> TextIO.Write textWriter = TextIO.write()
>
>                 .to(policy)
>
>                 .withTempDirectory(tempPrefix)
>
>                 .withWindowedWrites()
>
>                 .withNumShards(options.getShardCount());
>
>
>
>   batchCollection = batchCollection.apply("Fixed
> Strategy",Window.<String>into(
>
>
> FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
>
>                     .triggering(AfterWatermark.pastEndOfWindow())
>

This trigger will close the window immediately and drop data. You probably
want .trigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) or
.trigger(AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))).
These have the same behavior.

In more recent SDKs this pipeline should be rejected to protect you from
data loss caused by the trigger "closing" and dropping data. What version
of SDK are you using?

Kenn



>
> .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
>
>                     .discardingFiredPanes()).apply(textWriter);
>
>
>
>
>
>
>
> *From:* Kenneth Knowles <k...@apache.org>
> *Sent:* Friday, May 1, 2020 4:19 PM
> *To:* user <user@beam.apache.org>
> *Subject:* Re: Notifying the closure of a Window Period
>
>
>
> *CAUTION:* This email originated from outside of D&B. Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> Is the data keyed by shardNumber? To have a unique final pane for a
> filename prefix, you will need to include the key in the prefix.
>
>
>
> Can you also provide the triggering configuration you are working with?
>
>
>
> Kenn
>
>
>
> On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle <truebo...@dnb.com> wrote:
>
> Hi Kenn,
>
>
>
> Thanks for the response…
>
> Not sure if  I under this correctly : ‘affected by the fact that windows
> processed independently for each key’
>
> I put a high level example below, hope it clarifies what I am trying to
> ask.
>
> Is there more precise way we can get informed of the final pane of a
> window session has been written completely.
>
> Due to nature of coordination set up for downstream consumers, the
> .trigger file delivery needs to be on the completion of the absolute last
> pane.
>
>
>
> ```
>
> public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy  {
>
>
>
>     private final ResourceId prefix;
>
>
>
>     private final String dataSource;
>
>
>
>     /**
>
>      *  file names - file source name
>
>      *             - timestamp (processing timestamp / event timestamp)
> Based on the current time window
>
>      *             - optional : - shard number
>
>      *                          - window start ts
>
>      * @param prefix
>
>      */
>
>     public WindowFileNamePolicy(ResourceId prefix,String dataSource){
>
>         this.prefix = prefix;
>
>         this.dataSource  = dataSource;
>
>     }
>
>
>
>
>
>     public String filenamePrefixForWindow(IntervalWindow window) {
>
>         String filePrefix = prefix.isDirectory() ? "" :
> prefix.getFilename();
>
>
>
>         DateTimeFormatter formatter =
> DateTimeFormat.forPattern(Utilities.lngDateFormat);
>
>         DateTime windowStart =
> formatter.parseDateTime(window.start().toString());
>
>
>
>         DateTimeFormatter resultformat =
> DateTimeFormat.forPattern(Utilities.shtDateFormat);
>
>
>
>         return String.format(
>
>                 "%s/%s/%s-%s", resultformat.print(windowStart),
> dataSource, dataSource, resultformat.print(windowStart));
>
>     }
>
>
>
>     @Override
>
>     public ResourceId windowedFilename(int shardNumber, int numShards,
> BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints
> outputFileHints) {
>
>         IntervalWindow intervalWindow = (IntervalWindow) window;
>
>         String filename =
>
>                 String.format(
>
>                         "%s-%s-%s",
>
>                         filenamePrefixForWindow(intervalWindow),
>
>                         shardNumber,
>
>                         numShards);
>
>
>
>         if(paneInfo.isLast())
>
>             createTriggerFile(/*tigger file name*/ ".trigger");  *//writes
> to the same directory of the current window.  This fires multiple time
> depending on the number of panes that have isLast() is true/ or write
> operators (not sure exactly).*
>
>         return prefix.getCurrentDirectory().resolve(filename,
> ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
>
>     }
>
> ```
>
>
>
> Thanks,
>
> Kyle
>
>
>
> *From:* Kenneth Knowles <k...@apache.org>
> *Sent:* Friday, May 1, 2020 2:25 PM
> *To:* user <user@beam.apache.org>
> *Subject:* Re: Notifying the closure of a Window Period
>
>
>
> *CAUTION:* This email originated from outside of D&B. Please do not click
> links or open attachments unless you recognize the sender and know the
> content is safe.
>
>
>
> I am guessing you will be affected by the fact that windows processed
> independently for each key. Is that what you are referring to when you
> mention multiple isLast() windows?
>
>
>
> Kenn
>
>
>
> On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle <truebo...@dnb.com> wrote:
>
> Hi all,
>
>
>
> We are working on a streaming pipeline that we need to compatible with out
> legacy platform while we make the move over to Beam Streaming.
>
> Our legacy platform uses a co-ordination framework (oozie). Each step is
> in the coordination pipeline is active by the creation of  a trigger file.
>
>
>
> I am looking for a beam construct or flag that will notify the Context/
> driver of the closure of a Time window. We need to enable to create a
> trigger flag only when all the files have been emitted
>
> from set window period.
>
>
>
> We have tried creating the trigger flag using the  PaneInfo.isLast()
> through a custom WindowFileNamePolicy. Noticed that a window has multiple
> Panes that will have isLast() as true.
>
>
>
> Thanks,
>
> Kyle
>
>

Reply via email to