I've recently been developing a Beam (Dataflow) consumer that reads from a PubSub
subscription and outputs to Parquet files the combination of all the objects
grouped within the same window.

While testing this without a heavy load, everything seemed to work fine.

However, after performing some heavy testing I can see that out of 1,000,000
events sent to that PubSub queue, only 1,000 make it to Parquet!

According to the wall times across the different stages, the stage that parses
the events prior to applying the window takes 58 minutes, and the last stage,
which writes the Parquet files, takes 1 hour and 32 minutes.

I'll show the most relevant parts of the code below; I hope you can shed some
light on whether the problem is due to the logic that comes before the Window
definition or to the Window object itself.

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))
        .apply("15m window",
            
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(15)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes()
        )
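
For completeness, the last stage that writes the Parquet files is appended to
the same chain and is essentially the usual FileIO/ParquetIO combination,
roughly like this (simplified sketch; the output path getter and the shard
count are placeholders for the real values):

        .apply("Write to Parquet",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(AVRO_SCHEMA))
                .to(options.getTargetPath())   // placeholder output location
                .withSuffix(".parquet")
                .withNumShards(1))             // placeholder shard count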

Also note that I'm running Beam 2.9.0.

I tried moving the logic after the Window definition, but most messages still
don't make it to the Parquet files.
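
By "moving the logic" I mean reordering the transforms roughly like this
(sketch; the window now applies to the raw PubsubMessage elements):

pipeline
        .apply("Reading PubSub Events",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getSubscription()))
        .apply("15m window",
            Window.<PubsubMessage>into(FixedWindows.of(Duration.standardMinutes(15)))
                .triggering(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(1)))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes())
        .apply("Map to AvroSchemaRecord (GenericRecord)",
            ParDo.of(new PubsubMessageToGenericRecord()))
        .setCoder(AvroCoder.of(AVRO_SCHEMA))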

Could the logic inside the second stage be too heavy, so that messages arrive
too late and get discarded by the Window? The logic basically consists of
reading the payload, parsing it into a POJO (reading inner Map attributes,
filtering, and so on).
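
For reference, the parsing DoFn looks roughly like this (simplified sketch; the
schema, field names and filter condition are placeholders for the real ones):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;

public class PubsubMessageToGenericRecord extends DoFn<PubsubMessage, GenericRecord> {

    // Placeholder schema; the real AVRO_SCHEMA has many more fields.
    private static final Schema AVRO_SCHEMA = SchemaBuilder.record("Event").fields()
            .requiredString("payload")
            .endRecord();

    @ProcessElement
    public void processElement(ProcessContext context) {
        PubsubMessage message = context.element();

        // Read the payload and the attribute map.
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        Map<String, String> attributes = message.getAttributeMap();

        // Filter out events we are not interested in (placeholder condition).
        if (!"expected-type".equals(attributes.get("eventType"))) {
            return;
        }

        // Build the GenericRecord from the parsed payload (POJO mapping elided).
        GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
        record.put("payload", payload);
        context.output(record);
    }
}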

However, when I send a million events to PubSub, all of those million events
seem to make it to the Parquet write stage, yet when I read those Parquet files
in Spark and check the records, they aren't complete. Does that make sense?
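
For reference, the check on the Spark side is just reading the output directory
and counting the records, roughly like this (the path is a placeholder):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("parquet-check").getOrCreate();
Dataset<Row> events = spark.read().parquet("gs://my-bucket/output/");  // placeholder path
System.out.println(events.count());  // far fewer records than the events sent to PubSub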
