Hello Beamers,
I’ve been struggling to get windowing to work the way I need it to. I
created the simple Python test pipeline below, expecting it to emit each
window's grouped output as soon as that window's trigger fires. Instead, the
outputs are grouped into windows correctly, but no window is sent to the next
step until the entire PCollection has been processed. Is there something wrong
with my trigger configuration? Any help would be greatly appreciated!
Thanks,
Jon
class NumGenerator(beam.PTransform):
    """Composite transform that produces the integers 1 through 10.

    Each number is printed, after a 0.1 s pause, as it is generated.
    """

    def expand(self, input_or_inputs):
        """Return a PCollection containing the integers 1..10.

        The previous implementation used ``yield`` directly in ``expand``.
        That made ``expand`` a generator function, so it returned a plain
        Python generator instead of a PCollection, and Beam cannot wire that
        into a pipeline. The numbers are now materialized and handed to
        ``beam.Create``.

        Presumably this is applied at the pipeline root (``p | NumGenerator()``)
        -- TODO confirm with callers.

        NOTE(review): the sleeps and prints now run once, when the pipeline
        is constructed, not while it executes. This does NOT simulate a slow
        unbounded source; that needs a streaming source such as ``TestStream``.
        """
        def _numbers():
            # Yield 1..10 with a short pause and a log line for each value.
            for ct in range(1, 11):
                time.sleep(.1)
                print(f"generating {ct}")
                yield ct

        return input_or_inputs | beam.Create(list(_numbers()))
def transform(x):
    """Return the grouping key for *x*: its remainder modulo 2.

    The element is logged and the call sleeps 0.2 s first to simulate a slow
    processing step.
    """
    print(f"transforming {x}")
    delay_seconds = 0.2  # artificial slowdown so pipeline progress is visible
    time.sleep(delay_seconds)
    key = x % 2
    return key
def test_pipeline():
    """Run a windowed group-by over the numbers 1..10 and print each group.

    Elements are stamped with the wall-clock time, placed in 1-second fixed
    windows with a per-element repeating trigger (discarding mode), grouped by
    ``transform`` (parity), and printed.

    NOTE(review): this pipeline is bounded (``beam.Create``) and is not
    configured for streaming. That likely explains why nothing reaches the
    print step until all input is processed: in batch execution a GroupBy
    emits only after consuming its whole input, and early-firing triggers
    such as ``AfterCount`` do not take effect. Getting per-window output as
    data arrives probably needs streaming mode plus an unbounded source (e.g.
    ``TestStream`` for tests) -- confirm against the Beam windowing/trigger
    docs.
    """
    print("starting")
    with Pipeline() as p:
        print("in pipeline")
        numbers = p | beam.Create(list(range(1, 11)))
        # Event time = wall-clock time at the moment each element is mapped.
        stamped = numbers | 'Add timestamp' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, time.time()))
        windowed = stamped | 'Windowing' >> beam.WindowInto(
            window.FixedWindows(1),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        grouped = windowed | 'Grouping by value' >> beam.GroupBy(transform)
        _printed = grouped | "Out2Print" >> beam.ParDo(print)
    # The pipeline runs when the ``with`` block exits.
    print("finished pipeline")
if __name__ == "__main__":
    # Script entry point: build and run the demo pipeline (skipped on import).
    test_pipeline()