Hi,
I have a pipeline that continuously reads files from a folder where files can
be created at a high throughput (with peaks around 100 files/sec). Each file
is a few MB of Avro data.
The only purpose of my pipeline is to read those files as they arrive,
deserialize them into GenericRecords, and write them back as .parquet files
at a different location.
The simplified code looks as follows:
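pipeline
    .apply("Get patterns from Configuration", Create.of(getPaths()))
    // poll for new files every 5 seconds
    .apply("Match All Continuously",
        FileIO.matchAll().continuously(Duration.standardSeconds(5), ...))
    .apply(FileIO.readMatches())
    .apply(AvroIO.parseFilesGenericRecords(...))
    .apply(Window.into(...))  // windows of 10 seconds
    .apply(FileIO.writeDynamic()
        // ... destination and naming configuration elided ...
        .via(ParquetIO.sink(...)))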
The issue is that all elements that come out of FileIO.matchAll() end up with
the same timestamp if they were retrieved during the same polling round. For
example, if matchAll() matched 500 files, all records from these files have
the same timestamp and end up in the same window, which in some cases throws
OOM exceptions.
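To illustrate the effect described above, here is a minimal plain-Java sketch
(no Beam dependency) of how I understand fixed-window assignment to work,
assuming epoch-aligned windows with no offset. The class name, method names
and timestamps are hypothetical and only serve the illustration; this is not
the actual pipeline code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: plain Java, no Beam. All names here are hypothetical.
class FixedWindowDemo {

    // Start of the fixed window that contains timestampMillis, for windows of
    // sizeMillis aligned to the epoch (assumes non-negative timestamps).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Counts how many elements fall into each window, keyed by window start.
    static Map<Long, Integer> countPerWindow(List<Long> timestamps, long sizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts, sizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long windowSize = 10_000L;      // 10-second windows
        long pollTime = 1_000_000_123L; // made-up timestamp of one polling round

        // 500 files matched in one poll: every element carries the same timestamp,
        // so they all map to a single window.
        List<Long> sameTimestamp = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            sameTimestamp.add(pollTime);
        }
        System.out.println("same timestamp: " + countPerWindow(sameTimestamp, windowSize));

        // The same 500 files with made-up timestamps 100 ms apart: the elements
        // are spread over several windows instead of one.
        List<Long> spread = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            spread.add(pollTime + i * 100L);
        }
        System.out.println("spread timestamps: " + countPerWindow(spread, windowSize));
    }
}
```

With identical timestamps the window size no longer bounds how much data a
window holds: the size of one polling round does.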
I have tried adding a ParDo that reassigns the timestamp to the current local
time after the records are parsed, but it had no noticeable effect.
I have also tried adding a "rate limiter" right after matching the files and
before reading them, but in that case records get stuck in the "groupByShards"
operation of FileIO. I'm not sure, but I think this might be because the
watermark is still unchanged, so the operation keeps waiting for all the files
that were matched by the same matchAll() poll.
Any ideas on how to solve this?
Thanks,
Jean Wisser.