Thanks a lot, everyone, for the valuable feedback!
I just updated my code with some minor refactoring and it seems to be working like
a charm. Some data is still being dropped due to lateness, but we're talking about
~100 elements out of 2 million, so no big deal there; I'll look into extending the
allowed lateness and whatever other performance bits I'm missing.
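In case it's useful to someone else, this is the kind of windowing tweak I have in mind for the lateness part. It's only a sketch under a few assumptions: "events" is a stand-in for the PCollection<GenericRecord> that feeds the write step, and the 10 extra minutes of allowed lateness is an arbitrary number to tune:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // "events" stands in for whatever PCollection<GenericRecord> feeds the write step.
    PCollection<GenericRecord> windowed = events.apply("15m Windows",
        Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
            // Fire once when the watermark passes the end of the window, then once
            // more for any batch of late elements that still arrives...
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            // ...up to 10 extra minutes of lateness (tune to how late the data really is).
            .withAllowedLateness(Duration.standardMinutes(10))
            // Late panes carry only the late elements, so earlier output isn't rewritten.
            .discardingFiredPanes());

Worth keeping in mind that every extra pane that fires turns into an extra output file downstream, so this interacts with the small-files issue below.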
What worries me a lot is that the wall time keeps growing and has reached 1 day
and 3 hours in the stage responsible for writing all that captured data into
Parquet files, presumably because of the .parquet writing code.
I suppose this is also why I still get tons of small Parquet files within the
same bucket: in a perfect scenario I should only have 4 files (one per 15-minute
window, given the Window length), but I'm currently getting 60+!
.apply("Write .parquet File(s)",
FileIO
.<String, GenericRecord>writeDynamic()
.by((SerializableFunction<GenericRecord, String>) event -> {
// specify partitioning here
})
.via(ParquetIO.sink(AVRO_SCHEMA))
.to(options.getOutputDirectory())
.withNaming(type -> ParquetFileNaming.getNaming(...))
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1) // should this be 0? Could this imply
increasing of costs if set to 0?
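For what it's worth, my current understanding of the withNumShards question (happy to be corrected): the file count per window is roughly (distinct destinations returned by .by()) x numShards x (number of fired panes), so 60+ files probably means my partitioning key is finer than the 15-minute window and/or extra panes are firing. As far as I know, withNumShards(0) (runner-chosen sharding) is rejected for unbounded/streaming input, while withNumShards(1) funnels everything for a destination through a single writer, which could be part of the huge wall time. Below is a rough sketch of the same write with those levers annotated; "windowed" and bucketOf(...) are hypothetical (the windowed PCollection and a helper collapsing each record to its 15-minute bucket), and I've swapped in FileIO's defaultNaming just to keep the snippet self-contained:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    windowed.apply("Write .parquet File(s)",
        FileIO.<String, GenericRecord>writeDynamic()
            // One destination per 15-minute bucket: fewer distinct destinations
            // means fewer files per window. bucketOf(...) is a hypothetical helper.
            .by((SerializableFunction<GenericRecord, String>) event -> bucketOf(event))
            .via(ParquetIO.sink(AVRO_SCHEMA))
            .to(options.getOutputDirectory())
            // defaultNaming used here only to keep the sketch compilable;
            // ParquetFileNaming.getNaming(...) would go back in its place.
            .withNaming(bucket -> FileIO.Write.defaultNaming(bucket, ".parquet"))
            .withDestinationCoder(StringUtf8Coder.of())
            // Streaming writes need an explicit shard count (0 is rejected for
            // unbounded input, as far as I can tell). 1 shard = 1 file per
            // destination/window/pane, but also a single writer doing all the work;
            // a small number > 1 trades a few more files for parallelism.
            .withNumShards(1));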