Is the data keyed by shardNumber? To have a unique final pane for a filename prefix, you will need to include the key in the prefix.
Can you also provide the triggering configuration you are working with? Kenn On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle <truebo...@dnb.com> wrote: > Hi Kenn, > > > > Thanks for the response… > > Not sure if I under this correctly : ‘affected by the fact that windows > processed independently for each key’ > > I put a high level example below, hope it clarifies what I am trying to > ask. > > Is there more precise way we can get informed of the final pane of a > window session has been written completely. > > Due to nature of coordination set up for downstream consumers, the > .trigger file delivery needs to be on the completion of the absolute last > pane. > > > > ``` > > public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy { > > > > private final ResourceId prefix; > > > > private final String dataSource; > > > > /** > > * file names - file source name > > * - timestamp (processing timestamp / event timestamp) > Based on the current time window > > * - optional : - shard number > > * - window start ts > > * @param prefix > > */ > > public WindowFileNamePolicy(ResourceId prefix,String dataSource){ > > this.prefix = prefix; > > this.dataSource = dataSource; > > } > > > > > > public String filenamePrefixForWindow(IntervalWindow window) { > > String filePrefix = prefix.isDirectory() ? "" : > prefix.getFilename(); > > > > DateTimeFormatter formatter = > DateTimeFormat.forPattern(Utilities.lngDateFormat); > > DateTime windowStart = > formatter.parseDateTime(window.start().toString()); > > > > DateTimeFormatter resultformat = > DateTimeFormat.forPattern(Utilities.shtDateFormat); > > > > return String.format( > > "%s/%s/%s-%s", resultformat.print(windowStart), > dataSource, dataSource, resultformat.print(windowStart)); > > } > > > > @Override > > public ResourceId windowedFilename(int shardNumber, int numShards, > BoundedWindow window, PaneInfo paneInfo, FileBasedSink.OutputFileHints > outputFileHints) { > > IntervalWindow intervalWindow = (IntervalWindow) window; > > String filename = > > String.format( > > "%s-%s-%s", > > filenamePrefixForWindow(intervalWindow), > > shardNumber, > > numShards); > > > > if(paneInfo.isLast()) > > createTriggerFile(/*tigger file name*/ ".trigger"); *//writes > to the same directory of the current window. This fires multiple time > depending on the number of panes that have isLast() is true/ or write > operators (not sure exactly).* > > return prefix.getCurrentDirectory().resolve(filename, > ResolveOptions.StandardResolveOptions.RESOLVE_FILE); > > } > > ``` > > > > Thanks, > > Kyle > > > > *From:* Kenneth Knowles <k...@apache.org> > *Sent:* Friday, May 1, 2020 2:25 PM > *To:* user <user@beam.apache.org> > *Subject:* Re: Notifying the closure of a Window Period > > > > *CAUTION:* This email originated from outside of D&B. Please do not click > links or open attachments unless you recognize the sender and know the > content is safe. > > > > I am guessing you will be affected by the fact that windows processed > independently for each key. Is that what you are referring to when you > mention multiple isLast() windows? > > > > Kenn > > > > On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle <truebo...@dnb.com> wrote: > > Hi all, > > > > We are working on a streaming pipeline that we need to compatible with out > legacy platform while we make the move over to Beam Streaming. > > Our legacy platform uses a co-ordination framework (oozie). Each step is > in the coordination pipeline is active by the creation of a trigger file. > > > > I am looking for a beam construct or flag that will notify the Context/ > driver of the closure of a Time window. We need to enable to create a > trigger flag only when all the files have been emitted > > from set window period. > > > > We have tried creating the trigger flag using the PaneInfo.isLast() > through a custom WindowFileNamePolicy. Noticed that a window has multiple > Panes that will have isLast() as true. > > > > Thanks, > > Kyle > >