Re: beam rebuilds numpy on pipeline run
+Valentyn Tymofieiev This sounds like it's related to ARROW-8983 (pyarrow takes a long time to download after 0.16.0) [1], discussed on the arrow dev list [2]. I'm not sure what would've triggered this to start happening for you today though.

[1] https://issues.apache.org/jira/browse/ARROW-8983
[2] https://lists.apache.org/thread.html/r9baa48a9d1517834c285f0f238f29fcf54405cb7cf1e681314239d7f%40%3Cdev.arrow.apache.org%3E

On Fri, Oct 9, 2020 at 12:10 PM Ross Vandegrift <ross.vandegr...@cleardata.com> wrote:

> Hello,
>
> Starting today, running a beam pipeline triggers a large reinstallation of
> python modules. For some reason, it forces full rebuilds from source;
> since beam depends on numpy, this takes a long time.
>
> There's nothing strange about my python setup. I'm using python3.7 on
> debian buster with the dataflow runner. My venv is set up like this:
>
> python3 -m venv ~/.venvs/beam
> . ~/.venvs/beam/bin/activate
> python3 -m pip install --upgrade wheel
> python3 -m pip install --upgrade pip setuptools
> python3 -m pip install -r requirements.txt
>
> My requirements.txt has:
>
> apache-beam[gcp]==2.23.0
> boto3==1.15.0
>
> When it's building, `ps ax | grep python` shows me this:
>
> /home/ross/.venvs/beam/bin/python -m pip download --dest /tmp/dataflow-requirements-cache -r requirements.txt --exists-action i --no-binary :all:
>
> How do I prevent this? It's far too slow to develop with, and our
> compliance folks are likely to prohibit a tool that silently downloads &
> builds unknown code.
>
> Ross
Re: Triggers in sideInputs
If you don't have any watermark-based triggers, then using global-window state and timers makes sense, and you can rewindow into a different window after it. A hacky way to rewindow into a different windowing strategy is to output the data to a message queue and ingest the data in the same pipeline. People have used this to create loops and work around some windowing issues like this. The graph would look like:

EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output summary to Pubsub)
Input summary from Pubsub -> Window.into(My favorite window strategy) -> Additional processing

On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola wrote:

> Hi Luke,
>
> Thanks for your answer.
>
> I was studying the state/timer approach. What doesn't convince me is that
> I would have to use a global window in the main input; otherwise I could
> lose some states when I don't receive state events for a while. By using
> side inputs I keep two independent windowing strategies: global for the
> states but fixed (or whatever) for the measurements. Do you see another
> way to overcome this?
>
> Regards
>
> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik wrote:
>
>> Only data along the main input edge causes that DoFn to be executed
>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>> on the side input controls when the side input data becomes available
>> and is updated.
>>
>> You could choose to have a generator that produces an event on the main
>> input every hour, and use it to look at the side input and compute all
>> the outputs that you want.
>>
>> I do think that a solution that uses state and timers would likely fit
>> more naturally to solve the problem. This blog[1] is a good starting
>> point.
>>
>> 1: https://beam.apache.org/blog/timely-processing/
>>
>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <andresgaragi...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a question regarding triggers in side inputs. I will try to
>>> explain my doubt with an example.
>>>
>>> Suppose we have a data stream of sensor events as follows:
>>>
>>> - id, timestamp, type, value
>>>
>>> Where:
>>>
>>> - id is the sensor id
>>> - timestamp is the event timestamp
>>> - type is one of these values: measurement, new-state
>>> - value is a conditional field: if the type is 'measurement', it is a
>>>   real number with the measured value; if the type is 'new-state', it
>>>   is a string with the new sensor state.
>>>
>>> We want to produce aggregated data in a time-based way (for example by
>>> an hour) as follows:
>>>
>>> Report:
>>>
>>> {
>>>   'timestamp': 'report timestamp',
>>>   'sensors': {
>>>     'sensorId1': {
>>>       'stateA': 'sum of measures in stateA',
>>>       'stateB': 'sum of measures in stateB',
>>>       'stateC': 'sum of measures in stateC'
>>>     }
>>>   }
>>> }
>>>
>>> The state at a given timestamp X is the last known state of that sensor
>>> at that moment. We could have late events (suppose max 15' late).
>>>
>>> I thought of this solution:
>>>
>>> 1) Create a 'sideInput' with the states (in a global window):
>>>
>>> Window<...> sideInputWindow =
>>>     Window.<...>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .accumulatingFiredPanes();
>>>
>>> PCollectionView<Map<..., TreeMap<...>>> sensorStates = ...
>>>
>>> where I have an entry in this map for every sensor, and the TreeMap has
>>> an entry for every new state (in the last 15') + 1; we need the
>>> previous one in case we didn't receive state changes in the last 15'.
>>>
>>> 2) Then, the solution aggregates values based on the sideInput:
>>>
>>> Window<...> mainInputWindow = FixedWindows of an hour with allowed
>>> lateness of 15' and accumulating fired panes.
>>>
>>> The solution is not working as I expected in this scenario (row order
>>> is processing-time order):
>>>
>>> Timestamp  Sensor  Type         Value  Output      Expected
>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>>
>>> I assumed that a new firing of the 'sideInput' would force a new firing
>>> of the 'mainInput'. So when t2 arrives, it fires a trigger on the
>>> sideInput, producing the new state, and then the mainInput would be
>>> recomputed, producing the expected value. Is my assumption wrong?
>>>
>>> Thank you
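To make the state-and-timers step above concrete, here is a framework-free sketch in plain Python (illustrative names, not the Beam API): a per-key state cell tracks the current sensor state and the running sums, and a timer callback stands in for the point where the summary would be written to Pubsub for rewindowing. Note that, processed in plain arrival order, the late t2 state change from the example is simply missed, which is why a real implementation would also buffer events or sort by event time inside state.

```python
class SensorStateDoFn:
    """Stands in for a stateful DoFn on a globally windowed stream:
    one state cell per sensor key, updated element by element."""

    def __init__(self):
        self.current_state = {}  # key -> latest sensor state seen
        self.sums = {}           # key -> {state: summed measurements}

    def process(self, key, typ, value):
        # Elements are handled in arrival (processing-time) order.
        if typ == "new-state":
            self.current_state[key] = value
        else:
            state = self.current_state.get(key)
            if state is not None:
                per_key = self.sums.setdefault(key, {})
                per_key[state] = per_key.get(state, 0) + value

    def on_timer(self, key):
        """What a timer callback would emit downstream (e.g. to Pubsub)."""
        return {key: self.sums.get(key, {})}


fn = SensorStateDoFn()
# The scenario from the thread, in processing-time order (t2 arrives last).
for args in [("s1", "new-state", "A"), ("s1", "measurement", 10),
             ("s1", "measurement", 20), ("s1", "new-state", "B")]:
    fn.process(*args)
print(fn.on_timer("s1"))  # -> {'s1': {'A': 30}}: the late 'B' was missed
```

This reproduces the observed (not the expected) output column, which shows the extra event-time bookkeeping the state cell would need.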
Re: Triggers in sideInputs
Hi Luke,

Thanks for your answer.

I was studying the state/timer approach. What doesn't convince me is that I would have to use a global window in the main input; otherwise I could lose some states when I don't receive state events for a while. By using side inputs I keep two independent windowing strategies: global for the states but fixed (or whatever) for the measurements. Do you see another way to overcome this?

Regards

On Fri, Oct 9, 2020, 7:38 PM Luke Cwik wrote:

> Only data along the main input edge causes that DoFn to be executed
> again. Side inputs don't cause main inputs to be reprocessed. The trigger
> on the side input controls when the side input data becomes available and
> is updated.
>
> You could choose to have a generator that produces an event on the main
> input every hour, and use it to look at the side input and compute all
> the outputs that you want.
>
> I do think that a solution that uses state and timers would likely fit
> more naturally to solve the problem. This blog[1] is a good starting
> point.
>
> 1: https://beam.apache.org/blog/timely-processing/
>
> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola wrote:
>
>> Hi all,
>>
>> I have a question regarding triggers in side inputs. I will try to
>> explain my doubt with an example.
>>
>> Suppose we have a data stream of sensor events as follows:
>>
>> - id, timestamp, type, value
>>
>> Where:
>>
>> - id is the sensor id
>> - timestamp is the event timestamp
>> - type is one of these values: measurement, new-state
>> - value is a conditional field: if the type is 'measurement', it is a
>>   real number with the measured value; if the type is 'new-state', it is
>>   a string with the new sensor state.
>>
>> We want to produce aggregated data in a time-based way (for example by
>> an hour) as follows:
>>
>> Report:
>>
>> {
>>   'timestamp': 'report timestamp',
>>   'sensors': {
>>     'sensorId1': {
>>       'stateA': 'sum of measures in stateA',
>>       'stateB': 'sum of measures in stateB',
>>       'stateC': 'sum of measures in stateC'
>>     }
>>   }
>> }
>>
>> The state at a given timestamp X is the last known state of that sensor
>> at that moment. We could have late events (suppose max 15' late).
>>
>> I thought of this solution:
>>
>> 1) Create a 'sideInput' with the states (in a global window):
>>
>> Window<...> sideInputWindow =
>>     Window.<...>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .accumulatingFiredPanes();
>>
>> PCollectionView<Map<..., TreeMap<...>>> sensorStates = ...
>>
>> where I have an entry in this map for every sensor, and the TreeMap has
>> an entry for every new state (in the last 15') + 1; we need the previous
>> one in case we didn't receive state changes in the last 15'.
>>
>> 2) Then, the solution aggregates values based on the sideInput:
>>
>> Window<...> mainInputWindow = FixedWindows of an hour with allowed
>> lateness of 15' and accumulating fired panes.
>>
>> The solution is not working as I expected in this scenario (row order is
>> processing-time order):
>>
>> Timestamp  Sensor  Type         Value  Output      Expected
>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>
>> I assumed that a new firing of the 'sideInput' would force a new firing
>> of the 'mainInput'. So when t2 arrives, it fires a trigger on the
>> sideInput, producing the new state, and then the mainInput would be
>> recomputed, producing the expected value. Is my assumption wrong?
>>
>> Thank you
beam rebuilds numpy on pipeline run
Hello,

Starting today, running a beam pipeline triggers a large reinstallation of python modules. For some reason, it forces full rebuilds from source; since beam depends on numpy, this takes a long time.

There's nothing strange about my python setup. I'm using python3.7 on debian buster with the dataflow runner. My venv is set up like this:

python3 -m venv ~/.venvs/beam
. ~/.venvs/beam/bin/activate
python3 -m pip install --upgrade wheel
python3 -m pip install --upgrade pip setuptools
python3 -m pip install -r requirements.txt

My requirements.txt has:

apache-beam[gcp]==2.23.0
boto3==1.15.0

When it's building, `ps ax | grep python` shows me this:

/home/ross/.venvs/beam/bin/python -m pip download --dest /tmp/dataflow-requirements-cache -r requirements.txt --exists-action i --no-binary :all:

How do I prevent this? It's far too slow to develop with, and our compliance folks are likely to prohibit a tool that silently downloads & builds unknown code.

Ross
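For reference, the command from the `ps` output can be written out as an argv list (a sketch for inspection only; the flags are exactly the ones observed above). `--no-binary :all:` is the part that restricts pip to source distributions, forcing packages like numpy to build from source, while `--exists-action i` means anything already in the cache directory is reused, so the cost is paid once per cache dir:

```python
# Reconstruction of the staging command Beam's Dataflow runner was observed
# running; paths are the ones from the ps output in this thread.
cmd = [
    "python", "-m", "pip", "download",
    "--dest", "/tmp/dataflow-requirements-cache",  # requirements cache dir
    "-r", "requirements.txt",
    "--exists-action", "i",   # ignore (reuse) files already in the cache
    "--no-binary", ":all:",   # source dists only -> slow source builds
]
print(" ".join(cmd))
```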
Re: Triggers in sideInputs
Only data along the main input edge causes that DoFn to be executed again. Side inputs don't cause main inputs to be reprocessed. The trigger on the side input controls when the side input data becomes available and is updated.

You could choose to have a generator that produces an event on the main input every hour, and use it to look at the side input and compute all the outputs that you want.

I do think that a solution that uses state and timers would likely fit more naturally to solve the problem. This blog[1] is a good starting point.

1: https://beam.apache.org/blog/timely-processing/

On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola wrote:

> Hi all,
>
> I have a question regarding triggers in side inputs. I will try to
> explain my doubt with an example.
>
> Suppose we have a data stream of sensor events as follows:
>
> - id, timestamp, type, value
>
> Where:
>
> - id is the sensor id
> - timestamp is the event timestamp
> - type is one of these values: measurement, new-state
> - value is a conditional field: if the type is 'measurement', it is a
>   real number with the measured value; if the type is 'new-state', it is
>   a string with the new sensor state.
>
> We want to produce aggregated data in a time-based way (for example by an
> hour) as follows:
>
> Report:
>
> {
>   'timestamp': 'report timestamp',
>   'sensors': {
>     'sensorId1': {
>       'stateA': 'sum of measures in stateA',
>       'stateB': 'sum of measures in stateB',
>       'stateC': 'sum of measures in stateC'
>     }
>   }
> }
>
> The state at a given timestamp X is the last known state of that sensor
> at that moment. We could have late events (suppose max 15' late).
>
> I thought of this solution:
>
> 1) Create a 'sideInput' with the states (in a global window):
>
> Window<...> sideInputWindow =
>     Window.<...>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .accumulatingFiredPanes();
>
> PCollectionView<Map<..., TreeMap<...>>> sensorStates = ...
>
> where I have an entry in this map for every sensor, and the TreeMap has
> an entry for every new state (in the last 15') + 1; we need the previous
> one in case we didn't receive state changes in the last 15'.
>
> 2) Then, the solution aggregates values based on the sideInput:
>
> Window<...> mainInputWindow = FixedWindows of an hour with allowed
> lateness of 15' and accumulating fired panes.
>
> The solution is not working as I expected in this scenario (row order is
> processing-time order):
>
> Timestamp  Sensor  Type         Value  Output      Expected
> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>
> I assumed that a new firing of the 'sideInput' would force a new firing
> of the 'mainInput'. So when t2 arrives, it fires a trigger on the
> sideInput, producing the new state, and then the mainInput would be
> recomputed, producing the expected value. Is my assumption wrong?
>
> Thank you
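The hourly-generator idea can be sketched framework-free (plain Python standing in for the pipeline; all names are illustrative): side-input updates change what is visible, but output is only produced when a main-input element arrives.

```python
# Side input: latest known state per sensor, updated as state events arrive.
# Main input: hourly ticks; only a tick causes the DoFn body to run.

side_input = {}  # sensor -> latest state (stands in for a Map side input)

def on_state_event(sensor, state):
    """Triggered side input: each new element updates the visible map."""
    side_input[sensor] = state

def on_tick(hour):
    """Main-input DoFn body: runs only when a main-input element arrives,
    snapshotting whatever the side input holds at that moment."""
    return (hour, dict(side_input))

on_state_event("s1", "A")
print(on_tick(1))          # -> (1, {'s1': 'A'})
on_state_event("s1", "B")  # side input updates, but nothing fires...
print(on_tick(2))          # ...until the next main-input tick: (2, {'s1': 'B'})
```

This is the crux of the answer above: the second state event alone produces no output; the recomputation happens at the next main-input element.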
Triggers in sideInputs
Hi all,

I have a question regarding triggers in side inputs. I will try to explain my doubt with an example.

Suppose we have a data stream of sensor events as follows:

- id, timestamp, type, value

Where:

- id is the sensor id
- timestamp is the event timestamp
- type is one of these values: measurement, new-state
- value is a conditional field: if the type is 'measurement', it is a real number with the measured value; if the type is 'new-state', it is a string with the new sensor state.

We want to produce aggregated data in a time-based way (for example by an hour) as follows:

Report:

{
  'timestamp': 'report timestamp',
  'sensors': {
    'sensorId1': {
      'stateA': 'sum of measures in stateA',
      'stateB': 'sum of measures in stateB',
      'stateC': 'sum of measures in stateC'
    }
  }
}

The state at a given timestamp X is the last known state of that sensor at that moment. We could have late events (suppose max 15' late).

I thought of this solution:

1) Create a 'sideInput' with the states (in a global window):

Window<...> sideInputWindow =
    Window.<...>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes();

PCollectionView<Map<..., TreeMap<...>>> sensorStates = ...

where I have an entry in this map for every sensor, and the TreeMap has an entry for every new state (in the last 15') + 1; we need the previous one in case we didn't receive state changes in the last 15'.

2) Then, the solution aggregates values based on the sideInput:

Window<...> mainInputWindow = FixedWindows of an hour with allowed lateness of 15' and accumulating fired panes.

The solution is not working as I expected in this scenario (row order is processing-time order):

Timestamp  Sensor  Type         Value  Output      Expected
t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
t2         s1      new-state    B      No output   s1 {A: 10, B: 20}

I assumed that a new firing of the 'sideInput' would force a new firing of the 'mainInput'. So when t2 arrives, it fires a trigger on the sideInput, producing the new state, and then the mainInput would be recomputed, producing the expected value. Is my assumption wrong?

Thank you
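For reference, the 'Expected' column above can be reproduced by a small plain-Python simulation of the intended semantics (not a Beam pipeline; the helper names are made up): each measurement is attributed to the last state at or before its event timestamp, regardless of arrival order.

```python
from bisect import bisect_right

# Events as (event_time, sensor, type, value); the t2 state change arrives
# late in processing time but carries an earlier event timestamp.
events = [
    (0, "s1", "new-state", "A"),
    (1, "s1", "measurement", 10),
    (3, "s1", "measurement", 20),
    (2, "s1", "new-state", "B"),
]

def aggregate(events):
    """Sum measurements per sensor under the state in effect at each
    measurement's event time (the last 'new-state' at or before it)."""
    transitions = {}  # sensor -> list of (ts, state)
    measures = {}     # sensor -> list of (ts, value)
    for ts, sensor, typ, value in events:
        if typ == "new-state":
            transitions.setdefault(sensor, []).append((ts, value))
        else:
            measures.setdefault(sensor, []).append((ts, value))
    report = {}
    for sensor, ms in measures.items():
        trs = sorted(transitions.get(sensor, []))
        times = [t for t, _ in trs]
        sums = {}
        for ts, value in ms:
            i = bisect_right(times, ts) - 1  # last transition at or before ts
            if i >= 0:
                state = trs[i][1]
                sums[state] = sums.get(state, 0) + value
        report[sensor] = sums
    return report

print(aggregate(events))  # -> {'s1': {'A': 10, 'B': 20}}, the Expected column
```

The t3 measurement lands in state B here because the late t2 transition is sorted into event-time order first, which is exactly what the triggered side input above does not do for the already-fired main-input panes.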