> This is still confusing to me - why would the messages be dropped as late in 
> this case?

Since you previously mentioned that the bug is due to the pane info
missing, I just pointed out that the WriteFiles logic is expected to
drop the pane info.
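
To illustrate why the pane info gets lost, here is a toy model in plain
Java. It is not Beam code: the class names (PaneDropSketch, Windowed,
GatherPerWindow) and the string window labels are made up for this sketch.
It only mimics the pattern described further down in this thread, where
elements are buffered per window and emitted when the bundle finishes, at
which point no pane is available anymore.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PaneDropSketch {
    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    /** Values with a window label and a pane timing (toy stand-in, not a Beam type). */
    static final class Windowed {
        final List<String> values;
        final String window;
        final Timing timing;

        Windowed(List<String> values, String window, Timing timing) {
            this.values = values;
            this.window = window;
            this.timing = timing;
        }
    }

    /** Buffers values per window; the pane of each input is never stored. */
    static final class GatherPerWindow {
        private final Map<String, List<String>> buffer = new LinkedHashMap<>();

        void processElement(String value, String window, Timing timing) {
            // Only the value is kept, keyed by window. The incoming timing is ignored.
            buffer.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
        }

        List<Windowed> finishBundle() {
            List<Windowed> out = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : buffer.entrySet()) {
                // No pane is available here, so the output is tagged UNKNOWN.
                out.add(new Windowed(new ArrayList<>(e.getValue()), e.getKey(), Timing.UNKNOWN));
            }
            buffer.clear();
            return out;
        }
    }

    public static void main(String[] args) {
        GatherPerWindow fn = new GatherPerWindow();
        fn.processElement("file-0", "w1", Timing.ON_TIME);
        fn.processElement("file-1", "w1", Timing.ON_TIME);
        for (Windowed w : fn.finishBundle()) {
            System.out.println(w.window + " " + w.values + " timing=" + w.timing);
        }
    }
}
```

In this model, preserving the panes would mean storing the timing next to
each buffered value instead of discarding it in processElement.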

@Jose Would it make sense to file a JIRA and summarize all the findings
here?

@Jozef What you describe in
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html is
expected because Flink does not do a GroupByKey on Reshuffle but just
redistributes the elements.

Thanks,
Max

On 18.05.20 21:59, Jose Manuel wrote:
> Hi Reuven, 
> 
> I can try to explain what I guess.
> 
> - There is a source which is reading data entries and updating the
> watermark.
> - Then, data entries are grouped and stored in files. 
> - The window information of these data entries is used to emit the
> filenames: they carry the data entries' window and timestamp, but the
> PaneInfo is empty.
> - When a second window is applied to the filenames, if the allowed
> lateness is zero or lower than the time spent in the previous
> reading/writing, the filenames are discarded as late.
> 
> I guess, the key is in 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168
> 
> My assumption is that the global watermark (or source watermark, I am not
> sure about the name) is used to evaluate the filenames, which are in an
> already emitted window.
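
As far as I understand it, the check in the linked runner boils down to
"the end of the window plus the allowed lateness is before the input
watermark". The sketch below is a simplified model of that rule, not the
Beam implementation: it uses java.time instead of Joda-Time, and the class
and method names (LateDataCheckSketch, isDroppedAsLate) are hypothetical.
The values are taken from the WindowTracing log quoted further down in this
thread.

```java
import java.time.Duration;
import java.time.Instant;

class LateDataCheckSketch {
    /** Last timestamp that belongs to a window [start, end): end minus one millisecond. */
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    /** True when maxTimestamp + allowedLateness is strictly before the input watermark. */
    static boolean isDroppedAsLate(
            Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
        return maxTimestamp(windowEnd).plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Window [14:05:10, 14:05:15) and the watermark reported in the log.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // Zero allowed lateness: the file name arrives behind the watermark.
        System.out.println(isDroppedAsLate(windowEnd, Duration.ZERO, watermark));
        // One minute of allowed lateness: the same element is still accepted.
        System.out.println(isDroppedAsLate(windowEnd, Duration.ofSeconds(60), watermark));
    }
}
```

With zero allowed lateness the element is dropped, while one minute of
allowed lateness keeps it, which matches the workaround reported in the
thread.
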
> 
> Thanks
> Jose
> 
> 
> On Mon, May 18, 2020 at 18:37, Reuven Lax <re...@google.com> wrote:
> 
>     This is still confusing to me - why would the messages be dropped as
>     late in this case?
> 
>     On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org
>     <mailto:m...@apache.org>> wrote:
> 
>         All runners which use the Beam reference implementation drop the
>         PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames().
>         That's why we can observe this behavior not only in Flink but also
>         in Spark.
> 
>         The WriteFilesResult is returned here:
>         
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
> 
>         GatherBundlesPerWindow will discard the pane information because all
>         buffered elements are emitted in the FinishBundle method which always
>         has a NO_FIRING (unknown) pane info:
>         
> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
> 
>         So this seems expected behavior. We would need to preserve the
>         panes in the Multimap buffer.
> 
>         -Max
> 
>         On 15.05.20 18:34, Reuven Lax wrote:
>         > Lateness should never be introduced inside a pipeline - generally
>         > late data can only come from a source.  If data was not dropped as
>         > late earlier in the pipeline, it should not be dropped after the
>         > file write.
>         > I suspect that this is a bug in how the Flink runner handles the
>         > Reshuffle transform, but I'm not sure what the exact bug is.
>         >
>         > Reuven
>         >
>         > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek
>         <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>
>         > <mailto:jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>>>
>         wrote:
>         >
>         >     Hi Jose,
>         >
>         >     thank you for putting in the effort to get an example which
>         >     demonstrates your problem.
>         >
>         >     You are using a streaming pipeline and it seems that the
>         >     watermark downstream has already advanced further, so when
>         >     your File pane arrives, it is already late. Since you define
>         >     that lateness is not tolerated, it is dropped.
>         >     I myself never had a requirement to specify zero allowed
>         >     lateness for streaming. It feels dangerous. Do you have a
>         >     specific use case?
>         >     Also, in many cases, after windowed files are written, I
>         >     usually collect them into the global window and specify a
>         >     different triggering policy for collecting them. Both cases
>         >     are why I never came across this situation.
>         >
>         >     I do not have an explanation if it is a bug or not. I
>         would guess
>         >     that watermark can advance further, e.g. because elements
>         can be
>         >     processed in arbitrary order. Not saying this is the case.
>         >     It needs someone with better understanding of how
>         watermark advance
>         >     is / should be handled within pipelines. 
>         >
>         >
>         >     P.S.: you can add `.withTimestampFn()` to your generate
>         >     sequence, to get more stable timing, which is also easier to
>         >     reason about:
>         >
>         >     Dropping element at 1970-01-01T00:00:19.999Z for key ...
>         >     window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
>         >     since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
>         >     outputWatermark:1970-01-01T00:00:24.000Z
>         >
>         >                instead of
>         >
>         >     Dropping element at 2020-05-15T08:52:34.999Z for key ...
>         >     window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z)
>         >     since too far behind inputWatermark:2020-05-15T08:52:39.318Z;
>         >     outputWatermark:2020-05-15T08:52:39.318Z
>         >
>         >
>         >
>         >
>         >     In my
>         >
>         >
>         >
>         >     On Thu, May 14, 2020 at 10:47 AM Jose Manuel
>         <kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>
>         >     <mailto:kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>>> wrote:
>         >
>         >         Hi again, 
>         >
>         >         I have simplified the example to reproduce the data
>         >         loss. The scenario is the following:
>         >
>         >         - TextIO writes files.
>         >         - getPerDestinationOutputFilenames emits file names.
>         >         - File names are processed by an aggregator (combine,
>         >         distinct, groupByKey...) with a window **without allowed
>         >         lateness**.
>         >         - File names are discarded as late.
>         >
>         >         Here you can see the data loss in the picture
>         >       
>          in 
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>         >
>         >         Please, follow README to run the pipeline and find log
>         traces
>         >         that say data are dropped as late.
>         >         Remember, you can run the pipeline with another
>         >         window's  lateness values (check README.md)
>         >
>         >         Kby.
>         >
>         >         On Tue, May 12, 2020 at 17:16, Jose Manuel
>         >         <kiuby88....@gmail.com> wrote:
>         >
>         >             Hi,
>         >
>         >             I would like to clarify that while TextIO is writing,
>         >             all the data are in the files (shards). The loss
>         >             happens when the file names emitted by
>         >             getPerDestinationOutputFilenames are processed by a
>         >             window.
>         >
>         >             I have created a pipeline to reproduce the scenario in
>         >             which some filenames are lost after
>         >             getPerDestinationOutputFilenames. Please note I tried
>         >             to simplify the code as much as possible, but the
>         >             scenario is not easy to reproduce.
>         >
>         >             Please check this project
>         >             https://github.com/kiuby88/windowing-textio
>         >             Check readme to build and run
>         >           
>          (https://github.com/kiuby88/windowing-textio#build-and-run)
>         >             Project contains only a class with the
>         >             pipeline PipelineWithTextIo, a log4j2.xml file in the
>         >             resources and the pom.
>         >
>         >             The pipeline in PipelineWithTextIo generates unbounded
>         >             data using a sequence. It adds a little delay (10s) per
>         >             data entry, it uses a distinct (just to apply the
>         >             window), and then it writes data using TextIO.
>         >             The window for the distinct is fixed (5 seconds) and
>         >             it does not use lateness.
>         >             Generated files can be found in
>         >             windowing-textio/pipe_with_lateness_0s/files. To
>         write files
>         >             the FileNamePolicy uses window + timing + shards
>         >           
>          (see 
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135)
>         >             Files are emitted using
>         getPerDestinationOutputFilenames()
>         >             (see the code here,
>         >           
>          
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78)
>         >
>         >             Then, file names in the PCollection are extracted and
>         >             logged. Please note that file names do not have pane
>         >             information at that point.
>         >
>         >             To apply a window, a distinct is used again. Here
>         >             several files are discarded as late and they are not
>         >             processed by this second distinct. Please, see
>         >           
>          
> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>         >
>         >             Debug is enabled for WindowTracing, so you can find in
>         >             the terminal several messages like the following:
>         >             DEBUG org.apache.beam.sdk.util.WindowTracing -
>         >             LateDataFilter: Dropping element at
>         >             2020-05-12T14:05:14.999Z for
>         >             key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>         >             window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
>         >             since too far behind
>         >             inputWatermark:2020-05-12T14:05:19.799Z;
>         >             outputWatermark:2020-05-12T14:05:19.799Z
>         >
>         >             What happens here? I think that messages are generated
>         >             per second and a window of 5 seconds groups them. Then
>         >             a delay is added and finally data are written to a
>         >             file.
>         >             The pipeline reads more data, increasing the watermark.
>         >             Then, file names are emitted without pane information
>         >             (see "Emitted File" in logs). The window in the second
>         >             distinct compares the file names' timestamp with the
>         >             pipeline watermark and then it discards the file names
>         >             as late.
>         >
>         >
>         >             Bonus
>         >             -----
>         >             You can add a lateness to the pipeline. See
>         >           
>          
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>         >
>         >             If a minute is added as allowed lateness for the
>         >             window, the file names are processed as late. As a
>         >             result the traces of LateDataFilter disappear.
>         >
>         >             Moreover, in order to better illustrate that file names
>         >             are emitted as late for the second distinct, I added a
>         >             second TextIO to write the file names to other files.
>         >             The same FileNamePolicy as before was used (window +
>         >             timing + shards). Then, you can find files that contain
>         >             the original filenames in
>         >             windowing-textio/pipe_with_lateness_60s/files-after-distinct.
>         >             This is the interesting part, because you will find
>         >             several files with LATE in their names.
>         >
>         >             Please, let me know if you need more information
>         or if the
>         >             example is not enough to check the expected scenarios.
>         >
>         >             Kby.
>         >
>         >             On Sun, May 10, 2020 at 17:04, Reuven Lax
>         >             <re...@google.com> wrote:
>         >
>         >                 Pane info is supposed to be preserved across
>         >                 transforms. If the Flink runner does not do that,
>         >                 then I believe that is a bug.
>         >
>         >                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek
>         >                 <jozo.vil...@gmail.com
>         <mailto:jozo.vil...@gmail.com> <mailto:jozo.vil...@gmail.com
>         <mailto:jozo.vil...@gmail.com>>>
>         >                 wrote:
>         >
>         >                     I am using FileIO and I do observe the drop of
>         >                     pane info on the Flink runner too. It was
>         >                     mentioned in this thread:
>         >                   
>          https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>         >
>         >                     It is a result of a different reshuffle expansion
>         >                     for optimisation reasons. However, I did not
>         >                     observe a data loss in my case. Windowing and
>         >                     watermark info should be preserved. Pane info is
>         >                     not, which raises the question of how reliable
>         >                     pane info should be in terms of SDK and runner.
>         >
>         >                     If you do observe a data loss, it would be
>         great to
>         >                     share a test case which replicates the
>         problem.
>         >
>         >                     On Sun, May 10, 2020 at 8:03 AM Reuven Lax
>         >                     <re...@google.com
>         <mailto:re...@google.com> <mailto:re...@google.com
>         <mailto:re...@google.com>>> wrote:
>         >
>         >                         Ah, I think I see the problem.
>         >
>         >                         It appears that for some reason, the Flink
>         >                         runner loses windowing information when a
>         >                         Reshuffle is applied. I'm not
>         entirely sure why,
>         >                         because windowing information should be
>         >                         maintained across a Reshuffle.
>         >
>         >                         Reuven
>         >
>         >                         On Sat, May 9, 2020 at 9:50 AM Jose Manuel
>         >                         <kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>
>         >                         <mailto:kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>>> wrote:
>         >
>         >
>         >                             Hi,
>         >
>         >                             I have added some logs to the pipeline as
>         >                             follows (you can find the log function in
>         >                             the Appendix):
>         >
>         >                             //STREAM + processing time.
>         >                             pipeline.apply(KafkaIO.read())
>         >                                    .apply(...) //mappers, a window and a combine
>         >                                    .apply(logBeforeWrite())
>         >
>         >                                    .apply("WriteFiles",
>         >                                        TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>         >                                    .getPerDestinationOutputFilenames()
>         >
>         >                                    .apply(logAfterWrite())
>         >                                    .apply("CombineFileNames", Combine.perKey(...))
>         >
>         >                             I have run the pipeline using DirectRunner
>         >                             (local), SparkRunner and FlinkRunner, the
>         >                             latter two on a cluster.
>         >                             Below you can see the timing and pane
>         >                             information before/after (you can see traces
>         >                             in detail with window and timestamp
>         >                             information in the Appendix).
>         >
>         >                             DirectRunner:
>         >                             [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] timing=EARLY,   pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>         >
>         >                             FlinkRunner:
>         >                             [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>         >
>         >                             SparkRunner:
>         >                             [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>         >
>         >                             It seems DirectRunner propagates the
>         >                             windowing information as expected.
>         >                             I am not sure if TextIO really propagates it
>         >                             or just emits a new window pane, because the
>         >                             timing before TextIO is ON_TIME and after
>         >                             TextIO is EARLY.
>         >                             In any case, using FlinkRunner and
>         >                             SparkRunner the timing and the pane are not
>         >                             set.
>         >
>         >                             I thought the problem was in
>         >                             GatherBundlesPerWindowFn, but now, after
>         >                             seeing that the DirectRunner filled in the
>         >                             windowing data... I am not sure.
>         >                           
>          
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>         >
>         >
>         >                             Appendix
>         >                             -----------
>         >                             Here you can see the log function
>         and traces
>         >                             for different runners in detail.
>         >
>         >                             private SingleOutput<String, String> logBefore() {
>         >                                 return ParDo.of(new DoFn<String, String>() {
>         >                                     @ProcessElement
>         >                                     public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>         >                                         String value = context.element();
>         >                                         log.info("[Before Write] Element=data window={}, timestamp={}, "
>         >                                                 + "timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>         >                                                 boundedWindow,
>         >                                                 context.timestamp(),
>         >                                                 context.pane().getTiming(),
>         >                                                 context.pane().getIndex(),
>         >                                                 context.pane().isFirst(),
>         >                                                 context.pane().isLast(),
>         >                                                 context.pane()
>         >                                         );
>         >                                         context.output(context.element());
>         >                                     }
>         >                                 });
>         >                             }
>         >
>         >                             The logAfter function shows the same
>         >                             information.
>         >
>         >                             Traces in detail.
>         >
>         >                             DirectRunner (local):
>         >                             [Before Write] Element=data
>         >                               window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>         >                               timestamp=2020-05-09T13:39:59.999Z,
>         >                               timing=ON_TIME, index =0, isFirst =true, isLast=true
>         >                               pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] Element=file
>         >                               window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>         >                               timestamp=2020-05-09T13:39:59.999Z,
>         >                               timing=EARLY, index =0, isFirst =true, isLast=false
>         >                               pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>         >
>         >                             FlinkRunner (cluster):
>         >                             [Before Write] Element=data
>         >                               window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>         >                               timestamp=2020-05-09T15:13:59.999Z,
>         >                               timing=ON_TIME, index =0, isFirst =true, isLast=true
>         >                               pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] Element=file
>         >                               window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>         >                               timestamp=2020-05-09T15:13:59.999Z,
>         >                               timing=UNKNOWN, index =0, isFirst =true, isLast=true
>         >                               pane=PaneInfo.NO_FIRING
>         >
>         >                             SparkRunner (cluster):
>         >                             [Before Write] Element=data
>         >                               window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>         >                               timestamp=2020-05-09T15:34:59.999Z,
>         >                               timing=ON_TIME, index =0, isFirst =true, isLast=true
>         >                               pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>         >                             [After  Write] Element=file
>         >                               window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>         >                               timestamp=2020-05-09T15:34:59.999Z,
>         >                               timing=UNKNOWN, index =0, isFirst =true, isLast=true
>         >                               pane=PaneInfo.NO_FIRING
>         >
>         >
>         >                             On Fri, May 8, 2020 at 19:01, Reuven Lax
>         >                             <re...@google.com> wrote:
>         >
>         >                                 The window information should still be
>         >                                 there.  Beam propagates windows through
>         >                                 PCollection, and I don't think
>         >                                 WriteFiles does anything explicit to
>         >                                 stop that.
>         >
>         >                                 Can you try this with the direct runner
>         >                                 to see what happens there?
>         >                                 What is your windowing on this
>         >                                 PCollection?
>         >
>         >                                 Reuven
>         >
>         >                                 On Fri, May 8, 2020 at 3:49 AM
>         Jose
>         >                                 Manuel <kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>
>         >                                 <mailto:kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>>> wrote:
>         >
>         >                                     I got the same behavior using the
>         >                                     Spark runner (with Spark 2.4.3):
>         >                                     window information was missing.
>         >
>         >                                     Just to clarify, the combiner after
>         >                                     TextIO had different results. In
>         >                                     the Flink runner the file names
>         >                                     were dropped, and in Spark the
>         >                                     combination process happened twice,
>         >                                     duplicating data.  I think it is
>         >                                     because different runners handle
>         >                                     data without windowing information
>         >                                     in different ways.
>         >
>         >
>         >
>         >                                     On Fri, May 8, 2020 at 0:45,
>         >                                     Luke Cwik <lc...@google.com> wrote:
>         >
>         >                                         +dev
>         <mailto:d...@beam.apache.org <mailto:d...@beam.apache.org>> 
>         >
>         >                                         On Mon, May 4, 2020 at
>         3:56 AM
>         >                                         Jose Manuel
>         >                                         <kiuby88....@gmail.com
>         <mailto:kiuby88....@gmail.com>
>         >                                       
>          <mailto:kiuby88....@gmail.com <mailto:kiuby88....@gmail.com>>>
>         >                                         wrote:
>         >
>         >                                             Hi guys,
>         >
>         >                                             I think I have found
>         >                                             something interesting about
>         >                                             windowing.
>         >
>         >                                             I have a pipeline that gets
>         >                                             data from Kafka and writes
>         >                                             to HDFS by means of TextIO.
>         >                                             Once written, the generated
>         >                                             files are combined to apply
>         >                                             some custom operations.
>         >                                             However, the combine does
>         >                                             not receive data. Below you
>         >                                             can find the highlights of
>         >                                             my pipeline.
>         >
>         >                                             //STREAM + processing time.
>         >                                             pipeline.apply(KafkaIO.read())
>         >                                                     .apply(...) //mappers, a window and a combine
>         >
>         >                                                     .apply("WriteFiles",
>         >                                                         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>         >                                                     .getPerDestinationOutputFilenames()
>         >
>         >                                                     .apply("CombineFileNames", Combine.perKey(...))
>         >
>         >                                             Running the pipeline with
>         >                                             Flink I have found a log
>         >                                             trace that says data are
>         >                                             discarded as late in the
>         >                                             combine CombineFileNames.
>         >                                             Then, I have added
>         >                                             AllowedLateness to the
>         >                                             pipeline window strategy, as
>         >                                             a workaround.
>         >                                             It works for now, but this
>         >                                             raises several questions for
>         >                                             me.
>         >
>         >                                             I think the problem is that
>         >                                             getPerDestinationOutputFilenames
>         >                                             generates files, but it does
>         >                                             not maintain the windowing
>         >                                             information. Then,
>         >                                             CombineFileNames compares
>         >                                             file names with the
>         >                                             watermark of the pipeline
>         >                                             and discards them as late.
>         >
>         >                                             Is there any issue with
>         >                                             getPerDestinationOutputFilenames?
>         >                                             Maybe I am doing something
>         >                                             wrong and using
>         >                                             getPerDestinationOutputFilenames
>         >                                             + combine does not make
>         >                                             sense.
>         >                                             What do you think?
>         >
>         >                                             Please note I am using Beam
>         >                                             2.17.0 with Flink 1.9.1.
>         >
>         >                                             Many thanks,
>         >                                             Jose
>         >
> 
