You are correct that the data may arrive out of order. However, once a window closes, you are guaranteed to have seen all the data belonging to that window (modulo late data), so you can sort it by timestamp and then confidently compute an ordered cumulative sum.
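To make the ordering problem concrete without any Beam machinery, here is a plain-Python sketch. The helper `running_sums` and the (timestamp, value) pairs are hypothetical sample data, not anything from Beam. Accumulating in arrival order, which is what a per-element running total effectively does, gives totals that do not follow event time. Sorting the complete set of values first does.

```python
def running_sums(values):
    """Return the running totals of `values`, in the order given."""
    total = 0
    totals = []
    for v in values:
        total += v
        totals.append(total)
    return totals


# Hypothetical (timestamp, value) pairs in an arbitrary arrival order.
arrival = [(3, 30), (1, 10), (4, 40), (5, 50), (2, 20)]

# Accumulating as elements arrive: intermediate totals ignore event time.
in_arrival_order = running_sums(v for _, v in arrival)

# With all of the window's data in hand, sort by timestamp first.
in_event_time_order = running_sums(v for _, v in sorted(arrival))

print(in_arrival_order)     # [30, 40, 80, 130, 150]
print(in_event_time_order)  # [10, 30, 60, 100, 150]
```

Only the final total agrees. That is why the sum has to wait until the window's data is complete.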
You could do something like this:

    import apache_beam as beam

    def cumulative_sums(key, timestamped_values):
        # All values for this key and window are available here, so sort
        # them by timestamp and emit the running total in event-time order.
        running = 0
        for _, x in sorted(timestamped_values):
            running += x
            yield running

    sums = (timestamped_data  # assumed to be a keyed PCollection of (key, value) pairs
            # Copy each element's event timestamp into the value so that it
            # survives the GroupByKey.
            | beam.Map(lambda kv, t=beam.DoFn.TimestampParam: (kv[0], (t, kv[1])))
            | beam.WindowInto(...)
            | beam.GroupByKey()
            | beam.FlatMapTuple(cumulative_sums))

On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user <user@beam.apache.org> wrote:
> Hi all,
>
> I want to create a cumulative sum over a time series in bounded batch
> processing in Apache Beam with the Python API. You can write a cumulative
> sum with a stateful DoFn, but the problem is that this approach does not
> work when the data is unordered, which is the case in a PCollection. Is
> there a way to compute the cumulative sum over time in a batch process?
> This is what I did (without ordering):
>
> import apache_beam as beam
> from apache_beam import TimeDomain
> from apache_beam.transforms.userstate import (
>     ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec)
> from apache_beam.transforms.window import FixedWindows, GlobalWindows
>
>
> class TimestampedSumAccumulator(beam.DoFn):
>     SUM_STATE = 'sum'
>
>     def process(
>         self, element,
>         timestamp=beam.DoFn.TimestampParam,
>         sum_state=beam.DoFn.StateParam(
>             ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
>     ):
>         sum_value = sum_state.read() or 0.0
>         # print(element)
>         sum_value += element[1]
>         sum_state.write(sum_value)
>         yield beam.transforms.window.TimestampedValue(sum_value, timestamp)
>
>
> with beam.Pipeline() as p:
>     sums = (p
>             | 'Create' >> beam.Create([
>                 (3.1, 3),
>                 (1.5, 1),
>                 (4.2, 4),
>                 (5.4, 5),
>                 (2.3, 2)
>             ])
>             | 'AddTimestamps' >> beam.Map(
>                 lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
>             | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
>             | 'Window' >> beam.WindowInto(FixedWindows(10))
>             | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
>             | 'Print' >> beam.Map(print))
>
> How can the cumulative sum be computed in the “right” order?
>
> Thank you very much in advance.