Re: beam rebuilds numpy on pipeline run

2020-10-09 Thread Brian Hulette
+Valentyn Tymofieiev 

This sounds like it's related to ARROW-8983 [1] (pyarrow takes a long time to
download after 0.16.0), discussed on the Arrow dev list [2]. I'm not sure
what would've triggered this to start happening for you today, though.

[1] https://issues.apache.org/jira/browse/ARROW-8983
[2]
https://lists.apache.org/thread.html/r9baa48a9d1517834c285f0f238f29fcf54405cb7cf1e681314239d7f%40%3Cdev.arrow.apache.org%3E

On Fri, Oct 9, 2020 at 12:10 PM Ross Vandegrift <
ross.vandegr...@cleardata.com> wrote:

> Hello,
>
> Starting today, running a Beam pipeline triggers a large reinstallation of
> Python modules.  For some reason, it forces full rebuilds from source -
> since Beam depends on numpy, this takes a long time.
>
> There's nothing strange about my Python setup.  I'm using Python 3.7 on
> Debian buster with the Dataflow runner.  My venv is set up like this:
>  python3 -m venv ~/.venvs/beam
>  . ~/.venvs/beam/bin/activate
>  python3 -m pip install --upgrade wheel
>  python3 -m pip install --upgrade pip setuptools
>  python3 -m pip install -r requirements.txt
>
> My requirements.txt has:
>   apache-beam[gcp]==2.23.0
>   boto3==1.15.0
>
> When it's building, `ps ax | grep python` shows me this:
>   /home/ross/.venvs/beam/bin/python -m pip download --dest /tmp/dataflow-requirements-cache -r requirements.txt --exists-action i --no-binary :all:
>
> How do I prevent this?  It's far too slow to develop with, and our
> compliance folks are likely to prohibit a tool that silently downloads &
> builds unknown code.
>
> Ross
>


Re: Triggers in sideInputs

2020-10-09 Thread Luke Cwik
If you don't have any watermark-based triggers, then using global-window
state and timers makes sense, and you can rewindow into a different window
afterwards. A hacky way to rewindow into a different windowing strategy is to
output the data to a message queue and ingest that data again in the same
pipeline. People have used this to create loops and to work around windowing
issues like this one.

The graph would look like:
EventStream -> ParDo(GlobalWindow State and Timers) -> ParDo(Output summary to Pubsub)
Input summary from Pubsub -> Window.into(My favorite window strategy) -> Additional processing
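
To make that shape concrete, a minimal Java sketch of the two-leg graph might
look like the following. This is only an illustration of the pattern, not code
from the thread: the topic names are placeholders, and the pass-through DoFn
merely stands in for the real global-window state-and-timers logic.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubsubLoopSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    // Leg 1: EventStream -> (global-window state/timers) -> summaries published to Pub/Sub.
    p.apply("ReadEvents", PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))
        .apply("BuildSummary", ParDo.of(new DoFn<String, String>() {
          // Placeholder: the stateful per-sensor summary logic would live here;
          // for this sketch it simply forwards the payload unchanged.
          @ProcessElement
          public void process(@Element String event, OutputReceiver<String> out) {
            out.output(event);
          }
        }))
        .apply("WriteSummaries", PubsubIO.writeStrings().to("projects/my-project/topics/summaries"));

    // Leg 2: the same pipeline re-ingests the summaries and is now free to
    // rewindow them into whatever strategy downstream processing needs.
    p.apply("ReadSummaries", PubsubIO.readStrings().fromTopic("projects/my-project/topics/summaries"))
        .apply("Rewindow", Window.<String>into(FixedWindows.of(Duration.standardHours(1))));
        // ... additional processing on the rewindowed, fixed-window summaries ...

    p.run();
  }
}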

On Fri, Oct 9, 2020 at 1:35 PM Andrés Garagiola 
wrote:

> Hi Luke,
>
> Thanks for your answer.
>
> I was studying the state/timer approach. What doesn't convince me is that
> I would have to use a global window on the main input; otherwise I could
> lose some states when I don't receive state events for a while. By using
> side inputs I keep two independent windowing strategies: global for the
> states but fixed (or whatever) for the measurements. Do you see another
> way to overcome this?
>
> Regards
>
>
>
> On Fri, Oct 9, 2020, 7:38 PM Luke Cwik  wrote:
>
>> Only data along the main input edge causes that DoFn to be executed
>> again. Side inputs don't cause main inputs to be reprocessed. The trigger
>> on the side input controls when the side input data becomes available and
>> is updated.
>>
>> You could choose to have a generator that produces an event on the main
>> input every hour and you could use it to look at the side input and compute
>> all the outputs that you want.
>>
>> I do think that a solution that uses state and timers would likely fit
>> more naturally to solve the problem. This blog[1] is a good starting point.
>>
>> 1: https://beam.apache.org/blog/timely-processing/
>>
>> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola <
>> andresgaragi...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>>
>>> I have a question regarding triggers in side inputs. I will try to
>>> explain my doubt with an example.
>>>
>>> Suppose we have a data stream of sensor events as follows:
>>>
>>>- id, timestamp, type, value
>>>
>>> Where:
>>>
>>>- id is the sensor id
>>>- timestamp is the event timestamp
>>>- type can be one of these values (measurement, new-state)
>>>- value is a conditional field: if the type is 'measurement', it is a real
>>>number with the measured value; if the type is 'new-state', it is a string
>>>with the new sensor state.
>>>
>>> We want to produce aggregated data in a time-based way (for example, by
>>> the hour) as follows:
>>>
>>> Report:
>>>
>>> {
>>>   'timestamp': 'report timestamp',
>>>   'sensors': {
>>>     'sensorId1': {
>>>       'stateA': 'sum of measures in stateA',
>>>       'stateB': 'sum of measures in stateB',
>>>       'stateC': 'sum of measures in stateC',
>>>       ...
>>>     }
>>>   }
>>> }
>>>
>>> The state at a given timestamp X is the last known state of that sensor
>>> at that moment. We could have late events (suppose max 15' late).
>>>
>>>
>>> I thought of this solution:
>>>
>>>
>>> 1) Create a 'sideInput' with the states (in a global window)
>>>
>>>
>>> Window<KV<..., ...>> sideInputWindow =
>>>     Window.<KV<..., ...>>into(new GlobalWindows())
>>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>         .accumulatingFiredPanes();
>>>
>>> PCollectionView<Map<..., TreeMap<..., ...>>> sensorStates = ...
>>>
>>>
>>> Here the map has an entry for every sensor, and the TreeMap has an entry
>>> for every new state (in the last 15') plus one more: we need the previous
>>> state in case we didn't receive any state changes in the last 15'.
>>>
>>>
>>> 2) Then, the solution aggregates values based on the sideInput
>>>
>>>
>>> Window<KV<..., ...>> mainInputWindow = a fixed window of one hour, with
>>> allowed lateness of 15' and accumulating fired panes.
>>>
>>>
>>> The solution is not working as I expected in this scenario (row order is
>>> processing time order):
>>>
>>>
>>> Timestamp  Sensor  Type         Value  Output      Expected
>>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>>
>>>
>>> I assumed that a new firing of the 'sideInput' would force a new firing
>>> of the 'mainInput'. So when t2 arrives, it fires a trigger on the
>>> sideInput, producing the new state, and then the mainInput would be
>>> recomputed, producing the expected value. Is my assumption wrong?
>>>
>>>
>>> Thank you
>>>
>>>
>>>
>>>
>>


Re: Triggers in sideInputs

2020-10-09 Thread Andrés Garagiola
Hi Luke,

Thanks for your answer.

I was studying the state/timer approach. What doesn't convince me is that I
would have to use a global window on the main input; otherwise I could lose
some states when I don't receive state events for a while. By using side
inputs I keep two independent windowing strategies: global for the states but
fixed (or whatever) for the measurements. Do you see another way to overcome
this?

Regards



On Fri, Oct 9, 2020, 7:38 PM Luke Cwik  wrote:

> Only data along the main input edge causes that DoFn to be executed again.
> Side inputs don't cause main inputs to be reprocessed. The trigger on the
> side input controls when the side input data becomes available and is
> updated.
>
> You could choose to have a generator that produces an event on the main
> input every hour and you could use it to look at the side input and compute
> all the outputs that you want.
>
> I do think that a solution that uses state and timers would likely fit
> more naturally to solve the problem. This blog[1] is a good starting point.
>
> 1: https://beam.apache.org/blog/timely-processing/
>
> On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola 
> wrote:
>
>> Hi all,
>>
>>
>> I have a question regarding triggers in side inputs. I will try to explain
>> my doubt with an example.
>>
>> Suppose we have a data stream of sensor events as follows:
>>
>>- id, timestamp, type, value
>>
>> Where:
>>
>>- id is the sensor id
>>- timestamp is the event timestamp
>>- type can be one of these values (measurement, new-state)
>>- value is a conditional field: if the type is 'measurement', it is a real
>>number with the measured value; if the type is 'new-state', it is a string
>>with the new sensor state.
>>
>> We want to produce aggregated data in a time-based way (for example, by
>> the hour) as follows:
>>
>> Report:
>>
>> {
>>   'timestamp': 'report timestamp',
>>   'sensors': {
>>     'sensorId1': {
>>       'stateA': 'sum of measures in stateA',
>>       'stateB': 'sum of measures in stateB',
>>       'stateC': 'sum of measures in stateC',
>>       ...
>>     }
>>   }
>> }
>>
>> The state at a given timestamp X is the last known state of that sensor
>> at that moment. We could have late events (suppose max 15' late).
>>
>>
>> I thought of this solution:
>>
>>
>> 1) Create a 'sideInput' with the states (in a global window)
>>
>>
>> Window<KV<..., ...>> sideInputWindow =
>>     Window.<KV<..., ...>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .accumulatingFiredPanes();
>>
>> PCollectionView<Map<..., TreeMap<..., ...>>> sensorStates = ...
>>
>>
>> Here the map has an entry for every sensor, and the TreeMap has an entry
>> for every new state (in the last 15') plus one more: we need the previous
>> state in case we didn't receive any state changes in the last 15'.
>>
>>
>> 2) Then, the solution aggregates values based on the sideInput
>>
>>
>> Window<KV<..., ...>> mainInputWindow = a fixed window of one hour, with
>> allowed lateness of 15' and accumulating fired panes.
>>
>>
>> The solution is not working as I expected in this scenario (row order is
>> processing time order):
>>
>>
>> Timestamp  Sensor  Type         Value  Output      Expected
>> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
>> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
>> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
>> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>>
>>
>> I assumed that a new firing of the 'sideInput' would force a new firing of
>> the 'mainInput'. So when t2 arrives, it fires a trigger on the sideInput,
>> producing the new state, and then the mainInput would be recomputed,
>> producing the expected value. Is my assumption wrong?
>>
>>
>> Thank you
>>
>>
>>
>>
>


beam rebuilds numpy on pipeline run

2020-10-09 Thread Ross Vandegrift
Hello,

Starting today, running a Beam pipeline triggers a large reinstallation of
Python modules.  For some reason, it forces full rebuilds from source - since
Beam depends on numpy, this takes a long time.

There's nothing strange about my Python setup.  I'm using Python 3.7 on Debian
buster with the Dataflow runner.  My venv is set up like this:
 python3 -m venv ~/.venvs/beam
 . ~/.venvs/beam/bin/activate
 python3 -m pip install --upgrade wheel
 python3 -m pip install --upgrade pip setuptools
 python3 -m pip install -r requirements.txt

My requirements.txt has:
  apache-beam[gcp]==2.23.0
  boto3==1.15.0

When it's building, `ps ax | grep python` shows me this:
  /home/ross/.venvs/beam/bin/python -m pip download --dest /tmp/dataflow-requirements-cache -r requirements.txt --exists-action i --no-binary :all:

How do I prevent this?  It's far too slow to develop with, and our compliance
folks are likely to prohibit a tool that silently downloads & builds unknown
code.

Ross


Re: Triggers in sideInputs

2020-10-09 Thread Luke Cwik
Only data along the main input edge causes that DoFn to be executed again.
Side inputs don't cause main inputs to be reprocessed. The trigger on the
side input controls when the side input data becomes available and is
updated.

You could choose to have a generator that produces an event on the main
input every hour and you could use it to look at the side input and compute
all the outputs that you want.

I do think that a solution that uses state and timers would likely fit more
naturally to solve the problem. This blog[1] is a good starting point.

1: https://beam.apache.org/blog/timely-processing/
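
As a rough illustration of what that could look like for the sensor use case
quoted below, here is a hypothetical stateful DoFn sketch. None of it comes
from the thread: the SensorEvent type, its field names, and the hourly
processing-time flush are assumptions made for the example. Each sensor key
keeps its last known state and a per-state running sum, and a timer emits
(and resets) the summary roughly once an hour.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Hypothetical event type: either a new state or a measurement for one sensor.
class SensorEvent implements java.io.Serializable {
  String type;        // "measurement" or "new-state"
  String state;       // set when type == "new-state"
  double measurement; // set when type == "measurement"
}

class SummarizeSensorFn extends DoFn<KV<String, SensorEvent>, KV<String, Map<String, Double>>> {

  // Last known state for this sensor key.
  @StateId("currentState")
  private final StateSpec<ValueState<String>> currentStateSpec =
      StateSpecs.value(StringUtf8Coder.of());

  // Running sum of measurements per state for this sensor key.
  @StateId("sums")
  private final StateSpec<ValueState<Map<String, Double>>> sumsSpec =
      StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), DoubleCoder.of()));

  // Remember the key so the timer callback can emit it with the summary.
  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value(StringUtf8Coder.of());

  // Processing-time timer used to flush a summary roughly once an hour.
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, SensorEvent> element,
      @StateId("currentState") ValueState<String> currentState,
      @StateId("sums") ValueState<Map<String, Double>> sums,
      @StateId("key") ValueState<String> key,
      @TimerId("flush") Timer flush) {
    key.write(element.getKey());
    SensorEvent event = element.getValue();
    if ("new-state".equals(event.type)) {
      currentState.write(event.state);
    } else {
      String state = currentState.read();
      if (state == null) {
        state = "unknown"; // no state event seen yet for this sensor
      }
      Map<String, Double> perState = sums.read();
      if (perState == null) {
        perState = new HashMap<>();
      }
      perState.merge(state, event.measurement, Double::sum);
      sums.write(perState);
    }
    // (Re)arm the flush timer for one hour from now.
    flush.offset(Duration.standardHours(1)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext context,
      @StateId("key") ValueState<String> key,
      @StateId("sums") ValueState<Map<String, Double>> sums) {
    String sensorId = key.read();
    Map<String, Double> perState = sums.read();
    if (sensorId != null && perState != null) {
      context.output(KV.of(sensorId, perState));
      sums.clear(); // start a fresh sum for the next hourly report
    }
  }
}

The input would have to be keyed by sensor id (a KV<String, SensorEvent>
collection) for the per-key state above to apply, and handling late
'new-state' events, as in the t2 example below, would still need extra care
(for instance, keeping a small TreeMap of recent states instead of a single
value).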

On Fri, Oct 9, 2020 at 4:15 AM Andrés Garagiola 
wrote:

> Hi all,
>
>
> I have a question regarding triggers in side inputs. I will try to explain
> my doubt with an example.
>
> Suppose we have a data stream of sensor events as follows:
>
>- id, timestamp, type, value
>
> Where:
>
>- id is the sensor id
>- timestamp is the event timestamp
>- type can be one of these values (measurement, new-state)
>- value is a conditional field: if the type is 'measurement', it is a real
>number with the measured value; if the type is 'new-state', it is a string
>with the new sensor state.
>
> We want to produce aggregated data in a time-based way (for example, by
> the hour) as follows:
>
> Report:
>
> {
>   'timestamp': 'report timestamp',
>   'sensors': {
>     'sensorId1': {
>       'stateA': 'sum of measures in stateA',
>       'stateB': 'sum of measures in stateB',
>       'stateC': 'sum of measures in stateC',
>       ...
>     }
>   }
> }
>
> The state at a given timestamp X is the last known state of that sensor at
> that moment. We could have late events (suppose max 15' late).
>
>
> I thought of this solution:
>
>
> 1) Create a 'sideInput' with the states (in a global window)
>
>
> Window<KV<..., ...>> sideInputWindow =
>     Window.<KV<..., ...>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .accumulatingFiredPanes();
>
> PCollectionView<Map<..., TreeMap<..., ...>>> sensorStates = ...
>
>
> Here the map has an entry for every sensor, and the TreeMap has an entry
> for every new state (in the last 15') plus one more: we need the previous
> state in case we didn't receive any state changes in the last 15'.
>
>
> 2) Then, the solution aggregates values based on the sideInput
>
>
> Window<KV<..., ...>> mainInputWindow = a fixed window of one hour, with
> allowed lateness of 15' and accumulating fired panes.
>
>
> The solution is not working as I expected in this scenario (row order is
> processing time order):
>
>
> Timestamp  Sensor  Type         Value  Output      Expected
> t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
> t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
> t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
> t2         s1      new-state    B      No output   s1 {A: 10, B: 20}
>
>
> I assumed that a new firing of the 'sideInput' would force a new firing of
> the 'mainInput'. So when t2 arrives, it fires a trigger on the sideInput,
> producing the new state, and then the mainInput would be recomputed,
> producing the expected value. Is my assumption wrong?
>
>
> Thank you
>
>
>
>


Triggers in sideInputs

2020-10-09 Thread Andrés Garagiola
Hi all,


I have a question regarding triggers in side inputs. I will try to explain
my doubt with an example.

Suppose we have a data stream of sensor events as follows:

   - id, timestamp, type, value

Where:

   - id is the sensor id
   - timestamp is the event timestamp
   - type can be one of these values (measurement, new-state)
   - value is a conditional field: if the type is 'measurement', it is a
   real number with the measured value; if the type is 'new-state', it is a
   string with the new sensor state.

We want to produce aggregated data in a time-based way (for example, by the
hour) as follows:

Report:

{
  'timestamp': 'report timestamp',
  'sensors': {
    'sensorId1': {
      'stateA': 'sum of measures in stateA',
      'stateB': 'sum of measures in stateB',
      'stateC': 'sum of measures in stateC',
      ...
    }
  }
}

The state at a given timestamp X is the last known state of that sensor at
that moment. We could have late events (suppose max 15' late).


I thought of this solution:


1) Create a 'sideInput' with the states (in a global window)


Window<KV<..., ...>> sideInputWindow =
    Window.<KV<..., ...>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes();

PCollectionView<Map<..., TreeMap<..., ...>>> sensorStates = ...


Here the map has an entry for every sensor, and the TreeMap has an entry for
every new state (in the last 15') plus one more: we need the previous state in
case we didn't receive any state changes in the last 15'.


2) Then, the solution aggregates values based on the sideInput


Window<KV<..., ...>> mainInputWindow = a fixed window of one hour, with
allowed lateness of 15' and accumulating fired panes.


The solution is not working as I expected in this scenario (row order is
processing time order):


Timestamp  Sensor  Type         Value  Output      Expected
t0         s1      new-state    A      s1 {A: 0}   s1 {A: 0}
t1         s1      measurement  10     s1 {A: 10}  s1 {A: 10}
t3         s1      measurement  20     s1 {A: 30}  s1 {A: 30}
t2         s1      new-state    B      No output   s1 {A: 10, B: 20}


I assumed that a new firing of the 'sideInput' would force a new firing of
the 'mainInput'. So when t2 arrives, it fires a trigger on the sideInput,
producing the new state, and then the mainInput would be recomputed, producing
the expected value. Is my assumption wrong?


Thank you