Thanks a lot everyone for your valuable feedback!

I just updated my code with some minor refactoring, and it seems to be working 
like a charm. Some data is still being dropped due to lateness (but we're 
talking about ~100 elements out of 2 million, so no big deal there; I'll look 
into extending the allowed lateness and the overall performance bits I'm 
missing).

What worries me a lot, though, is that the wall time keeps growing, currently 
up to 1 day and 3 hours, in the stage responsible for writing all that 
captured data into Parquet files, presumably because of the .parquet file 
writing code.

I suppose this is also why I still get tons of small Parquet files within the 
same bucket: in a perfect scenario I should only have 4 files (one every 15 
minutes, given the window length), yet I'm currently getting 60+!

            .apply("Write .parquet File(s)",
                FileIO
                    .<String, GenericRecord>writeDynamic()
                    .by((SerializableFunction<GenericRecord, String>) event -> {
                        // specify partitioning here
                    })
                    .via(ParquetIO.sink(AVRO_SCHEMA))
                    .to(options.getOutputDirectory())
                    .withNaming(type -> ParquetFileNaming.getNaming(...))
                    .withDestinationCoder(StringUtf8Coder.of())
                    .withNumShards(1) // should this be 0? Could setting it to 0 increase costs?
            );
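
Regarding the 60+ files: if I understand the writeDynamic() model correctly, 
there is roughly one file per destination (the key returned by .by()) per 
window per fired pane per shard, so the number of destinations and any late or 
extra panes multiply the count well past 4 even with withNumShards(1). Just to 
make that concrete, a hypothetical destination function like the one below 
(not my actual partitioning; "eventType" is a made-up field) with, say, 5 
distinct keys would already mean 5 files per 15-minute window:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    // Hypothetical destination function: every distinct value returned here
    // becomes its own destination, and each destination gets its own file(s)
    // per window / pane / shard.
    class EventTypeDestination implements SerializableFunction<GenericRecord, String> {
        @Override
        public String apply(GenericRecord record) {
            Object type = record.get("eventType"); // made-up field name
            return type == null ? "unknown" : type.toString();
        }
    }

So before touching withNumShards I'll first check how many distinct 
destination keys and how many panes per window I'm actually producing.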
