Re: TextIO. Writing late files

2020-05-21 Thread Reuven Lax
On Tue, May 19, 2020 at 2:02 AM Maximilian Michels wrote: > > This is still confusing to me - why would the messages be dropped as > late in this case? > > Since you previously mentioned that the bug is due to the pane info > missing, I just pointed out that the WriteFiles logic is expected to >

Re: TextIO. Writing late files

2020-05-19 Thread Maximilian Michels
> This is still confusing to me - why would the messages be dropped as late in > this case? Since you previously mentioned that the bug is due to the pane info missing, I just pointed out that the WriteFiles logic is expected to drop the pane info. @Jose Would it make sense to file a JIRA and su

Re: TextIO. Writing late files

2020-05-18 Thread Jose Manuel
Hi Reuven, I can try to explain what I guess. - There is a source which is reading data entries and updating the watermark. - Then, data entries are grouped and stored in files. - The window information of these data entries is used to emit filenames. Data entries' window and timestamp. Pane

Re: TextIO. Writing late files

2020-05-18 Thread Reuven Lax
This is still confusing to me - why would the messages be dropped as late in this case? On Mon, May 18, 2020 at 6:14 AM Maximilian Michels wrote: > All runners which use the Beam reference implementation drop the > PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's > why we

Re: TextIO. Writing late files

2020-05-18 Thread Jose Manuel
Hi, Many thanks for your responses. I agree with you, Reuven, the source is what should determine whether data are late or not. Maximilian, I agree with you as well; as I mentioned in previous emails, I saw the same behavior with Spark, and I guessed the problem was here https://github.com/apache/beam/blob/6a4ef33

Re: TextIO. Writing late files

2020-05-18 Thread Maximilian Michels
All runners which use the Beam reference implementation drop the PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's why we can observe this behavior not only in Flink but also Spark. The WriteFilesResult is returned here: https://github.com/apache/beam/blob/d773f8ca7a4d63d014

Re: TextIO. Writing late files

2020-05-15 Thread Reuven Lax
Lateness should never be introduced inside a pipeline - generally late data can only come from a source. If data was not dropped as late earlier in the pipeline, it should not be dropped after the file write. I suspect that this is a bug in how the Flink runner handles the Reshuffle transform, but

Re: TextIO. Writing late files

2020-05-15 Thread Jozef Vilcek
Hi Jose, thank you for putting in the effort to build an example which demonstrates your problem. You are using a streaming pipeline and it seems that the downstream watermark has already advanced further, so when your file pane arrives, it is already late. Since you define that lateness is not tolerated, it i
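Purely as an illustration of the point above (not code from this thread): whether the downstream aggregation tolerates late file names is governed by the allowed lateness of its window. A minimal sketch in the Beam Java SDK, assuming "filenames" stands for the PCollection of emitted file names:

    // With zero allowed lateness, elements whose pane arrives behind the
    // downstream watermark are silently dropped.
    filenames.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

    // Declaring some allowed lateness keeps such elements and emits them in
    // late panes instead of discarding them.
    filenames.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(10))
        .discardingFiredPanes());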

Re: TextIO. Writing late files

2020-05-14 Thread Jose Manuel
Hi again, I have simplified the example to reproduce the data loss. The scenario is the following: - TextIO writes files. - getPerDestinationOutputFilenames emits file names. - File names are processed by an aggregator (combine, distinct, groupByKey...) with a window **without allowed lateness**. - File n
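A minimal sketch of a pipeline with this shape, in the Beam Java SDK (paths, window sizes and transform names are illustrative, and the input PCollection "lines" is assumed to already be windowed):

    // Relevant imports: org.apache.beam.sdk.io.TextIO, org.apache.beam.sdk.io.WriteFilesResult,
    // org.apache.beam.sdk.transforms.Values, org.apache.beam.sdk.transforms.Distinct,
    // org.apache.beam.sdk.transforms.windowing.*, org.joda.time.Duration

    // Windowed TextIO write; withOutputFilenames() exposes the generated
    // file names through the returned WriteFilesResult.
    WriteFilesResult<?> writeResult =
        lines.apply("WriteFiles",
            TextIO.write()
                .to("hdfs://namenode/output/part")   // illustrative path
                .withWindowedWrites()
                .withNumShards(1)
                .withOutputFilenames());

    // Aggregate the emitted file names in a window with no allowed lateness.
    // If the pane/timing information of these elements is lost by the runner,
    // they can be considered late here and silently dropped.
    PCollection<String> distinctFilenames =
        writeResult.getPerDestinationOutputFilenames()
            .apply(Values.<String>create())
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            .apply(Distinct.<String>create());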

Re: TextIO. Writing late files

2020-05-12 Thread Jose Manuel
Hi, I would like to clarify that while TextIO is writing, all the data end up in the files (shards). The loss happens when the file names emitted by getPerDestinationOutputFilenames are processed by a window. I have created a pipeline to reproduce the scenario in which some filenames are lost after the g

Re: TextIO. Writing late files

2020-05-10 Thread Reuven Lax
Pane info is supposed to be preserved across transforms. If the Flink runner does not preserve it, then I believe that is a bug. On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek wrote: > I am using FileIO and I do observe the drop of pane info on the > Flink runner too. It was mentioned in this thread: >

Re: TextIO. Writing late files

2020-05-09 Thread Jozef Vilcek
I am using FileIO and I do observe the drop of pane info on the Flink runner too. It was mentioned in this thread: https://www.mail-archive.com/dev@beam.apache.org/msg20186.html It is a result of a different reshuffle expansion for optimisation reasons. However, I did not observe a data loss

Re: TextIO. Writing late files

2020-05-09 Thread Reuven Lax
Ah, I think I see the problem. It appears that for some reason, the Flink runner loses windowing information when a Reshuffle is applied. I'm not entirely sure why, because windowing information should be maintained across a Reshuffle. Reuven On Sat, May 9, 2020 at 9:50 AM Jose Manuel wrote: >

Re: TextIO. Writing late files

2020-05-09 Thread Jose Manuel
Hi, I have added some logs to the pipeline as follows (you can find the log function in the Appendix): //STREAM + processing time. pipeline.apply(KafkaIO.read()) .apply(...) //mappers, a window and a combine .apply(logBeforeWrite()) .apply("WriteFiles", TextIO.writeCustomT
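The logBeforeWrite() helper is cut off above; a minimal sketch of such a logging step (a reconstruction, not the code from the Appendix, assuming String elements and an SLF4J logger named LOG) could be:

    // Pass elements through unchanged while logging the window, timestamp
    // and pane attached to each one.
    .apply("LogBeforeWrite", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        LOG.info("before write: element={} window={} timestamp={} pane={}",
            c.element(), window, c.timestamp(), c.pane());
        c.output(c.element());
      }
    }))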

Re: TextIO. Writing late files

2020-05-08 Thread Reuven Lax
The window information should still be there. Beam propagates windows through PCollection, and I don't think WriteFiles does anything explicit to stop that. Can you try this with the direct runner to see what happens there? What is your windowing on this PCollection? Reuven On Fri, May 8, 2020
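For reference, trying the same pipeline on the direct runner, as suggested here, only requires changing the runner option; a minimal sketch:

    // Either pass --runner=DirectRunner on the command line, or set it in code:
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectRunner.class);
    Pipeline pipeline = Pipeline.create(options);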

Re: TextIO. Writing late files

2020-05-08 Thread Jose Manuel
I got the same behavior using the Spark runner (with Spark 2.4.3): window information was missing. Just to clarify, the combiner after TextIO produced different results on each runner. In the Flink runner the file names were dropped, and in Spark the combination process happened twice, duplicating data. I think it is becau

Re: TextIO. Writing late files

2020-05-07 Thread Luke Cwik
+dev On Mon, May 4, 2020 at 3:56 AM Jose Manuel wrote: > Hi guys, > > I think I have found something interesting about windowing. > > I have a pipeline that gets data from Kafka and writes to HDFS by means of > TextIO. > Once written, generated files are combined to apply some custom operations

TextIO. Writing late files

2020-05-04 Thread Jose Manuel
Hi guys, I think I have found something interesting about windowing. I have a pipeline that gets data from Kafka and writes to HDFS by means of TextIO. Once written, the generated files are combined to apply some custom operations. However, the combine does not receive data. In the following, you can find t
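A sketch of the source side of such a pipeline (broker, topic and window size are illustrative; the windowed TextIO write and the downstream aggregation over the file names are sketched above in this thread):

    // Relevant imports: org.apache.beam.sdk.io.kafka.KafkaIO,
    // org.apache.kafka.common.serialization.StringDeserializer,
    // org.apache.beam.sdk.transforms.Values, org.apache.beam.sdk.transforms.windowing.*
    PCollection<String> lines =
        pipeline
            .apply("ReadFromKafka",
                KafkaIO.<String, String>read()
                    .withBootstrapServers("kafka:9092")   // illustrative
                    .withTopic("events")                  // illustrative
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(Values.<String>create())
            .apply("Window",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));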