Hi all,

I want to compute a cumulative sum over a time series in a bounded batch
pipeline in Apache Beam with the Python API. A cumulative sum can be written
with a stateful DoFn, but that approach breaks down when the data is
unordered, which is the case for a PCollection. Is there a way to compute a
cumulative sum over time in a batch pipeline? This is what I did (without
ordering):
import apache_beam as beam
from apache_beam import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec)
from apache_beam.transforms.window import FixedWindows, GlobalWindows


class TimestampedSumAccumulator(beam.DoFn):
    SUM_STATE = 'sum'

    def process(
        self, element,
        timestamp=beam.DoFn.TimestampParam,
        sum_state=beam.DoFn.StateParam(
            ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
    ):
        sum_value = sum_state.read() or 0.0
        # print(element)
        sum_value += element[1]
        sum_state.write(sum_value)
        yield beam.transforms.window.TimestampedValue(sum_value, timestamp)


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (1.5, 1),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2)
        ])
        | 'AddTimestamps' >> beam.Map(
            lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
        | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(10))
        | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
        | 'Print' >> beam.Map(print))

How can the cumulative sum be computed in the “right” order, i.e. ordered by
timestamp?
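
To make the target concrete, here is a minimal plain-Python sketch (no Beam involved) of the result I am after for the sample data above: sort by timestamp first, then accumulate. The names `records`, `ordered` and `result` are only illustrative, and this is of course not a solution for a distributed PCollection.

```python
from itertools import accumulate

# (value, timestamp) pairs, same sample data as in the pipeline above
records = [(3.1, 3), (1.5, 1), (4.2, 4), (5.4, 5), (2.3, 2)]

# Sort by timestamp so the running sum follows event time
ordered = sorted(records, key=lambda r: r[1])

# Running sum of the values in timestamp order
running = accumulate(value for value, _ in ordered)

# Pair each timestamp with the cumulative sum up to and including it
result = [(ts, total) for (_, ts), total in zip(ordered, running)]

for ts, total in result:
    print(ts, round(total, 2))
```

Each output row pairs a timestamp with the sum of all values whose timestamp is less than or equal to it, which is what the stateful DoFn would need to emit.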

Thank you very much in advance.

