Re: periodic impulse bug

2020-12-21 Thread Manninger, Matyas
Hey Ning,

Thanks for the answer. Shouldn't the start and end be timestamps? If I set
them to ints manually, what value should I use for "now" as the start? And I
assume the max int would be the end if I never want it to end?

On Mon, 21 Dec 2020 at 18:45, Ning Kang wrote:

> I also ran into this issue some time ago. Couldn't figure out why, but
> explicitly setting the end and start to some integer value when building
> the `PeriodicImpulse` transform could be a workaround.


Re: periodic impulse bug

2020-12-21 Thread Ning Kang
I also ran into this issue some time ago. Couldn't figure out why, but
explicitly setting the end and start to some integer value when building
the `PeriodicImpulse` transform could be a workaround.

On Mon, Dec 21, 2020 at 4:19 AM Manninger, Matyas <
matyas.mannin...@veolia.com> wrote:

> Dear Beam users,
>
> In the Python SDK I tried using the PeriodicImpulse, but it seems like there
> is an internal bug. In periodicsequence.py, on line 42, there is a division
> where a Duration is divided by a Duration, but no division operation is
> defined for that type. What am I missing? Is there a workaround for this?
> Here is the error message that led to this "discovery":
>
>   File "/home/user/.virtualenvs/dflowenv/lib/python3.8/site-packages/apache_beam/transforms/periodicsequence.py", line 42, in initial_restriction
>     total_outputs = math.ceil((end - start) / interval)
> TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
> [while running 'read/periodic_impulse/GenSequence/PairWithRestriction']
>
>


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Robert Bradshaw
I agree. Borrowing the mutation detection from the direct runner as an
intermediate point sounds like a good idea.

On Mon, Dec 21, 2020 at 8:57 AM Kenneth Knowles wrote:

> I really think we should make a plan to make this the default. If you test
> with the DirectRunner, it will do mutation checking and catch pipelines that
> depend on the runner cloning every element. (Also, the DirectRunner doesn't
> clone.) Since the cloning is similar in cost to the mutation detection,
> could we actually add some mutation detection to FlinkRunner pipelines and
> also directly warn if a pipeline is depending on it?
>
> Kenn


Session window and side input

2020-12-21 Thread Manninger, Matyas
Dear Beam users,

I am writing a streaming pipeline that has static tables as side inputs.
These tables change from time to time, and I want the side inputs to be
updated at some interval. I am planning to trigger the update by sending a
message through Pub/Sub: when a new message arrives, the side input should
be updated. I would like to do this with session windows, but on the Beam
documentation page session windows are depicted as lasting from the first
input in the window to the last input in the window, and the gap seems not
to belong to any window. So if I used this as a side input, how would my
main stream be matched to windows? If I send a signal every day and the gap
is set to 1 hour, for example, would the window close after 1 hour, so that
for the next 23 hours all the elements in the main stream would be matched
to no side input?
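For reference, here is a stdlib-only sketch of how session windows are formed. It assumes Beam's session model: each element with timestamp t is first assigned the interval [t, t + gap), and overlapping intervals are then merged. Under that model the gap belongs to the window, since the window extends one gap past its last element. A daily signal with a one-hour gap therefore yields windows that cover only the hour after each signal. The function names are hypothetical, not Beam APIs. The sketch also leaves out how a main-input window is mapped onto a side-input window, which the side input's window mapping controls.

```python
from typing import List, Optional, Tuple

Window = Tuple[int, int]  # half-open interval [start, end) in seconds


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    """Assign each timestamp [t, t + gap) and merge overlapping intervals."""
    merged: List[Window] = []
    for t in sorted(timestamps):
        start, end = t, t + gap
        if merged and start < merged[-1][1]:
            # Overlaps the previous session: extend that session's end.
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))
    return merged


def window_for(t: int, windows: List[Window]) -> Optional[Window]:
    """Return the session window containing t, or None if t is in no window."""
    for start, end in windows:
        if start <= t < end:
            return (start, end)
    return None


HOUR, DAY = 3600, 24 * 3600
signals = [0, DAY]  # one update signal per day
sessions = session_windows(signals, gap=HOUR)
print(sessions)                        # [(0, 3600), (86400, 90000)]
print(window_for(30 * 60, sessions))   # (0, 3600)
print(window_for(5 * HOUR, sessions))  # None
```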

Thanks for any help or tips on how to solve this, or on what the expected
behaviour is.

BR,
Matyas Manninger


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Kenneth Knowles
I really think we should make a plan to make this the default. If you test
with the DirectRunner, it will do mutation checking and catch pipelines that
depend on the runner cloning every element. (Also, the DirectRunner doesn't
clone.) Since the cloning is similar in cost to the mutation detection,
could we actually add some mutation detection to FlinkRunner pipelines and
also directly warn if a pipeline is depending on it?
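Here is a minimal sketch of the mutation-detection idea, using `pickle` as a stand-in for a Beam coder. It encodes the input element before a processing function runs, re-encodes it afterwards, and warns (or fails in strict mode) if the bytes differ. `run_with_mutation_check` and `MutationDetected` are hypothetical names. The sketch only illustrates the concept and is not the DirectRunner's implementation. The extra encoding per element costs about as much as one copy, which matches the observation above that cloning and mutation detection are similar in cost.

```python
import pickle
import warnings
from typing import Any, Callable, Iterable, List


class MutationDetected(RuntimeError):
    """Raised in strict mode when a function mutates its input element."""


def run_with_mutation_check(
    fn: Callable[[Any], Iterable[Any]], element: Any, strict: bool = False
) -> List[Any]:
    """Run fn on element and detect in-place mutation of the element.

    pickle stands in for a coder: the element's encoding is captured before
    fn runs and compared with a fresh encoding afterwards.
    """
    before = pickle.dumps(element)
    outputs = list(fn(element))  # consume fn fully before comparing
    if pickle.dumps(element) != before:
        name = getattr(fn, "__name__", repr(fn))
        message = f"{name} mutated its input element"
        if strict:
            raise MutationDetected(message)
        warnings.warn(message, RuntimeWarning)
    return outputs


def counts_length(xs):
    yield len(xs)


def appends_then_counts(xs):
    xs.append(0)  # in-place mutation of the input element
    yield len(xs)


print(run_with_mutation_check(counts_length, [1, 2]))        # [2], no warning
print(run_with_mutation_check(appends_then_counts, [1, 2]))  # [3], plus a RuntimeWarning
```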

Kenn

On Mon, Dec 21, 2020 at 5:04 AM Teodor Spæren wrote:

> Hey! My option is not the default as of now, since it can break pipelines
> that rely on the faulty Flink implementation. I'm creating my own
> benchmarks locally and will run against those, but the idea of adding it
> to the official benchmark runs sounds interesting. Thanks for bringing
> it up!
>
> Teodor


Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Teodor Spæren
Hey! My option is not the default as of now, since it can break pipelines
that rely on the faulty Flink implementation. I'm creating my own
benchmarks locally and will run against those, but the idea of adding it
to the official benchmark runs sounds interesting. Thanks for bringing
it up!
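To illustrate why removing the per-element copy can break some pipelines, here is a small sketch of a toy fan-out in which one element is handed to two downstream functions. With copying, each consumer gets its own deep copy. Without it, both consumers share one object, so a consumer that mutates its input changes what the next one sees. `fan_out` and the two consumers are hypothetical. This is not the Flink runner's code.

```python
import copy
from typing import Any, Callable, List


def fan_out(
    element: Any, consumers: List[Callable[[Any], Any]], copy_elements: bool
) -> List[Any]:
    """Pass element to each consumer in turn, optionally deep-copying it first."""
    results = []
    for consume in consumers:
        arg = copy.deepcopy(element) if copy_elements else element
        results.append(consume(arg))
    return results


def tag_in_place(record: dict) -> dict:
    record["tagged"] = True  # mutates its input: the pattern that relies on copying
    return record


def is_tagged(record: dict) -> bool:
    return record.get("tagged", False)


print(fan_out({"id": 1}, [tag_in_place, is_tagged], copy_elements=True))
# [{'id': 1, 'tagged': True}, False]
print(fan_out({"id": 1}, [tag_in_place, is_tagged], copy_elements=False))
# [{'id': 1, 'tagged': True}, True]
```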


Teodor

On Tue, Dec 15, 2020 at 06:51:38PM -0800, Ahmet Altay wrote:
>Hi Teodor,
>
>Thank you for working on this. If I remember correctly, there were some
>opportunities for improvement in the previous paper (e.g. not focusing on
>deprecated runners, long-running benchmarks, varying data sizes). And I am
>excited that you are keeping the community involved in your research
>process, and we will be happy to help you where we can.
>
>Related to your question: was the new option used by default? If that
>is the case, you will probably see its impact on the metrics dashboard [1].
>If it is not on by default, you can add your variant as a new benchmark
>and compare the difference across many runs in a controlled benchmarking
>environment. Would that help?
>
>Ahmet
>
>[1] http://metrics.beam.apache.org/d/1/getting-started?orgId=1
>
>On Tue, Dec 15, 2020 at 5:48 AM Teodor Spæren wrote:
>
>> Hey!
>>
>> Yeah, that paper was what prompted my master's thesis! I will definitely
>> post here once I get more data :)
>>
>> Teodor
>>
>> On Mon, Dec 14, 2020 at 06:56:30AM -0600, Rion Williams wrote:
>> >Hi Teodor,
>> >
>> >Although I’m sure you’ve come across it, this might have some valuable
>> >resources or methodologies to consider as you explore this a bit more:
>> >
>> >https://arxiv.org/pdf/1907.08302.pdf
>> >
>> >I’m looking forward to reading about your findings, especially using a
>> >more recent iteration of Beam!
>> >
>> >Rion
>> >
>> >> On Dec 14, 2020, at 6:37 AM, Teodor Spæren wrote:
>> >>
>> >> Just bumping this so people see it now that 2.26.0 is out :)
>> >>
>> >>> On Wed, Nov 25, 2020 at 11:09:52AM +0100, Teodor Spæren wrote:
>> >>> Hey!
>> >>>
>> >>> My name is Teodor Spæren and I'm writing a master's thesis investigating
>> >>> the performance overhead of using Beam instead of using the underlying
>> >>> systems directly. My focus has been on Flink, and I've made a discovery
>> >>> about some unnecessary copying between operators in the Flink
>> >>> runner[1][2]. I wrote a fix for this; it got accepted and merged and
>> >>> will be in the upcoming 2.26.0 release[3].
>> >>>
>> >>> I'm writing this email to ask if anyone on these mailing lists would
>> >>> be willing to send me some results of applying this option once the new
>> >>> version of Beam is released. Anything will be very much appreciated:
>> >>> stories, screenshots of performance monitoring before and after, hard
>> >>> numbers, anything! If you include the cluster size and the workload,
>> >>> that would be awesome too! My master's thesis is set to be completed
>> >>> this coming summer, so there is no real hurry :)
>> >>>
>> >>> The thesis will be freely accessible[4], and I hope that these findings
>> >>> will be of help to the Beam community. If anyone wishes to submit
>> >>> stories but remain anonymous, that is also OK :)
>> >>>
>> >>> The best way to contact me would be to send an email my way here, or
>> >>> to teod...@mail.uio.no.
>> >>>
>> >>> Any help is appreciated, thanks for your attention!
>> >>>
>> >>> Best regards,
>> >>> Teodor Spæren
>> >>>
>> >>> [1]: https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E
>> >>> [2]: https://issues.apache.org/jira/browse/BEAM-11146
>> >>> [3]: https://github.com/apache/beam/pull/13240
>> >>> [4]: https://www.duo.uio.no/



periodic impulse bug

2020-12-21 Thread Manninger, Matyas
Dear Beam users,

In the Python SDK I tried using the PeriodicImpulse, but it seems like there
is an internal bug. In periodicsequence.py, on line 42, there is a division
where a Duration is divided by a Duration, but no division operation is
defined for that type. What am I missing? Is there a workaround for this?
Here is the error message that led to this "discovery":

  File "/home/user/.virtualenvs/dflowenv/lib/python3.8/site-packages/apache_beam/transforms/periodicsequence.py", line 42, in initial_restriction
    total_outputs = math.ceil((end - start) / interval)
TypeError: unsupported operand type(s) for /: 'Duration' and 'Duration'
[while running 'read/periodic_impulse/GenSequence/PairWithRestriction']
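The failure can be reproduced without Beam. In the sketch below, `Timestamp` and `Duration` are hypothetical stand-ins, not Beam's classes. They mimic only what the traceback shows: subtracting two timestamps yields a Duration, and dividing a Duration by a Duration is unsupported. Passing plain numbers avoids the problem, which is why the integer workaround helps. `total_outputs_fixed` shows one possible repair, converting everything to float seconds before dividing. It is not necessarily how Beam itself resolved the issue.

```python
import math


class Duration:
    """Hypothetical stand-in: stores microseconds, defines no division."""

    def __init__(self, micros: int):
        self.micros = micros


class Timestamp:
    """Hypothetical stand-in: subtracting two Timestamps gives a Duration."""

    def __init__(self, seconds: float):
        self.micros = int(seconds * 1_000_000)

    def __sub__(self, other: "Timestamp") -> Duration:
        return Duration(self.micros - other.micros)


def total_outputs_buggy(start, end, interval):
    # Mirrors the failing line: Duration / Duration raises TypeError.
    return math.ceil((end - start) / interval)


def total_outputs_fixed(start, end, interval):
    # Convert timestamps and durations to plain float seconds before dividing.
    def to_seconds(x):
        if isinstance(x, (Timestamp, Duration)):
            return x.micros / 1_000_000
        return float(x)

    return math.ceil((to_seconds(end) - to_seconds(start)) / to_seconds(interval))


try:
    total_outputs_buggy(Timestamp(0), Timestamp(10), Duration(3_000_000))
except TypeError as err:
    print(err)  # unsupported operand type(s) for /: 'Duration' and 'Duration'

print(total_outputs_buggy(0, 10, 3))  # 4: plain numbers divide fine
print(total_outputs_fixed(Timestamp(0), Timestamp(10), Duration(3_000_000)))  # 4
```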