Hello Beamers,

I’ve been struggling to get windowing working the way I need it to. I created
the simple Python test pipeline below. I expected it to send each grouped
window to the next step as soon as that window is created. What actually
happens is that the outputs are grouped into windows correctly, but none of
the windows is sent to the next step until the entire PCollection has been
processed. Is there something wrong with my trigger configuration? Any help
would be greatly appreciated!
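To make the grouping part concrete: as far as I understand it, FixedWindows(1)
puts each element into a 1-second interval [start, start + 1) based on its
event timestamp. The pure-Python sketch below models that bucketing. The
helper names fixed_window and assign_windows and the example timestamps are
hypothetical, and the start formula is my reading of how FixedWindows
computes window bounds, not Beam's own code.

```python
from typing import Dict, List, Tuple

Window = Tuple[float, float]


def fixed_window(timestamp: float, size: float = 1.0, offset: float = 0.0) -> Window:
    """Return the [start, end) bounds of the fixed window containing `timestamp`.

    Assumed to mirror FixedWindows' bucketing: start = ts - (ts - offset) % size.
    """
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


def assign_windows(timestamped: List[Tuple[int, float]], size: float = 1.0) -> Dict[Window, List[int]]:
    """Bucket (element, timestamp) pairs by the fixed window they fall into."""
    windows: Dict[Window, List[int]] = {}
    for elem, ts in timestamped:
        windows.setdefault(fixed_window(ts, size), []).append(elem)
    return windows


if __name__ == "__main__":
    # Hypothetical timestamps: elements 1..10 spaced 0.25 s apart from t=100.0.
    data = [(i, 100.0 + 0.25 * (i - 1)) for i in range(1, 11)]
    for win, elems in sorted(assign_windows(data).items()):
        print(win, elems)
    # (100.0, 101.0) [1, 2, 3, 4]
    # (101.0, 102.0) [5, 6, 7, 8]
    # (102.0, 103.0) [9, 10]
```

The 0.25 s spacing is chosen only so the timestamps are exact in floating
point; in the real pipeline the timestamps come from time.time().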

Thanks,
Jon

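import time

import apache_beam as beam
from apache_beam import Pipeline
from apache_beam.transforms import trigger, window


class NumGenerator(beam.PTransform):
    """Emits 1..10 with a short delay between elements (not used below).

    expand() must return a PCollection, so the slow generation runs in a
    DoFn applied to a single seed element instead of yielding directly.
    """

    class _Generate(beam.DoFn):
        def process(self, unused_seed):
            for ct in range(1, 11):
                time.sleep(.1)
                print(f"generating {ct}")
                yield ct

    def expand(self, pbegin):
        return pbegin | beam.Create([None]) | beam.ParDo(self._Generate())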


def transform(x):
    """Key function for GroupBy: logs, simulates slow work, returns parity."""
    print(f"transforming {x}")
    time.sleep(.2)
    return x % 2


def test_pipeline():
    print("starting")
    with Pipeline() as p:
        print("in pipeline")
        (
            p
            | beam.Create(list(range(1, 11)))
            # Stamp each element with the current wall-clock time.
            | 'Add timestamp' >> beam.Map(
                lambda elem: beam.window.TimestampedValue(elem, time.time()))
            | 'Windowing' >> beam.WindowInto(
                window.FixedWindows(1),
                trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | 'Grouping by value' >> beam.GroupBy(transform)
            | 'Out2Print' >> beam.Map(print)
        )
    print("finished pipeline")


if __name__ == "__main__":
    test_pipeline()
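What I expected from Repeatedly(AfterCount(1)) with DISCARDING is that each
(window, key) pair fires a pane as soon as one element arrives, so grouped
results trickle out while the input is still being read. The toy model below
sketches that expectation. It is hypothetical (simulate_panes and
fixed_window are my own names, not Beam code), it assumes elements arrive in
timestamp order, and it ignores watermarks, allowed lateness, and whatever
Beam does with a partially filled buffer when a window expires.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple

Window = Tuple[float, float]


def fixed_window(ts: float, size: float = 1.0) -> Window:
    """[start, end) bounds of the fixed window containing ts (offset 0)."""
    start = ts - ts % size
    return (start, start + size)


def simulate_panes(
    arrivals: Iterable[Tuple[int, float]],
    key_fn: Callable[[int], int],
    count: int = 1,
    size: float = 1.0,
) -> List[Tuple[Window, int, List[int]]]:
    """Toy model of Repeatedly(AfterCount(count)) with DISCARDING accumulation.

    Elements are processed in arrival order; whenever a (window, key) buffer
    reaches `count` elements, a pane is emitted and the buffer is cleared.
    Leftover elements in a partially filled buffer are not emitted here.
    """
    buffers: Dict[Tuple[Window, int], List[int]] = defaultdict(list)
    panes: List[Tuple[Window, int, List[int]]] = []
    for elem, ts in arrivals:
        k = (fixed_window(ts, size), key_fn(elem))
        buffers[k].append(elem)
        if len(buffers[k]) >= count:
            panes.append((k[0], k[1], buffers[k]))
            buffers[k] = []  # DISCARDING: the next pane starts empty
    return panes


if __name__ == "__main__":
    # Hypothetical arrivals: elements 1..10 spaced 0.25 s apart from t=100.0.
    arrivals = [(i, 100.0 + 0.25 * (i - 1)) for i in range(1, 11)]
    for pane in simulate_panes(arrivals, key_fn=lambda x: x % 2):
        print(pane)
```

With the same parity key as transform and count=1, every input becomes its
own single-element pane, so ten panes are emitted, one per element, each as
soon as that element is seen.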
