OK, great. What I did in the end was:

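import apache_beam as beam
from apache_beam.transforms.window import FixedWindows


def cumulative_sums(key, timestamped_values):
  # Each item is a (value, timestamp) pair: sort by timestamp, then accumulate.
  running = 0
  for x in sorted(timestamped_values, key=lambda x: x[1]):
    running += x[0]
    yield key, running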


with beam.Pipeline() as p:
    sums = (p
        | 'Create' >> beam.Create([
            (3.1, 3),
            (4.2, 4),
            (5.4, 5),
            (2.3, 2),
            (1.5, 6)
        ])
        | 'AddTimestamps' >> beam.Map(
            lambda x: beam.transforms.window.TimestampedValue(x, x[1]))
        | beam.Map(lambda x: ('key', x))
        | 'Window' >> beam.WindowInto(FixedWindows(11))
        | beam.GroupByKey()
        | beam.FlatMapTuple(cumulative_sums)
        | 'Print' >> beam.Map(print))
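
As a quick sanity check outside Beam, the function can be called directly on the kind of list that GroupByKey would hand over for the single key. This is only a sketch in plain Python: `grouped` is a stand-in (an assumption) for the grouped values of one window, not something the pipeline exposes under that name.

```python
def cumulative_sums(key, timestamped_values):
    # Sort the (value, timestamp) pairs by timestamp, then accumulate.
    running = 0
    for x in sorted(timestamped_values, key=lambda x: x[1]):
        running += x[0]
        yield key, running

# Stand-in (assumption) for the values grouped under 'key' in one window.
grouped = [(3.1, 3), (4.2, 4), (5.4, 5), (2.3, 2), (1.5, 6)]

results = list(cumulative_sums('key', grouped))
for key, total in results:
    print(key, round(total, 2))
```

The totals follow timestamp order (2, 3, 4, 5, 6) regardless of the order in which the pairs were listed.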

However, I am wondering whether there is a way to carry state from one window 
to the next. I ask because I would also like to apply other transformations 
in which a value is taken from one time step to the next, for example to 
calculate a time difference or to fill in a missing value with the value 
from the previous time step. Can that be done? I have read something about 
looping timers, but could not find the details for Python. Is there a way to 
do this?
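
The kind of step-to-step logic described here can be sketched in plain Python on one already-grouped list. This is only an illustration under stated assumptions: it does not carry anything across Beam windows, `with_previous` is a hypothetical helper name, and the sample data is made up.

```python
def with_previous(timestamped_values):
    """Yield (timestamp, value, diff_to_previous) in timestamp order.

    A missing value (None) is filled with the last value seen before it.
    """
    previous_ts = None
    last_value = None
    for value, ts in sorted(timestamped_values, key=lambda x: x[1]):
        if value is None:
            value = last_value  # forward fill from the previous time step
        diff = None if previous_ts is None else ts - previous_ts
        yield ts, value, diff
        previous_ts, last_value = ts, value

# Made-up (value, timestamp) pairs with one missing value at timestamp 4.
rows = list(with_previous([(3.1, 3), (None, 4), (5.4, 7), (2.3, 2)]))
for row in rows:
    print(row)
```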

Thanks a lot.

-----Original Message-----
From: Robert Bradshaw <rober...@google.com>
Sent: Monday, April 24, 2023 18:00
To: user@beam.apache.org; Guagliardo, Patrizio 
<patrizio.guaglia...@oliverwyman.com>
Subject: Re: [Question] - Time series - cumulative sum in right order with 
python api in a batch process



You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all the data 
up to that point (modulo late data) and can then confidently compute your 
ordered cumulative sum.

You could do something like this:

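def cumulative_sums(key, timestamped_values):
  running = 0
  # Sort the (timestamp, value) pairs by timestamp, then accumulate in order.
  for _, x in sorted(timestamped_values):
    running += x
    yield running

sums = (timestamped_data
  # Assumes timestamped_data holds (key, value) pairs.
  | beam.Map(lambda kv, t=beam.DoFn.TimestampParam: (kv[0], (t, kv[1])))
  | beam.WindowInto(...)
  | beam.GroupByKey()
  | beam.FlatMapTuple(cumulative_sums))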
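
To illustrate why grouping per window makes the ordering safe, here is a plain-Python imitation of the same idea with no Beam involved. The helper names and sample data are hypothetical, and the window-start formula assumes fixed windows with zero offset. Note that the running total restarts in each window.

```python
from collections import defaultdict

def fixed_window_start(timestamp, size):
    # Start of the fixed window containing `timestamp`, assuming zero offset.
    return timestamp - (timestamp % size)

def windowed_cumulative_sums(timestamped_values, size):
    # Group (timestamp, value) pairs by window, then accumulate each
    # window's values in timestamp order.
    windows = defaultdict(list)
    for t, x in timestamped_values:
        windows[fixed_window_start(t, size)].append((t, x))
    for start in sorted(windows):
        running = 0
        for t, x in sorted(windows[start]):
            running += x
            yield start, t, running

# Made-up, deliberately unordered (timestamp, value) pairs.
data = [(3, 3.1), (12, 1.0), (2, 2.3), (11, 4.0)]
out = list(windowed_cumulative_sums(data, size=10))
for row in out:
    print(row)
```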



On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user 
<user@beam.apache.org> wrote:
>
> Hi all,
>
> I want to compute a cumulative sum over a time series in a bounded batch 
> pipeline in Apache Beam with the Python API. You can write a cumulative 
> sum with a stateful DoFn, but the problem remains that this does not work 
> when the data is unordered, which is the case in a PCollection. Is there a 
> way to compute the cumulative sum over time in a batch process? This is what 
> I did (without ordering):
>
> import apache_beam as beam
> from apache_beam import TimeDomain
> from apache_beam.transforms.userstate import (
>     ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec)
> from apache_beam.transforms.window import FixedWindows, GlobalWindows
>
>
> class TimestampedSumAccumulator(beam.DoFn):
>     SUM_STATE = 'sum'
>
>     def process(
>         self, element,
>         timestamp=beam.DoFn.TimestampParam,
>         sum_state=beam.DoFn.StateParam(
>             ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
>     ):
>         sum_value = sum_state.read() or 0.0
>         # print(element)
>         sum_value += element[1]
>         sum_state.write(sum_value)
>         yield beam.transforms.window.TimestampedValue(sum_value, timestamp)
>
>
> with beam.Pipeline() as p:
>     sums = (p
>         | 'Create' >> beam.Create([
>             (3.1, 3),
>             (1.5, 1),
>             (4.2, 4),
>             (5.4, 5),
>             (2.3, 2)
>         ])
>         | 'AddTimestamps' >> beam.Map(
>             lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
>         | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
>         | 'Window' >> beam.WindowInto(FixedWindows(10))
>         | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
>         | 'Print' >> beam.Map(print))
>
> How can this be done so that the cumulative sum is computed in the “right” 
> order?
>
> Thank you very much in advance.
>
