Hi all,

I want to compute a cumulative sum over a time series in a bounded (batch) pipeline in Apache Beam, using the Python API. One option is to write the cumulative sum as a stateful DoFn. The problem is that this does not work when the data is unordered, which is the case for a PCollection: the DoFn sees elements in whatever order the runner delivers them, so the running sum follows arrival order rather than timestamp order. Is there a way to compute the cumulative sum in time order in a batch process?

This is what I did (without ordering):

```python
import apache_beam as beam
from apache_beam import TimeDomain
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec
from apache_beam.transforms.window import FixedWindows, GlobalWindows


class TimestampedSumAccumulator(beam.DoFn):
    SUM_STATE = 'sum'

    def process(
        self,
        element,
        timestamp=beam.DoFn.TimestampParam,
        sum_state=beam.DoFn.StateParam(ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder())),
    ):
        # State is kept per key and window; elements arrive in arbitrary order,
        # so this running sum follows arrival order, not timestamp order.
        sum_value = sum_state.read() or 0.0
        # print(element)
        sum_value += element[1]
        sum_state.write(sum_value)
        yield beam.transforms.window.TimestampedValue(sum_value, timestamp)


with beam.Pipeline() as p:
    sums = (
        p
        | 'Create' >> beam.Create([(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)])
        | 'AddTimestamps' >> beam.Map(lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
        | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(10))
        | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
        | 'Print' >> beam.Map(print)
    )
```

How could this be done so that the cumulative sum is computed in the “right” (timestamp) order?

Thank you very much in advance.
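One common workaround in a bounded pipeline is to give up on per-element state and instead group all values for a key (and window), sort them by event timestamp in memory, and then compute the running sum. The sketch below assumes each group fits in a single worker's memory; the names `cumulative_sum_in_time_order` and `run` are hypothetical helpers for illustration, not part of the Beam API. Because `GroupByKey` does not keep per-element timestamps, the sketch copies each element's timestamp into its value via `beam.DoFn.TimestampParam` before grouping.

```python
from typing import Iterable, List, Tuple


def cumulative_sum_in_time_order(
    key_and_values: Tuple[str, Iterable[Tuple[float, float]]]
) -> List[Tuple[str, float, float]]:
    """Sort one key's (value, timestamp) pairs by timestamp and emit running sums.

    Hypothetical helper: returns (key, timestamp, cumulative_sum) tuples in time order.
    """
    key, values = key_and_values
    running_total = 0.0
    results = []
    # All values of this group are materialized and sorted in memory here,
    # so this only works for groups that fit on a single worker.
    for value, ts in sorted(values, key=lambda pair: pair[1]):
        running_total += value
        results.append((key, ts, running_total))
    return results


def run() -> None:
    """Hypothetical pipeline wiring; call run() to execute it on the default runner."""
    # Imported inside run() so the pure helper above can be exercised without Beam installed.
    import apache_beam as beam
    from apache_beam.transforms.window import FixedWindows, TimestampedValue

    with beam.Pipeline() as p:
        (
            p
            | "Create" >> beam.Create([(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)])
            | "AddTimestamps" >> beam.Map(lambda x: TimestampedValue(x[0], x[1]))
            | "Window" >> beam.WindowInto(FixedWindows(10))
            # Copy the event timestamp into the value, since GroupByKey
            # does not preserve per-element timestamps.
            | "AddKeysAndTimestamps" >> beam.Map(
                lambda v, ts=beam.DoFn.TimestampParam: ("sum_key", (v, float(ts)))
            )
            | "Group" >> beam.GroupByKey()
            | "CumulativeSum" >> beam.FlatMap(cumulative_sum_in_time_order)
            | "Print" >> beam.Map(print)
        )
```

With `FixedWindows(10)` the running sum restarts in every window; for one cumulative sum over the whole bounded input, the window step could be dropped so everything stays in the global window. The trade-off compared to the stateful DoFn is memory: the whole group is sorted on one worker, which is usually acceptable for a single time series in batch but not for very large keys.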