> This is still confusing to me - why would the messages be dropped as
> late in this case?

Since you previously mentioned that the bug is due to the missing pane
info, I just pointed out that the WriteFiles logic is expected to drop
the pane info.

@Jose Would it make sense to file a JIRA and summarize all the findings
here?

@Jozef What you describe in
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html is
expected, because Flink does not do a GroupByKey on Reshuffle but just
redistributes the elements.

Thanks,
Max

On 18.05.20 21:59, Jose Manuel wrote:
> Hi Reuven,
>
> I can try to explain what I guess happens.
>
> - There is a source which is reading data entries and updating the
>   watermark.
> - Then, the data entries are grouped and stored in files.
> - The window information of these data entries (the entries' window and
>   timestamp) is used to emit the filenames. PaneInfo is empty.
> - When a second window is applied to the filenames, if the allowed
>   lateness is zero or lower than the time spent in the previous
>   reading/writing, the filenames are discarded as late.
>
> I guess the key is in
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168
>
> My assumption is that the global watermark (or source watermark, I am
> not sure about the name) is used to evaluate the filenames, which are in
> an already emitted window.
>
> Thanks
> Jose
>
> On Mon, 18 May 2020 at 18:37, Reuven Lax (<re...@google.com>) wrote:
>
> > This is still confusing to me - why would the messages be dropped as
> > late in this case?
> >
> > On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <m...@apache.org> wrote:
> >
> > > All runners which use the Beam reference implementation drop the
> > > PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames().
> > > That's why we can observe this behavior not only in Flink but also
> > > in Spark.
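Jose's late-drop hypothesis can be illustrated with a small self-contained model (a sketch only; the class name `LatenessCheck`, the method `droppedAsLate`, and the timestamps are illustrative, not Beam's actual code):

```java
import java.time.Duration;
import java.time.Instant;

// Toy model of the late-data check discussed in this thread: an element
// is dropped when the maximum timestamp of its window, plus the allowed
// lateness, is behind the input watermark.
public class LatenessCheck {

    // windowMaxTs is the window's maximum timestamp (window end - 1 ms).
    static boolean droppedAsLate(Instant windowMaxTs, Duration allowedLateness, Instant inputWatermark) {
        return windowMaxTs.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // A filename timestamped at the end of window [00:15..00:20),
        // arriving after the watermark already advanced to 00:24 while
        // the files were being written.
        Instant maxTs = Instant.parse("1970-01-01T00:00:19.999Z");
        Instant watermark = Instant.parse("1970-01-01T00:00:24.000Z");

        System.out.println(droppedAsLate(maxTs, Duration.ZERO, watermark));         // true: dropped
        System.out.println(droppedAsLate(maxTs, Duration.ofMinutes(1), watermark)); // false: kept
    }
}
```

With zero allowed lateness the filename is dropped; with a minute of lateness it survives, which matches the behavior reported in the thread.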
> > > The WriteFilesResult is returned here:
> > > https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
> > >
> > > GatherBundlesPerWindow will discard the pane information because
> > > all buffered elements are emitted in the FinishBundle method, which
> > > always has a NO_FIRING (unknown) pane info:
> > > https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
> > >
> > > So this seems to be expected behavior. We would need to preserve
> > > the panes in the Multimap buffer.
> > >
> > > -Max
> > >
> > > On 15.05.20 18:34, Reuven Lax wrote:
> > > > Lateness should never be introduced inside a pipeline - generally
> > > > late data can only come from a source. If data was not dropped as
> > > > late earlier in the pipeline, it should not be dropped after the
> > > > file write. I suspect that this is a bug in how the Flink runner
> > > > handles the Reshuffle transform, but I'm not sure what the exact
> > > > bug is.
> > > >
> > > > Reuven
> > > >
> > > > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> > > >
> > > > > Hi Jose,
> > > > >
> > > > > thank you for putting in the effort to get an example which
> > > > > demonstrates your problem.
> > > > >
> > > > > You are using a streaming pipeline and it seems that the
> > > > > watermark downstream has already advanced further, so when your
> > > > > file pane arrives, it is already late. Since you define that
> > > > > lateness is not tolerated, it is dropped.
> > > > > I myself never had a requirement to specify zero allowed
> > > > > lateness for streaming. It feels dangerous. Do you have a
> > > > > specific use case?
> > > > > Also, in many cases, after windowed files are written, I
> > > > > usually collect them into a global window and specify a
> > > > > different triggering policy for collecting them. Both cases are
> > > > > why I never came across this situation.
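Max's point about FinishBundle can be shown with a toy model (a sketch, not Beam's code; the class and enum below are illustrative): buffering elements and flushing them at bundle end necessarily forgets each element's pane, because no element context exists at flush time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a buffering DoFn: processElement can see each element's
// pane, but the bundle-finish flush runs without any element context,
// so everything emitted there can only carry an unknown/NO_FIRING pane.
public class BundleBufferModel {

    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    static class Tagged {
        final String value;
        final Timing pane;
        Tagged(String value, Timing pane) { this.value = value; this.pane = pane; }
    }

    private final List<String> buffer = new ArrayList<>();

    // Analogous to @ProcessElement: the pane is visible here...
    void processElement(String value, Timing pane) {
        buffer.add(value); // ...but only the value is buffered.
    }

    // Analogous to @FinishBundle: no per-element pane is available, so
    // the flushed outputs are stamped UNKNOWN.
    List<Tagged> finishBundle() {
        List<Tagged> out = new ArrayList<>();
        for (String v : buffer) {
            out.add(new Tagged(v, Timing.UNKNOWN));
        }
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        BundleBufferModel fn = new BundleBufferModel();
        fn.processElement("file-0-of-1.txt", Timing.ON_TIME);
        System.out.println(fn.finishBundle().get(0).pane); // UNKNOWN
    }
}
```

Preserving the panes would mean buffering the pane alongside each value, which is what "preserve the panes in the Multimap buffer" amounts to.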
> > > > > I do not have an explanation for whether it is a bug or not. I
> > > > > would guess that the watermark can advance further, e.g.
> > > > > because elements can be processed in arbitrary order. Not
> > > > > saying this is the case. It needs someone with a better
> > > > > understanding of how watermark advancement is / should be
> > > > > handled within pipelines.
> > > > >
> > > > > P.S.: you can add `.withTimestampFn()` to your generate
> > > > > sequence, to get more stable timing, which is also easier to
> > > > > reason about:
> > > > >
> > > > > Dropping element at 1970-01-01T00:00:19.999Z for key ...
> > > > > window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
> > > > > since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
> > > > > outputWatermark:1970-01-01T00:00:24.000Z
> > > > >
> > > > > instead of
> > > > >
> > > > > Dropping element at 2020-05-15T08:52:34.999Z for key ...
> > > > > window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z)
> > > > > since too far behind inputWatermark:2020-05-15T08:52:39.318Z;
> > > > > outputWatermark:2020-05-15T08:52:39.318Z
> > > > >
> > > > > On Thu, May 14, 2020 at 10:47 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> > > > >
> > > > > > Hi again,
> > > > > >
> > > > > > I have simplified the example to reproduce the data loss. The
> > > > > > scenario is the following:
> > > > > >
> > > > > > - TextIO writes files.
> > > > > > - getPerDestinationOutputFilenames emits the file names.
> > > > > > - The file names are processed by an aggregator (combine,
> > > > > >   distinct, groupByKey...) with a window **without allowed
> > > > > >   lateness**.
> > > > > > - The file names are discarded as late.
> > > > > >
> > > > > > Here you can see the data loss in the picture in
> > > > > > https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
> > > > > >
> > > > > > Please follow the README to run the pipeline and find log
> > > > > > traces that say data are dropped as late.
> > > > > > Remember, you can run the pipeline with other window lateness
> > > > > > values (check README.md).
> > > > > >
> > > > > > Kby.
> > > > > >
> > > > > > On Tue, 12 May 2020 at 17:16, Jose Manuel <kiuby88....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I would like to clarify that while TextIO is writing, all
> > > > > > > the data are in the files (shards). The loss happens when
> > > > > > > the file names emitted by
> > > > > > > getPerDestinationOutputFilenames are processed by a window.
> > > > > > >
> > > > > > > I have created a pipeline to reproduce the scenario in
> > > > > > > which some filenames are lost after
> > > > > > > getPerDestinationOutputFilenames. Please note I tried to
> > > > > > > simplify the code as much as possible, but the scenario is
> > > > > > > not easy to reproduce.
> > > > > > >
> > > > > > > Please check this project:
> > > > > > > https://github.com/kiuby88/windowing-textio
> > > > > > > Check the README to build and run
> > > > > > > (https://github.com/kiuby88/windowing-textio#build-and-run).
> > > > > > > The project contains only a class with the pipeline,
> > > > > > > PipelineWithTextIo, a log4j2.xml file in the resources, and
> > > > > > > the pom.
> > > > > > >
> > > > > > > The pipeline in PipelineWithTextIo generates unbounded data
> > > > > > > using a sequence. It adds a little delay (10s) per data
> > > > > > > entry, it uses a distinct (just to apply the window), and
> > > > > > > then it writes the data using TextIO.
> > > > > > > The window for the distinct is fixed (5 seconds) and it
> > > > > > > does not use allowed lateness.
> > > > > > > Generated files can be found in
> > > > > > > windowing-textio/pipe_with_lateness_0s/files. To write
> > > > > > > files the FileNamePolicy uses window + timing + shards (see
> > > > > > > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
> > > > > > > Files are emitted using getPerDestinationOutputFilenames()
> > > > > > > (see the code here:
> > > > > > > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
> > > > > > >
> > > > > > > Then, the file names in the PCollection are extracted and
> > > > > > > logged. Please note the file names do not have pane
> > > > > > > information at that point.
> > > > > > >
> > > > > > > To apply a window, a distinct is used again.
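The "window + timing + shards" policy mentioned above can be sketched as follows (a hypothetical helper; the real policy lives in the linked PipelineWithTextIo.java). It yields names shaped like the key in the WindowTracing log lines:

```java
import java.time.Instant;

// Hypothetical sketch of a window + timing + shards file-name policy.
// Not the project's actual code: the method and its formatting are
// illustrative only.
public class FileNameSketch {

    static String name(Instant windowStart, Instant windowEnd, String timing, int shard, int numShards) {
        return "[" + windowStart + ".." + windowEnd + ")-" + timing + "-" + shard + "-of-" + numShards + ".txt";
    }

    public static void main(String[] args) {
        System.out.println(name(
                Instant.parse("2020-05-12T14:05:10Z"),
                Instant.parse("2020-05-12T14:05:15Z"),
                "ON_TIME", 0, 1));
        // [2020-05-12T14:05:10Z..2020-05-12T14:05:15Z)-ON_TIME-0-of-1.txt
    }
}
```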
> > > > > > > Here several files are discarded as late and they are not
> > > > > > > processed by this second distinct. Please see
> > > > > > > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
> > > > > > >
> > > > > > > Debug is enabled for WindowTracing, so you can find in the
> > > > > > > terminal several messages like the following:
> > > > > > >
> > > > > > > DEBUG org.apache.beam.sdk.util.WindowTracing -
> > > > > > > LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z
> > > > > > > for key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
> > > > > > > window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
> > > > > > > since too far behind inputWatermark:2020-05-12T14:05:19.799Z;
> > > > > > > outputWatermark:2020-05-12T14:05:19.799Z
> > > > > > >
> > > > > > > What happens here? I think that messages are generated per
> > > > > > > second and a window of 5 seconds groups them. Then a delay
> > > > > > > is added and finally the data are written to a file.
> > > > > > > The pipeline reads more data, increasing the watermark.
> > > > > > > Then, the file names are emitted without pane information
> > > > > > > (see "Emitted File" in the logs). The window in the second
> > > > > > > distinct compares the file names' timestamps with the
> > > > > > > pipeline watermark and then discards the file names as
> > > > > > > late.
> > > > > > >
> > > > > > > Bonus
> > > > > > > -----
> > > > > > > You can add a lateness to the pipeline. See
> > > > > > > https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
> > > > > > >
> > > > > > > If a minute of allowed lateness is added to the window, the
> > > > > > > file names are processed as late instead of being dropped.
> > > > > > > As a result, the traces of LateDataFilter disappear.
> > > > > > >
> > > > > > > Moreover, in order to better illustrate that the file names
> > > > > > > are emitted as late for the second distinct, I added a
> > > > > > > second TextIO to write the file names to other files. The
> > > > > > > same FileNamePolicy as before was used (window + timing +
> > > > > > > shards). Then, you can find files that contain the original
> > > > > > > filenames in
> > > > > > > windowing-textio/pipe_with_lateness_60s/files-after-distinct.
> > > > > > > This is the interesting part, because you will find
> > > > > > > several files with LATE in their names.
> > > > > > >
> > > > > > > Please let me know if you need more information or if the
> > > > > > > example is not enough to check the expected scenarios.
> > > > > > >
> > > > > > > Kby.
> > > > > > >
> > > > > > > On Sun, 10 May 2020 at 17:04, Reuven Lax (<re...@google.com>) wrote:
> > > > > > >
> > > > > > > > Pane info is supposed to be preserved across transforms.
> > > > > > > > If the Flink runner does not preserve it, then I believe
> > > > > > > > that is a bug.
> > > > > > > >
> > > > > > > > On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > I am using FileIO and I do observe the drop of pane
> > > > > > > > > info on the Flink runner too. It was mentioned in this
> > > > > > > > > thread:
> > > > > > > > > https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
> > > > > > > > >
> > > > > > > > > It is a result of a different reshuffle expansion for
> > > > > > > > > optimisation reasons. However, I did not observe a data
> > > > > > > > > loss in my case. Windowing and watermark info should be
> > > > > > > > > preserved. Pane info is not, which raises the question
> > > > > > > > > of how reliable pane info should be in terms of SDK and
> > > > > > > > > runner.
> > > > > > > > >
> > > > > > > > > If you do observe a data loss, it would be great to
> > > > > > > > > share a test case which replicates the problem.
> > > > > > > > >
> > > > > > > > > On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:
> > > > > > > > >
> > > > > > > > > > Ah, I think I see the problem.
> > > > > > > > > >
> > > > > > > > > > It appears that for some reason, the Flink runner
> > > > > > > > > > loses windowing information when a Reshuffle is
> > > > > > > > > > applied. I'm not entirely sure why, because windowing
> > > > > > > > > > information should be maintained across a Reshuffle.
> > > > > > > > > > Reuven
> > > > > > > > > >
> > > > > > > > > > On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > I have added some logs to the pipeline as follows
> > > > > > > > > > > (you can find the log function in the Appendix):
> > > > > > > > > > >
> > > > > > > > > > > //STREAM + processing time.
> > > > > > > > > > > pipeline.apply(KafkaIO.read())
> > > > > > > > > > >     .apply(...) //mappers, a window and a combine
> > > > > > > > > > >     .apply(logBeforeWrite())
> > > > > > > > > > >     .apply("WriteFiles",
> > > > > > > > > > >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> > > > > > > > > > >     .getPerDestinationOutputFilenames()
> > > > > > > > > > >     .apply(logAfterWrite())
> > > > > > > > > > >     .apply("CombineFileNames", Combine.perKey(...))
> > > > > > > > > > >
> > > > > > > > > > > I have run the pipeline using the DirectRunner
> > > > > > > > > > > (local), and the SparkRunner and FlinkRunner, both
> > > > > > > > > > > of them using a cluster.
> > > > > > > > > > > Below you can see the timing and pane information
> > > > > > > > > > > before/after the write (you can see the traces in
> > > > > > > > > > > detail, with window and timestamp information, in
> > > > > > > > > > > the Appendix).
> > > > > > > > > > >
> > > > > > > > > > > DirectRunner:
> > > > > > > > > > > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> > > > > > > > > > >
> > > > > > > > > > > FlinkRunner:
> > > > > > > > > > > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> > > > > > > > > > >
> > > > > > > > > > > SparkRunner:
> > > > > > > > > > > [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
> > > > > > > > > > >
> > > > > > > > > > > It seems the DirectRunner propagates the windowing
> > > > > > > > > > > information as expected.
> > > > > > > > > > > I am not sure if TextIO really propagates the pane
> > > > > > > > > > > or just emits a new window pane, because the timing
> > > > > > > > > > > before TextIO is ON_TIME and after TextIO it is
> > > > > > > > > > > EARLY.
> > > > > > > > > > > In any case, using the FlinkRunner and the
> > > > > > > > > > > SparkRunner the timing and the pane are not set.
> > > > > > > > > > >
> > > > > > > > > > > I thought the problem was in
> > > > > > > > > > > GatherBundlesPerWindowFn, but now, after seeing
> > > > > > > > > > > that the DirectRunner filled in the windowing
> > > > > > > > > > > data... I am not sure.
> > > > > > > > > > > https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
> > > > > > > > > > >
> > > > > > > > > > > Appendix
> > > > > > > > > > > --------
> > > > > > > > > > > Here you can see the log function and the traces
> > > > > > > > > > > for the different runners in detail.
> > > > > > > > > > >
> > > > > > > > > > > private SingleOutput<String, String> logBefore() {
> > > > > > > > > > >   return ParDo.of(new DoFn<String, String>() {
> > > > > > > > > > >     @ProcessElement
> > > > > > > > > > >     public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
> > > > > > > > > > >       log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
> > > > > > > > > > >           boundedWindow,
> > > > > > > > > > >           context.timestamp(),
> > > > > > > > > > >           context.pane().getTiming(),
> > > > > > > > > > >           context.pane().getIndex(),
> > > > > > > > > > >           context.pane().isFirst(),
> > > > > > > > > > >           context.pane().isLast(),
> > > > > > > > > > >           context.pane()
> > > > > > > > > > >       );
> > > > > > > > > > >       context.output(context.element());
> > > > > > > > > > >     }
> > > > > > > > > > >   });
> > > > > > > > > > > }
> > > > > > > > > > >
> > > > > > > > > > > The logAfter function shows the same information.
> > > > > > > > > > >
> > > > > > > > > > > Traces in detail:
> > > > > > > > > > > DirectRunner (local):
> > > > > > > > > > > [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
> > > > > > > > > > >
> > > > > > > > > > > FlinkRunner (cluster):
> > > > > > > > > > > [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
> > > > > > > > > > >
> > > > > > > > > > > SparkRunner (cluster):
> > > > > > > > > > > [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> > > > > > > > > > > [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
> > > > > > > > > > >
> > > > > > > > > > > On Fri, 8 May 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
> > > > > > > > > > >
> > > > > > > > > > > > The window information should still be there.
> > > > > > > > > > > > Beam propagates windows through PCollections,
> > > > > > > > > > > > and I don't think WriteFiles does anything
> > > > > > > > > > > > explicit to stop that.
> > > > > > > > > > > >
> > > > > > > > > > > > Can you try this with the direct runner to see
> > > > > > > > > > > > what happens there?
> > > > > > > > > > > > What is your windowing on this PCollection?
> > > > > > > > > > > >
> > > > > > > > > > > > Reuven
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > I got the same behavior using the Spark runner
> > > > > > > > > > > > > (with Spark 2.4.3): the window information was
> > > > > > > > > > > > > missing.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Just to clarify, the combiner after TextIO had
> > > > > > > > > > > > > different results. In the Flink runner the
> > > > > > > > > > > > > file names were dropped, and in Spark the
> > > > > > > > > > > > > combination process happened twice,
> > > > > > > > > > > > > duplicating data. I think it is because
> > > > > > > > > > > > > different runners manage the data without the
> > > > > > > > > > > > > windowing information in different ways.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, 8 May 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > +dev <d...@beam.apache.org>
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi guys,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think I have found something interesting
> > > > > > > > > > > > > > > about windowing.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I have a pipeline that gets data from
> > > > > > > > > > > > > > > Kafka and writes to HDFS by means of
> > > > > > > > > > > > > > > TextIO. Once written, the generated files
> > > > > > > > > > > > > > > are combined to apply some custom
> > > > > > > > > > > > > > > operations. However, the combine does not
> > > > > > > > > > > > > > > receive data. Following, you can find the
> > > > > > > > > > > > > > > highlights of my pipeline:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > //STREAM + processing time.
> > > > > > > > > > > > > > > pipeline.apply(KafkaIO.read())
> > > > > > > > > > > > > > >     .apply(...) //mappers, a window and a combine
> > > > > > > > > > > > > > >     .apply("WriteFiles",
> > > > > > > > > > > > > > >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
> > > > > > > > > > > > > > >     .getPerDestinationOutputFilenames()
> > > > > > > > > > > > > > >     .apply("CombineFileNames", Combine.perKey(...))
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Running the pipeline with Flink I have
> > > > > > > > > > > > > > > found a log trace that says data are
> > > > > > > > > > > > > > > discarded as late in the CombineFileNames
> > > > > > > > > > > > > > > combine. Then, I added AllowedLateness to
> > > > > > > > > > > > > > > the pipeline's window strategy as a
> > > > > > > > > > > > > > > workaround. It works for now, but this
> > > > > > > > > > > > > > > opens several questions for me.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think the problem is that
> > > > > > > > > > > > > > > getPerDestinationOutputFilenames generates
> > > > > > > > > > > > > > > files, but it does not maintain the
> > > > > > > > > > > > > > > windowing information. Then,
> > > > > > > > > > > > > > > CombineFileNames compares the file names
> > > > > > > > > > > > > > > with the watermark of the pipeline and
> > > > > > > > > > > > > > > discards them as late.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Is there any issue with
> > > > > > > > > > > > > > > getPerDestinationOutputFilenames? Maybe I
> > > > > > > > > > > > > > > am doing something wrong and using
> > > > > > > > > > > > > > > getPerDestinationOutputFilenames + combine
> > > > > > > > > > > > > > > does not make sense. What do you think?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Please note I am using Beam 2.17.0 with
> > > > > > > > > > > > > > > Flink 1.9.1.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Many thanks,
> > > > > > > > > > > > > > > Jose
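For reference, the 5-second fixed windows used in the reproduction pipelines can be modeled with plain arithmetic (a sketch of how fixed-window assignment works as far as I understand it; `FixedWindowMath` is illustrative, not Beam code): an element at timestamp t falls into the window starting at t - (t mod size), and the window's maximum timestamp, which the emitted filenames inherit, is the window end minus 1 ms.

```java
import java.time.Instant;

// Sketch of fixed-window assignment arithmetic for 5-second windows,
// matching the window bounds and .999Z timestamps in the log traces.
public class FixedWindowMath {

    static final long SIZE_MS = 5_000; // 5-second windows, as in the example pipeline

    // Start of the window containing ts: ts minus (ts mod size).
    static Instant windowStart(Instant ts) {
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, SIZE_MS));
    }

    // Maximum timestamp of that window: its end minus 1 ms.
    static Instant windowMaxTimestamp(Instant ts) {
        return windowStart(ts).plusMillis(SIZE_MS - 1);
    }

    public static void main(String[] args) {
        Instant t = Instant.parse("1970-01-01T00:00:17.300Z");
        System.out.println(windowStart(t));        // 1970-01-01T00:00:15Z
        System.out.println(windowMaxTimestamp(t)); // 1970-01-01T00:00:19.999Z
    }
}
```

This is why a filename for window [00:15..00:20) carries the timestamp 00:00:19.999Z, and why it is already behind a watermark that advanced past 00:00:20 during the write.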