Re: [PROPOSAL] Preparing for Beam 2.26.0 release

2020-10-27 Thread Reza Rokni
+1 Thanx!

On Wed, Oct 28, 2020 at 7:14 AM Valentyn Tymofieiev 
wrote:

> +1, thanks and good luck!
>
> On Tue, Oct 27, 2020 at 1:59 PM Tyson Hamilton  wrote:
>
>> Thanks Rebo! SGTM.
>>
>> On Tue, Oct 27, 2020 at 11:11 AM Udi Meiri  wrote:
>>
>>> +1 sg!
>>>
>>> On Tue, Oct 27, 2020 at 10:02 AM Robert Burke  wrote:
>>>
 Hello everyone!

 The next Beam release (2.26.0) is scheduled to be cut on November 4th
 according to the release calendar [1].

 I'd like to volunteer myself to handle this release. I plan on cutting
 the
 branch on November 5th (since I've had November 4th booked off for
 months now) and cherry-picking in release-blocking fixes
 afterwards. So unresolved release blocking JIRA issues should have
 their "Fix Version/s" marked as "2.26.0".

 Any comments or objections?

 Thanks,
 Robert Burke
 @lostluck
 [1]
 https://calendar.google.com/calendar/u/0/embed?src=0p73sl034k80oob7seouani...@group.calendar.google.com

>>>


Re: Contributor permissions for Beam Jira tickets

2020-10-27 Thread Pablo Estrada
Welcome Teodor!
I've added you as a contributor, and assigned the issue to you.
Thanks for looking into this. Your analysis was interesting, and the
improvement should benefit many.
Best
-P.

On Tue, Oct 27, 2020 at 2:00 PM Teodor Spæren 
wrote:

> Hey!
>
> My name is Teodor and I'm writing a master thesis comparing the overhead
> of using Beam versus writing native Flink. I want to contribute fixes
> for some of the problems I find. So far it's only one, [1], but I would
> like to assign the ticket to me. My jira username is rhermes, and I'm
> the reporter of [1].
>
> Hope to hear back.
>
> Best regards,
> Teodor Spæren
>
> [1] https://issues.apache.org/jira/browse/BEAM-11146
>


Contributor permissions for Beam Jira tickets

2020-10-27 Thread Teodor Spæren

Hey!

My name is Teodor and I'm writing a master thesis comparing the overhead 
of using Beam versus writing native Flink. I want to contribute fixes 
for some of the problems I find. So far it's only one, [1], but I would 
like to assign the ticket to me. My jira username is rhermes, and I'm 
the reporter of [1].


Hope to hear back.

Best regards,
Teodor Spæren

[1] https://issues.apache.org/jira/browse/BEAM-11146


Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Kenneth Knowles
It seems that many correct things are said on this thread.

1. Elements of a PCollection are immutable. They should be like
mathematical values.
2. For performance reasons, the author of a DoFn is responsible to not
mutate input elements and also to not mutate outputs once they have been
output.
3. The direct runner does extra work to check if your DoFn* is wrong.
4. On a production runner it is expected that serialization only occurs
when needed for shipping data**

If the FlinkRunner is serializing things that don't have to be shipped, that
seems like a great, easy win.

Kenn

*notably CombineFn has an API that is broken; only the first accumulator is
allowed to be mutated and a runner is responsible for cloning it as
necessary; it is expected that combining many elements will execute by
mutating one unaliased accumulator many times
**small caveat that when doing in-memory groupings you need to use
Coder#structuralValue and group by that, which may serialize but hopefully
does something smarter
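The second footnote can be illustrated with a small self-contained sketch (hypothetical stand-in types, not Beam's actual Coder API): grouping raw byte[] keys in a HashMap is silently wrong because arrays use identity equality, which is exactly the problem Coder#structuralValue exists to solve.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Beam's Coder#structuralValue, not actual Beam
// code: byte[] uses identity equals/hashCode, so two equal payloads land in
// separate HashMap buckets. Wrapping the encoded bytes in a value with
// structural equality (the role structuralValue plays) fixes the grouping.
public class StructuralKeyDemo {

    // Box the encoded bytes into a List, which has content-based
    // equals/hashCode -- a crude structural value.
    static List<Byte> structuralValue(byte[] encoded) {
        List<Byte> boxed = new ArrayList<>();
        for (byte b : encoded) {
            boxed.add(b);
        }
        return boxed;
    }

    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3}; // same content, different object identity

        Map<byte[], Integer> naive = new HashMap<>();
        naive.merge(a, 1, Integer::sum);
        naive.merge(b, 1, Integer::sum);
        System.out.println(naive.size()); // 2: the equal keys were not grouped

        Map<List<Byte>, Integer> structural = new HashMap<>();
        structural.merge(structuralValue(a), 1, Integer::sum);
        structural.merge(structuralValue(b), 1, Integer::sum);
        System.out.println(structural.size()); // 1: grouped correctly
    }
}
```

Beam's real structuralValue may avoid full serialization when the coder supports something smarter, but the contract is the same: equal encoded forms compare equal.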

On Tue, Oct 27, 2020 at 8:52 AM Reuven Lax  wrote:

> Actually I believe that the Beam model does say that input elements should
> be immutable. If I remember correctly, the DirectRunner even validates this
> in unit tests, failing tests if the input elements have been mutated.
>
> On Tue, Oct 27, 2020 at 3:49 AM David Morávek  wrote:
>
>> Hi Teodor,
>>
>> Thanks for bringing this up. This is a known, long-standing "issue".
>> Unfortunately there are a few things we need to consider:
>>
>> - As you correctly noted, the *Beam model doesn't enforce immutability*
>> of input / output elements, so this is the price.
>> - We *cannot break* existing pipelines.
>> - Flink Runner needs to provide the *same guarantees as the Beam model*.
>>
>> There are definitely some things we can do here, to make things faster:
>>
>> - We can try a similar approach to HadoopIO
>> (HadoopInputFormatReader#isKnownImmutable), to check for known immutable
>> types (KV, primitives, protobuf, other known internal immutable structures).
>> - *If the type is immutable, we can safely reuse it.* This should cover
>> most of the performance costs without breaking the guarantees Beam model
>> provides.
>> - We can enable registration of custom "immutable" types via pipeline
>> options? (this may be an unnecessary knob, so this needs a further
>> discussion)
>>
>> WDYT?
>>
>> D.
>>
>>
>> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
>> wrote:
>>
>>> Hey!
>>>
>>> I'm a student at the University of Oslo, and I'm writing a master thesis
>>> about the possibility of using Beam to benchmark stream processing
>>> systems. An important factor in this is the overhead associated with
>>> using Beam over writing code for the runner directly. [1] found that
>>> there was a large overhead associated with using Beam, but did not
>>> investigate where this overhead came from. I've done benchmarks and
>>> confirmed the findings there, where for simple chains of identity
>>> operators, Beam is 43x slower than the Flink equivalent.
>>>
>>> These are very simple pipelines, with custom sources that just output a
>>> series of integers. By profiling I've found that most of the overhead
>>> comes from serializing and deserializing. Specifically the way
>>> TypeSerializer's, [2], is implemented in [3], where each object is
>>> serialized and then deserialized between every operator. Looking into
>>> the semantics of Beam, no operator should change the input, so we don't
>>> need to do a copy here. The function in [3] could potentially be changed
>>> to a single `return` statement.
>>>
>>> Doing this removes 80% of the overhead in my tests. This is a very
>>> synthetic example, but it's a low hanging fruit and might give a speed
>>> boost to many pipelines when run on the Flink runner. I would like to
>>> make this my first contribution to Beam, but as the guide [4] says, I
>>> thought I'd ask here first to see if there is a reason not to do this.
>>>
>>> The only objection I can see is that it might break existing pipelines
>>> which rely on the Flink runner saving them from not following the
>>> immutability guarantee. I see this as a small loss as they are relying
>>> on an implementation detail of the Flink runner.
>>>
>>> I hope I have explained this adequately and eagerly await any feedback :)
>>>
>>> Best regards,
>>> Teodor Spæren
>>>
>>> [1]: https://arxiv.org/abs/1907.08302
>>> [2]:
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>>> [3]:
>>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>>> [4]: https://beam.apache.org/contribute/
>>>
>>


Re: Website Revamp Update - Week 1 (and how to get involved)

2020-10-27 Thread Agnieszka Sell
Hi Tyson,

Thank you for your feedback!

- We’re thinking about using the mascot – we’ll see how it works with the rest
of the design :)
- The Beam logo will be added at the top of the screen and perhaps also on the hero
image – we’re in the process of designing this piece right now.
- Thank you for the comments about Twitter and the “How it works” section – I’ll
discuss them with the designers.

Best,

Agnieszka

> On 27.10.2020, at 19:04, Tyson Hamilton wrote:
> 
> 
> Hello,
> 
> I was unable to attend the sprint meeting but had a couple comments. 
> 
>   - The Beam mascot isn't on the page anywhere, is this intentional? It would 
> be fun to include it somewhere, it's so cute and fast.
>   - The Beam logo doesn't appear until the footer. Maybe there should be one 
> higher somewhere? None of the "Beam" branding really pops out above the fold.
>   - The middle section "How it Works" confuses me, in my mind the arrangement 
> would be 'Beam Model (how users think about the pipeline) -> SDKs (how users 
> describe/express their pipeline using the model) -> Runners (execution of 
> pipeline description/expression)". On the current 'beam.apache.org' site 
> there are no arrows on the lines, which could be another way to solve the 
> issue.
>   - The section on "Stay up to date ..." should include the twitter handle?
> 
> Thanks,
> -Tyson
> 
> 
>> On Tue, Oct 27, 2020 at 9:40 AM Agnieszka Sell  
>> wrote:
>> Hi there,
>> 
>> Tomorrow at 9 am PT we'll have a second sprint review for the Beam Website 
>> Revamp project. If you want to join this meeting please use this link: 
>> https://meet.google.com/hrk-ngzu-sun.
>> 
>> What you're going to see:
>> UX design for the home page on mobile devices.
>> UX design for the "contribute" page.
>> Hero image for the home page.
>> Please remember that we're gathering your feedback for the home page UX 
>> design – PNG file was attached to my previous email. You can send us your 
>> comments on Slack as well 
>> (https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q).
>> 
>> Kind regards,
>> 
>> Agnieszka
>> 
>>> On Mon, Oct 26, 2020 at 9:36 AM Agnieszka Sell  
>>> wrote:
>>> Hi Everyone!
>>> 
>>> Attached you'll find the initial proposal of the wireframe for the Beam 
>>> Home Page. Please share your feedback :)  
>>> 
>>> Some comments to the designs:
>>> 
>>> Hero Image: it will get more compact in the upcoming designs – our UI 
>>> designer is working on the key visual and we will propose some ideas during 
>>> the next review meeting, so for now it is here for the preview.
>>> 
>>> The bar with buttons: It will be pinned to the bottom of the page and will 
>>> follow the user. So users will get “learn more” and “quick-starts” easily. 
>>> Otherwise the top navigation bar will be enough for the user.
>>> 
>>> "How it works" section: for the graph we want to make it clear and more 
>>> clean. It will be done in the UI phase, but it is still important to keep 
>>> it in for the users.
>>> 
>>> "Stay up to date with the Beam" section: in a compact way we show the 
>>> latest updates from the blog and the upcoming events.
>>> 
>>> "Start exploring" section: redirecting to social media/GitHub.
>>> 
>>> Please keep in mind that all of the copy, logos, icons are just 
>>> placeholders. It is a basic design to show the overview of the sections 
>>> that we want to implement.
>>> 
>>> Let me know if you have any questions!
>>> 
>>> Kind regards,
>>> 
>>> Agnieszka
>>> 
>>> 
 On Wed, Oct 21, 2020 at 7:33 AM Griselda Cuevas  wrote:
 Hi folks, please find here [1] the updates for week 1 of the website 
 revamp work. This document will contain notes for every sprint, which will 
 last one week. 
 
 If you want to get involved in this effort here are some resources:
 The Polidea/Utilo team will be sharing mocks and designs as they are 
 available to get feedback from the community, so share your thoughts and 
 suggestions!
 @Agnieszka Sell will create a public link for the weekly review meeting so 
 anyone interested in joining can come. 
 I created a slack channel [2] if you want to chat about this project with 
 the working group
 Thanks all, 
 G
 
 [1] 
 https://docs.google.com/document/d/1CqssBpRt1EjpV0nCBaS9WGaGHt_4stKXbwMz9gYfmwc/edit
 [2] https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q
>>> 
>>> 
>>> -- 
>>> 
>>> Agnieszka Sell
>>> Polidea | Project Manager
>>> M: +48 504 901 334
>>> E: agnieszka.s...@polidea.com
>> 


Re: [PROPOSAL] Preparing for Beam 2.26.0 release

2020-10-27 Thread Udi Meiri
+1 sg!

On Tue, Oct 27, 2020 at 10:02 AM Robert Burke  wrote:

> Hello everyone!
>
> The next Beam release (2.26.0) is scheduled to be cut on November 4th
> according to the release calendar [1].
>
> I'd like to volunteer myself to handle this release. I plan on cutting the
> branch on November 5th (since I've had November 4th booked off for months
> now) and cherry-picking in release-blocking fixes
> afterwards. So unresolved release blocking JIRA issues should have
> their "Fix Version/s" marked as "2.26.0".
>
> Any comments or objections?
>
> Thanks,
> Robert Burke
> @lostluck
> [1]
> https://calendar.google.com/calendar/u/0/embed?src=0p73sl034k80oob7seouani...@group.calendar.google.com
>




Re: Website Revamp Update - Week 1 (and how to get involved)

2020-10-27 Thread Tyson Hamilton
Hello,

I was unable to attend the sprint meeting but had a couple comments.

  - The Beam mascot isn't on the page anywhere, is this intentional? It
would be fun to include it somewhere, it's so cute and fast.
  - The Beam logo doesn't appear until the footer. Maybe there should be
one higher somewhere? None of the "Beam" branding really pops out above the
fold.
  - The middle section "How it Works" confuses me, in my mind the
arrangement would be 'Beam Model (how users think about the pipeline) ->
SDKs (how users describe/express their pipeline using the model) -> Runners
(execution of pipeline description/expression)". On the current '
beam.apache.org' site there are no arrows on the lines, which could be
another way to solve the issue.
  - The section on "Stay up to date ..." should include the twitter handle?

Thanks,
-Tyson


On Tue, Oct 27, 2020 at 9:40 AM Agnieszka Sell 
wrote:

> Hi there,
>
> Tomorrow at 9 am PT we'll have a second sprint review for the Beam Website
> Revamp project. If you want to join this meeting please use this link:
> https://meet.google.com/hrk-ngzu-sun.
>
> What you're going to see:
>
>- UX design for the home page on mobile devices.
>- UX design for the "contribute" page.
>- Hero image for the home page.
>
> Please remember that we're gathering your feedback for the home page UX
> design – PNG file was attached to my previous email. You can send us your
> comments on Slack as well (
> https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q).
>
> Kind regards,
>
> Agnieszka
>
> On Mon, Oct 26, 2020 at 9:36 AM Agnieszka Sell 
> wrote:
>
>> Hi Everyone!
>>
>> Attached you'll find the initial proposal of the wireframe for the Beam
>> Home Page. Please share your feedback :)
>>
>> Some comments to the designs:
>>
>> *Hero Image:* it will get more compact in the upcoming designs – our UI
>> designer is working on the key visual and we will propose some ideas during
>> the next review meeting, so for now it is here for the preview.
>>
>> *The bar with buttons:* It will be pinned to the bottom of the page and
>> will follow the user. So users will get “learn more” and “quick-starts”
>> easily. Otherwise the top navigation bar will be enough for the user.
>>
>> *"How it works" section: *for the graph we want to make it clear and
>> more clean. It will be done in the UI phase, but it is still important to
>> keep it in for the users.
>>
>> *"Stay up to date with the Beam" section: *in a compact way we show the
>> latest updates from the blog and the upcoming events.
>>
>> *"Start exploring" section*: redirecting to social media/GitHub.
>>
>> Please keep in mind that all of the copy, logos, icons are just
>> placeholders. It is a basic design to show the overview of the sections
>> that we want to implement.
>>
>> Let me know if you have any questions!
>>
>> Kind regards,
>>
>> Agnieszka
>>
>>
>> On Wed, Oct 21, 2020 at 7:33 AM Griselda Cuevas  wrote:
>>
>>> Hi folks, please find here [1] the updates for week 1 of the website
>>> revamp work. This document will contain notes for every sprint, which will
>>> last one week.
>>>
>>> If you want to get involved in this effort here are some resources:
>>>
>>>- The Polidea/Utilo team will be sharing mocks and designs as they
>>>are available to get feedback from the community, so share your thoughts
>>>and suggestions!
>>>- @Agnieszka Sell  will create a public
>>>link for the weekly review meeting so anyone interested in joining can
>>>come.
>>>- I created a slack channel [2] if you want to chat about this
>>>project with the working group
>>>
>>> Thanks all,
>>> G
>>>
>>> [1]
>>> https://docs.google.com/document/d/1CqssBpRt1EjpV0nCBaS9WGaGHt_4stKXbwMz9gYfmwc/edit
>>> [2] https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q
>>>
>>
>>
>> --
>>
>> Agnieszka Sell
>> Polidea  | Project Manager
>>
>> M: *+48 504 901 334* <+48504901334>
>> E: agnieszka.s...@polidea.com
>>
>


Transform Logging Issues with Spark/Dataproc in GCP

2020-10-27 Thread Rion Williams
Hi all,

Recently, I deployed a very simple Apache Beam pipeline to get some
insights into how it behaved executing in Dataproc as opposed to on my
local machine. I quickly realized after executing it that any DoFn or
transform-level logging didn't appear in the job logs within the Google
Cloud Console as I would have expected, and I'm not entirely sure what might
be missing.


All of the high level logging messages are emitted as expected:


// This works
log.info("Testing logging operations")

pipeline
    .apply(Create.of(...))
    .apply(ParDo.of(LoggingDoFn))


The LoggingDoFn class here is a very basic transform that emits each of the
values that it encounters as seen below:


object LoggingDoFn : DoFn() {
    private val log = LoggerFactory.getLogger(LoggingDoFn::class.java)

    @ProcessElement
    fun processElement(c: ProcessContext) {
        // This is never emitted within the logs
        log.info("Attempting to parse ${c.element()}")
    }
}


As detailed in the comments, I can see logging messages outside of
the processElement() calls (presumably because those are being executed by
the Spark runner), but is there a way to easily expose those within the
inner transform as well?


When viewing the logs related to this job, we can see the higher-level
logging present, but no mention of an "Attempting to parse ..." message from
the DoFn:


[image: gjela.png] 


The job itself is being executed by the following gcloud command, which has
the driver log levels explicitly defined, but perhaps there's another level
of logging or configuration that needs to be added:


gcloud dataproc jobs submit spark \
  --jar=gs://some_bucket/deployment/example.jar \
  --project example-project \
  --cluster example-cluster \
  --region us-example \
  --driver-log-levels com.example=DEBUG \
  -- --runner=SparkRunner --output=gs://some_bucket/deployment/out


To summarize, log messages are not being emitted to the Google Cloud
Console for tasks that would generally be assigned to the Spark runner
itself (e.g. processElement()). I'm unsure if it's a configuration-related
issue or something else entirely.


Any advice would be appreciated and I’d be glad to provide some additional
details however I can.


Thanks,


Rion


[PROPOSAL] Preparing for Beam 2.26.0 release

2020-10-27 Thread Robert Burke
Hello everyone!

The next Beam release (2.26.0) is scheduled to be cut on November 4th
according to the release calendar [1].

I'd like to volunteer myself to handle this release. I plan on cutting the
branch on November 5th (since I've had November 4th booked off for months
now) and cherry-picking in release-blocking fixes
afterwards. So unresolved release blocking JIRA issues should have
their "Fix Version/s" marked as "2.26.0".

Any comments or objections?

Thanks,
Robert Burke
@lostluck
[1]
https://calendar.google.com/calendar/u/0/embed?src=0p73sl034k80oob7seouani...@group.calendar.google.com


Re: Website Revamp Update - Week 1 (and how to get involved)

2020-10-27 Thread Agnieszka Sell
Hi there,

Tomorrow at 9 am PT we'll have a second sprint review for the Beam Website
Revamp project. If you want to join this meeting please use this link:
https://meet.google.com/hrk-ngzu-sun.

What you're going to see:

   - UX design for the home page on mobile devices.
   - UX design for the "contribute" page.
   - Hero image for the home page.

Please remember that we're gathering your feedback for the home page UX
design – PNG file was attached to my previous email. You can send us your
comments on Slack as well (
https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q).

Kind regards,

Agnieszka

On Mon, Oct 26, 2020 at 9:36 AM Agnieszka Sell 
wrote:

> Hi Everyone!
>
> Attached you'll find the initial proposal of the wireframe for the Beam
> Home Page. Please share your feedback :)
>
> Some comments to the designs:
>
> *Hero Image:* it will get more compact in the upcoming designs – our UI
> designer is working on the key visual and we will propose some ideas during
> the next review meeting, so for now it is here for the preview.
>
> *The bar with buttons:* It will be pinned to the bottom of the page and
> will follow the user. So users will get “learn more” and “quick-starts”
> easily. Otherwise the top navigation bar will be enough for the user.
>
> *"How it works" section: *for the graph we want to make it clear and more
> clean. It will be done in the UI phase, but it is still important to keep
> it in for the users.
>
> *"Stay up to date with the Beam" section: *in a compact way we show the
> latest updates from the blog and the upcoming events.
>
> *"Start exploring" section*: redirecting to social media/GitHub.
>
> Please keep in mind that all of the copy, logos, icons are just
> placeholders. It is a basic design to show the overview of the sections
> that we want to implement.
>
> Let me know if you have any questions!
>
> Kind regards,
>
> Agnieszka
>
>
> On Wed, Oct 21, 2020 at 7:33 AM Griselda Cuevas  wrote:
>
>> Hi folks, please find here [1] the updates for week 1 of the website
>> revamp work. This document will contain notes for every sprint, which will
>> last one week.
>>
>> If you want to get involved in this effort here are some resources:
>>
>>- The Polidea/Utilo team will be sharing mocks and designs as they
>>are available to get feedback from the community, so share your thoughts
>>and suggestions!
>>- @Agnieszka Sell  will create a public
>>link for the weekly review meeting so anyone interested in joining can
>>come.
>>- I created a slack channel [2] if you want to chat about this
>>project with the working group
>>
>> Thanks all,
>> G
>>
>> [1]
>> https://docs.google.com/document/d/1CqssBpRt1EjpV0nCBaS9WGaGHt_4stKXbwMz9gYfmwc/edit
>> [2] https://join.slack.com/share/zt-i7u8s7vr-g8IYBC8IS3bcUmsCHmYa5Q
>>
>
>
> --
>
> Agnieszka Sell
> Polidea  | Project Manager
>
> M: *+48 504 901 334* <+48504901334>
> E: agnieszka.s...@polidea.com
>


Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Reuven Lax
Actually I believe that the Beam model does say that input elements should
be immutable. If I remember correctly, the DirectRunner even validates this
in unit tests, failing tests if the input elements have been mutated.
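The kind of enforcement described above can be sketched in a few lines (hedged: this is the idea only, not the DirectRunner's actual implementation): encode an element before handing it to user code, re-encode it afterwards, and fail if the bytes differ.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of immutability enforcement (not actual Beam code):
// java.io serialization stands in for a Beam Coder. If re-encoding the
// element after user code ran yields different bytes, the input was mutated.
public class ImmutabilityCheckDemo {

    static byte[] encode(Serializable value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Runs fn on element; returns true iff fn left the element unmutated. */
    static <T extends Serializable> boolean processAndCheck(T element, Consumer<T> fn) {
        byte[] before = encode(element);
        fn.accept(element);
        return Arrays.equals(before, encode(element));
    }

    public static void main(String[] args) {
        ArrayList<Integer> input = new ArrayList<>(List.of(1, 2, 3));
        // A well-behaved DoFn body: reads but never mutates its input.
        System.out.println(processAndCheck(input, e -> e.get(0))); // true
        // A broken DoFn body: mutates its input element.
        System.out.println(processAndCheck(input, e -> e.add(4))); // false
    }
}
```

The cost of this check (two extra encodes per element) is why it fits a test-oriented runner rather than a production one.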

On Tue, Oct 27, 2020 at 3:49 AM David Morávek  wrote:

> Hi Teodor,
>
> Thanks for bringing this up. This is a known, long-standing "issue".
> Unfortunately there are a few things we need to consider:
>
> - As you correctly noted, the *Beam model doesn't enforce immutability*
> of input / output elements, so this is the price.
> - We *cannot break* existing pipelines.
> - Flink Runner needs to provide the *same guarantees as the Beam model*.
>
> There are definitely some things we can do here, to make things faster:
>
> - We can try a similar approach to HadoopIO
> (HadoopInputFormatReader#isKnownImmutable), to check for known immutable
> types (KV, primitives, protobuf, other known internal immutable structures).
> - *If the type is immutable, we can safely reuse it.* This should cover
> most of the performance costs without breaking the guarantees Beam model
> provides.
> - We can enable registration of custom "immutable" types via pipeline
> options? (this may be an unnecessary knob, so this needs a further
> discussion)
>
> WDYT?
>
> D.
>
>
> On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
> wrote:
>
>> Hey!
>>
>> I'm a student at the University of Oslo, and I'm writing a master thesis
>> about the possibility of using Beam to benchmark stream processing
>> systems. An important factor in this is the overhead associated with
>> using Beam over writing code for the runner directly. [1] found that
>> there was a large overhead associated with using Beam, but did not
>> investigate where this overhead came from. I've done benchmarks and
>> confirmed the findings there, where for simple chains of identity
>> operators, Beam is 43x slower than the Flink equivalent.
>>
>> These are very simple pipelines, with custom sources that just output a
>> series of integers. By profiling I've found that most of the overhead
>> comes from serializing and deserializing. Specifically the way
>> TypeSerializer's, [2], is implemented in [3], where each object is
>> serialized and then deserialized between every operator. Looking into
>> the semantics of Beam, no operator should change the input, so we don't
>> need to do a copy here. The function in [3] could potentially be changed
>> to a single `return` statement.
>>
>> Doing this removes 80% of the overhead in my tests. This is a very
>> synthetic example, but it's a low hanging fruit and might give a speed
>> boost to many pipelines when run on the Flink runner. I would like to
>> make this my first contribution to Beam, but as the guide [4] says, I
>> thought I'd ask here first to see if there is a reason not to do this.
>>
>> The only objection I can see is that it might break existing pipelines
>> which rely on the Flink runner saving them from not following the
>> immutability guarantee. I see this as a small loss as they are relying
>> on an implementation detail of the Flink runner.
>>
>> I hope I have explained this adequately and eagerly await any feedback :)
>>
>> Best regards,
>> Teodor Spæren
>>
>> [1]: https://arxiv.org/abs/1907.08302
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
>> [3]:
>> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
>> [4]: https://beam.apache.org/contribute/
>>
>


Re: Apache Beam case studies

2020-10-27 Thread Karolina Rosół
Hi Gris,

Thanks for bringing this up, I'll also let the users@ list know :-)

Karolina Rosół
Polidea  | Head of Cloud & OSS

M: +48 606 630 236 <+48606630236>
E: karolina.ro...@polidea.com



On Sun, Oct 25, 2020 at 4:40 PM Griselda Cuevas  wrote:

> For context, Polidea wants to develop these more as case studies for marketing
> collateral than as UX research.
>
> @Karolina Rosół   it might be worth also
> sending the note to the users@ list.
>
> On Fri, 23 Oct 2020 at 00:57, Karolina Rosół 
> wrote:
>
>> Hi,
>>
>> @Luke thanks a mill for cc'ing Mariann :-) Let's see if there's any
>> interest.
>>
>> Kind regards,
>>
>> Karolina Rosół
>> Polidea  | Head of Cloud & OSS
>>
>> M: +48 606 630 236 <+48606630236>
>> E: karolina.ro...@polidea.com
>>
>>
>> On Wed, Oct 21, 2020 at 5:50 PM Luke Cwik  wrote:
>>
>>> +Mariann Nagy  has been doing things like this for
>>> years now and may be interested.
>>>
>>> On Wed, Oct 21, 2020 at 12:50 AM Karolina Rosół <
>>> karolina.ro...@polidea.com> wrote:
>>>
 Hi folks,

 With some people from Polidea we came up with an idea to carry out
 interviews with Apache Beam users to spread the news about the Beam model
 and engage more people to use it.

 Ideally, we'd set up an online meeting with interested people and then
 do an interview. We'd like to ask questions such as 'how did you find out
 about Apache Beam' / 'how do you use Apache Beam in your company/product?'
 etc. We'd love to post the whole interview on Polidea and Apache Beam
 website.

 If any of you is interested, please let me know in this thread :-)

 Wish you all a happy week and stay safe!

 Karolina Rosół
 Polidea  | Head of Cloud & OSS

 M: +48 606 630 236 <+48606630236>
 E: karolina.ro...@polidea.com

>>>


Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Teodor Spæren
@David, I don't know how the direct runner does the validation, so I'm 
not sure if we could replicate that in the Flink runner without a perf 
penalty. I actually think your point about writing tests is an argument 
for removing this as soon as possible, so the prototype doesn't blow up 
in production :P But I think a flag would be best.
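A minimal sketch of what such a flag-guarded change could look like (hedged: the option name and class are made up, and java.io serialization stands in for the Coder round trip that CoderTypeSerializer#copy currently performs):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustrative sketch, not Beam's actual CoderTypeSerializer: gate the
// expensive serialize-then-deserialize defensive copy behind an opt-in
// flag, and return the input unchanged when the user promises not to
// mutate elements (the proposed single-return fast path).
public class CopySketch {

    // Hypothetical pipeline option: "my DoFns never mutate their inputs".
    final boolean fasterCopy;

    CopySketch(boolean fasterCopy) {
        this.fasterCopy = fasterCopy;
    }

    Serializable copy(Serializable element) {
        if (fasterCopy) {
            return element; // proposed fast path: no defensive copy
        }
        // Current behaviour: deep copy via a serialization round trip.
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(element);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (Serializable) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String element = "hello";
        // Fast path aliases the input; slow path returns an equal copy.
        System.out.println(new CopySketch(true).copy(element) == element); // true
    }
}
```

Defaulting the flag to off would preserve the current behaviour for existing pipelines.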


@Jan, This will actually be part of my master thesis and so it would be 
my pleasure to perform more benchmarks and share the results. I have not 
had the time yet to test it out, but I agree with you about the 
performance impact being less for more realistic jobs. The benchmark I 
have used here is a worst-case scenario. Would continuing with the 
synthetic sources, but changing the processing steps, work? So far all my 
tests have been with parallelism set to 1 and this is something I'm also 
going to explore more in the thesis.


But am I correct in thinking that it's worth creating a Jira issue over 
this and assigning it to myself? As I said in the original email, I 
think the change is simple enough so that I can implement it and I would 
like to try contributing to Beam.


(Not going to lie, it would also look good on my master thesis ;) )
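David's earlier suggestion of checking for known immutable types (modelled loosely on HadoopInputFormatReader#isKnownImmutable) could be sketched roughly as follows; this is illustrative Java with an assumed class list, not actual Beam or HadoopIO code:

```java
import java.util.ArrayList;
import java.util.Set;
import java.util.function.UnaryOperator;

// Illustrative sketch of a known-immutable-type check: elements of these
// types can be reused across operators without a defensive copy, so the
// expensive copy path only runs for types we cannot prove immutable.
// The set of classes here is an assumption for the example.
public class KnownImmutable {

    private static final Set<Class<?>> KNOWN_IMMUTABLE = Set.of(
            String.class, Integer.class, Long.class, Double.class,
            Boolean.class, Byte.class, Short.class, Character.class);

    static boolean isKnownImmutable(Object element) {
        return element != null && KNOWN_IMMUTABLE.contains(element.getClass());
    }

    /** Copies only when the element cannot be proven immutable. */
    static Object copyIfNeeded(Object element, UnaryOperator<Object> deepCopy) {
        return isKnownImmutable(element) ? element : deepCopy.apply(element);
    }

    public static void main(String[] args) {
        System.out.println(isKnownImmutable("hello"));           // true
        System.out.println(isKnownImmutable(new ArrayList<>())); // false
    }
}
```

A registration mechanism, as David suggested, would let users extend the set for their own immutable types.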

Best regards,
Teodor Spæren


On Tue, Oct 27, 2020 at 01:53:11PM +0100, Jan Lukavský wrote:

Hi,

I tend to be +1 for the flag, but before that, we might want to have a 
deeper analysis of the performance impact. I believe the penalty will 
be (in percentage) much lower in cases of more practical jobs (e.g. 
having at least one shuffle).


@Teodor, would you be willing to provide us with some measurements of 
jobs doing something more practical, than simple stateless mappings? 
E.g. a few jobs doing 1, 2 and 3 shuffle phases to see what is the 
impact of these more complex scenarios on the performance penalty?


Cheers,

 Jan

On 10/27/20 1:24 PM, David Morávek wrote:
you made a really good argument ;) I'm inclined toward an experimental 
opt-in flag that would enable this. It would be great if we could 
automatically check for violations - kind of a safety net, for 
mistakes in user code.


Just to note, direct runner enforcement may not cover all cases, as 
it only checks binary representation after serialization. Also there 
are programmers that don't write tests, especially during 
prototyping (not an argument for perf. penalty, but something to 
keep in mind).


Max, WDYT?





On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren 
mailto:teodor_spae...@riseup.net>> 
wrote:


   Some more thoughts:

   As it says on the DirectRunner [1] page, the DirectRunner is meant to
   check that users don't rely on semantics that are not guaranteed
   by the
   Beam model.

   Programs that rely on the Flink runner deep cloning the inputs
   between
   each operator in the pipeline is relying on a semantic that is not
   guaranteed by the Beam model, and those pipelines would fail if
    run on
   the DirectRunner.

   As I stated in the previous email, I have some example programs that
   return different outputs on the Flink runner and on the
   DirectRunner. I
   have not tested these programs on other runners, so I don't know what
   they would return. If they return different answers than the
   DirectRunner, I'm inclined to say that the DirectRunner should
   either be
   changed, or the runners be changed.

    From my very limited point of view, the Flink runner seems to be
   spending a lot of extra time implementing a semantic guarantee
   that the
   Beam model explicitly doesn't support.


   Best regards,
   Teodor Spæren

   [1]: https://beam.apache.org/documentation/runners/direct/

   On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
   >Hey David,
   >
   >I think I might have worded this poorly, because what I meant is
   that
   >from what I can see in [1], the BEAM model explicitly states that
   >PCollections should be treated as immutable. The direct runner also
   >tests for this. Do the other runners also protect the user from
   >misusing the system so? If not we have a situation where running the
   >same pipeline on two different runners will yield different
   answers. I
   >can show some examples that return different examples for the Flink
   >and the Direct Runner.
   >
    >I agree that breaking existing pipelines is a no-no, but I do
   think
   >that we could simply gate this behind an option on the Flink runner.
   >
   >I also tried to search for this before, but did not find any mention
   >of it, can you link me to some discussions about this in the past?
   >
    >Thanks for the reply :D
   >
   >Best regards,
   >Teodor Spæren
   >
   >[1]:
   https://beam.apache.org/documentation/programming-guide/#immutability
   >
   >
   >On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
   >>Hi Teodor,
   >>
   >>Thanks for bringing this up. This is a known, long standing "issue".
    >>Unfortunately there are a few things we need to consider:
   >>
   >>- As you correctly noted, the *Beam model 

Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Jan Lukavský

Hi,

I tend to be +1 for the flag, but before that, we might want to have a 
deeper analysis of the performance impact. I believe the penalty will be 
(in percentage terms) much lower for more practical jobs (e.g. those having 
at least one shuffle).


@Teodor, would you be willing to provide us with some measurements of 
jobs doing something more practical than simple stateless mappings? 
E.g. a few jobs doing 1, 2 and 3 shuffle phases, to see what impact 
these more complex scenarios have on the performance penalty?


Cheers,

 Jan

On 10/27/20 1:24 PM, David Morávek wrote:
You made a really good argument ;) I'm inclined toward an experimental 
opt-in flag that would enable this. It would be great if we could 
automatically check for violations - a kind of safety net for 
mistakes in user code.


Just to note, direct runner enforcement may not cover all cases, as it 
only checks binary representation after serialization. Also there are 
programmers that don't write tests, especially during prototyping (not 
an argument for perf. penalty, but something to keep in mind).


Max, WDYT?





On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren 
mailto:teodor_spae...@riseup.net>> wrote:


Some more thoughts:

As it says on the DirectRunner [1] page, the DirectRunner is meant to
check that users don't rely on semantics that are not guaranteed
by the
Beam model.

Programs that rely on the Flink runner deep cloning the inputs
between
each operator in the pipeline is relying on a semantic that is not
guaranteed by the Beam model, and those pipelines would fail if
run on
the DirectRunner.

As I stated in the previous email, I have some example programs that
return different outputs on the Flink runner and on the
DirectRunner. I
have not tested these programs on other runners, so I don't know what
they would return. If they return different answers than the
DirectRunner, I'm inclined to say that the DirectRunner should
either be
changed, or the runners be changed.

 From my very limited point of view, the Flink runner seems to be
spending a lot of extra time implementing a semantic guarantee
that the
Beam model explicitly doesn't support.


Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/runners/direct/

On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
>Hey David,
>
>I think I might have worded this poorly, because what I meant is
that
>from what I can see in [1], the BEAM model explicitly states that
>PCollections should be treated as immutable. The direct runner also
>tests for this. Do the other runners also protect the user from
>misusing the system so? If not we have a situation where running the
>same pipeline on two different runners will yield different
answers. I
>can show some examples that return different results for the Flink
>and the Direct Runner.
>
>I agree that breaking existing pipelines is a no-no, but I do
think
>that we could simply gate this behind an option on the Flink runner.
>
>I also tried to search for this before, but did not find any mention
>of it, can you link me to some discussions about this in the past?
>
>Thanks for the reply :D
>
>Best regards,
>Teodor Spæren
>
>[1]:
https://beam.apache.org/documentation/programming-guide/#immutability
>
>
>On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
>>Hi Teodor,
>>
>>Thanks for bringing this up. This is a known, long standing "issue".
>>Unfortunately there are a few things we need to consider:
>>
>>- As you correctly noted, the *Beam model doesn't enforce
immutability* of
>>input / output elements, so this is the price.
>>- We *cannot break* existing pipelines.
>>- Flink Runner needs to provide the *same guarantees as the Beam
model*.
>>
>>There are definitely some things we can do here, to make things
faster:
>>
>>- We can try a similar approach to HadoopIO
>>(HadoopInputFormatReader#isKnownImmutable), to check for known
immutable
>>types (KV, primitives, protobuf, other known internal immutable
structures).
>>- *If the type is immutable, we can safely reuse it.* This
should cover
>>most of the performance costs without breaking the guarantees
Beam model
>>provides.
>>- We can enable registration of custom "immutable" types via
pipeline
>>options? (this may be an unnecessary knob, so it needs further
>>discussion)
>>
>>WDYT?
>>
>>D.
>>
>>
>>On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren
mailto:teodor_spae...@riseup.net>>
>>wrote:
>>
>>>Hey!
>>>
>>>I'm a student at the University of Oslo, and I'm writing a
master thesis
>>>about the possibility of using Beam to benchmark stream processing

Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread David Morávek
You made a really good argument ;) I'm inclined toward an experimental opt-in
flag that would enable this. It would be great if we could automatically
check for violations - a kind of safety net for mistakes in user code.

Just to note, direct runner enforcement may not cover all cases, as it only
checks binary representation after serialization. Also there are
programmers that don't write tests, especially during prototyping (not an
argument for perf. penalty, but something to keep in mind).

Max, WDYT?





On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren 
wrote:

> Some more thoughts:
>
> As it says on the DirectRunner [1] page, the DirectRunner is meant to
> check that users don't rely on semantics that are not guaranteed by the
> Beam model.
>
> Programs that rely on the Flink runner deep cloning the inputs between
> each operator in the pipeline is relying on a semantic that is not
> guaranteed by the Beam model, and those pipelines would fail if run on
> the DirectRunner.
>
> As I stated in the previous email, I have some example programs that
> return different outputs on the Flink runner and on the DirectRunner. I
> have not tested these programs on other runners, so I don't know what
> they would return. If they return different answers than the
> DirectRunner, I'm inclined to say that the DirectRunner should either be
> changed, or the runners be changed.
>
>  From my very limited point of view, the Flink runner seems to be
> spending a lot of extra time implementing a semantic guarantee that the
> Beam model explicitly doesn't support.
>
>
> Best regards,
> Teodor Spæren
>
> [1]: https://beam.apache.org/documentation/runners/direct/
>
> On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
> >Hey David,
> >
> >I think I might have worded this poorly, because what I meant is that
> >from what I can see in [1], the BEAM model explicitly states that
> >PCollections should be treated as immutable. The direct runner also
> >tests for this. Do the other runners also protect the user from
> >misusing the system so? If not we have a situation where running the
> >same pipeline on two different runners will yield different answers. I
> >can show some examples that return different results for the Flink
> >and the Direct Runner.
> >
> >I agree that breaking existing pipelines is a no-no, but I do think
> >that we could simply gate this behind an option on the Flink runner.
> >
> >I also tried to search for this before, but did not find any mention
> >of it, can you link me to some discussions about this in the past?
> >
> >Thanks for the reply :D
> >
> >Best regards,
> >Teodor Spæren
> >
> >[1]:
> https://beam.apache.org/documentation/programming-guide/#immutability
> >
> >
> >On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
> >>Hi Teodor,
> >>
> >>Thanks for bringing this up. This is a known, long standing "issue".
> >>Unfortunately there are a few things we need to consider:
> >>
> >>- As you correctly noted, the *Beam model doesn't enforce immutability*
> of
> >>input / output elements, so this is the price.
> >>- We *cannot break* existing pipelines.
> >>- Flink Runner needs to provide the *same guarantees as the Beam model*.
> >>
> >>There are definitely some things we can do here, to make things faster:
> >>
> >>- We can try a similar approach to HadoopIO
> >>(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
> >>types (KV, primitives, protobuf, other known internal immutable
> structures).
> >>- *If the type is immutable, we can safely reuse it.* This should cover
> >>most of the performance costs without breaking the guarantees Beam model
> >>provides.
> >>- We can enable registration of custom "immutable" types via pipeline
> >>options? (this may be an unnecessary knob, so it needs further
> >>discussion)
> >>
> >>WDYT?
> >>
> >>D.
> >>
> >>
> >>On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren  >
> >>wrote:
> >>
> >>>Hey!
> >>>
> >>>I'm a student at the University of Oslo, and I'm writing a master thesis
> >>>about the possibility of using Beam to benchmark stream processing
> >>>systems. An important factor in this is the overhead associated with
> >>>using Beam over writing code for the runner directly. [1] found that
> >>>there was a large overhead associated with using Beam, but did not
> >>>investigate where this overhead came from. I've done benchmarks and
> >>>confirmed the findings there, where for simple chains of identity
> >>>operators, Beam is 43x slower than the Flink equivalent.
> >>>
> >>>These are very simple pipelines, with custom sources that just output a
> >>>series of integers. By profiling I've found that most of the overhead
> >>>comes from serializing and deserializing. Specifically, the way
> >>>TypeSerializer [2] is implemented in [3], where each object is
> >>>serialized and then deserialized between every operator. Looking into
> >>>the semantics of Beam, no operator should change the input, so we don't
> >>>need to 

Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Teodor Spæren

Some more thoughts:

As it says on the DirectRunner [1] page, the DirectRunner is meant to 
check that users don't rely on semantics that are not guaranteed by the 
Beam model.


Programs that rely on the Flink runner deep cloning the inputs between 
each operator in the pipeline is relying on a semantic that is not 
guaranteed by the Beam model, and those pipelines would fail if run on 
the DirectRunner.


As I stated in the previous email, I have some example programs that 
return different outputs on the Flink runner and on the DirectRunner. I 
have not tested these programs on other runners, so I don't know what 
they would return. If they return different answers than the 
DirectRunner, I'm inclined to say that the DirectRunner should either be 
changed, or the runners be changed.


From my very limited point of view, the Flink runner seems to be 
spending a lot of extra time implementing a semantic guarantee that the 
Beam model explicitly doesn't support.



Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/runners/direct/

On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:

Hey David,

I think I might have worded this poorly, because what I meant is that 
from what I can see in [1], the BEAM model explicitly states that 
PCollections should be treated as immutable. The direct runner also 
tests for this. Do the other runners also protect the user from 
misusing the system so? If not we have a situation where running the 
same pipeline on two different runners will yield different answers. I 
can show some examples that return different results for the Flink 
and the Direct Runner.


I agree that breaking existing pipelines is a no-no, but I do think 
that we could simply gate this behind an option on the Flink runner.


I also tried to search for this before, but did not find any mention 
of it, can you link me to some discussions about this in the past?


Thanks for the reply :D

Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/programming-guide/#immutability


On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:

Hi Teodor,

Thanks for bringing this up. This is a known, long standing "issue".
Unfortunately there are a few things we need to consider:

- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We *cannot break* existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here, to make things faster:

- We can try a similar approach to HadoopIO
(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover
most of the performance costs without breaking the guarantees Beam model
provides.
- We can enable registration of custom "immutable" types via pipeline
options? (this may be an unnecessary knob, so it needs further
discussion)

WDYT?

D.


On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
wrote:


Hey!

I'm a student at the University of Oslo, and I'm writing a master thesis
about the possibility of using Beam to benchmark stream processing
systems. An important factor in this is the overhead associated with
using Beam over writing code for the runner directly. [1] found that
there was a large overhead associated with using Beam, but did not
investigate where this overhead came from. I've done benchmarks and
confirmed the findings there, where for simple chains of identity
operators, Beam is 43x slower than the Flink equivalent.

These are very simple pipelines, with custom sources that just output a
series of integers. By profiling I've found that most of the overhead
comes from serializing and deserializing. Specifically, the way
TypeSerializer [2] is implemented in [3], where each object is
serialized and then deserialized between every operator. Looking into
the semantics of Beam, no operator should change the input, so we don't
need to do a copy here. The function in [3] could potentially be changed
to a single `return` statement.

Doing this removes 80% of the overhead in my tests. This is a very
synthetic example, but it's a low hanging fruit and might give a speed
boost to many pipelines when run on the Flink runner. I would like to
make this my first contribution to Beam, but as the guide [4] says, I
thought I'd ask here first to see if there is a reason not to do this.

Only objection I can see, is that it might break existing pipelines
which rely on the Flink runner saving them from not following the
immutability guarantee. I see this as a small loss as they are relying
on an implementation detail of the Flink runner.

I hope I have explained this adequately, and I eagerly await any feedback :)

Best regards,
Teodor Spæren

[1]: https://arxiv.org/abs/1907.08302
[2]:

Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread Teodor Spæren

Hey David,

I think I might have worded this poorly, because what I meant is that 
from what I can see in [1], the BEAM model explicitly states that 
PCollections should be treated as immutable. The direct runner also 
tests for this. Do the other runners also protect the user from 
misusing the system so? If not we have a situation where running the 
same pipeline on two different runners will yield different answers. I 
can show some examples that return different results for the Flink and 
the Direct Runner.


I agree that breaking existing pipelines is a no-no, but I do think 
that we could simply gate this behind an option on the Flink runner.


I also tried to search for this before, but did not find any mention of 
it, can you link me to some discussions about this in the past?


Thanks for the reply :D

Best regards,
Teodor Spæren

[1]: https://beam.apache.org/documentation/programming-guide/#immutability
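As a hypothetical illustration (not one of the actual test programs referred to above), the following self-contained sketch shows how a pipeline that mutates its input can observe different results depending on whether the runner deep-copies elements between operators; the names MutationDemo, mutatingOp and downstreamSizeSeen are invented for this example and are not Beam APIs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MutationDemo {
  // A DoFn-like operator that mutates its input in place before emitting it,
  // violating the Beam immutability contract for PCollection elements.
  static List<Integer> mutatingOp(List<Integer> element) {
    element.add(99);
    return element;
  }

  // Run the chain source -> mutatingOp, optionally deep-copying the element
  // between operators (as the Flink runner does today), then report the size
  // a second consumer of the same source element would see.
  static int downstreamSizeSeen(boolean copyBetweenOperators) {
    List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3));
    List<Integer> handedToOp =
        copyBetweenOperators ? new ArrayList<>(source) : source;
    mutatingOp(handedToOp);
    return source.size();
  }
}
```

With the defensive copy the mutation stays invisible to other consumers; without it, the mutation leaks and the observed result changes - which is the kind of divergence between runners described above.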


On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:

Hi Teodor,

Thanks for bringing this up. This is a known, long standing "issue".
Unfortunately there are a few things we need to consider:

- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We *cannot break* existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here, to make things faster:

- We can try a similar approach to HadoopIO
(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover
most of the performance costs without breaking the guarantees Beam model
provides.
- We can enable registration of custom "immutable" types via pipeline
options? (this may be an unnecessary knob, so it needs further
discussion)

WDYT?

D.


On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
wrote:


Hey!

I'm a student at the University of Oslo, and I'm writing a master thesis
about the possibility of using Beam to benchmark stream processing
systems. An important factor in this is the overhead associated with
using Beam over writing code for the runner directly. [1] found that
there was a large overhead associated with using Beam, but did not
investigate where this overhead came from. I've done benchmarks and
confirmed the findings there, where for simple chains of identity
operators, Beam is 43x slower than the Flink equivalent.

These are very simple pipelines, with custom sources that just output a
series of integers. By profiling I've found that most of the overhead
comes from serializing and deserializing. Specifically, the way
TypeSerializer [2] is implemented in [3], where each object is
serialized and then deserialized between every operator. Looking into
the semantics of Beam, no operator should change the input, so we don't
need to do a copy here. The function in [3] could potentially be changed
to a single `return` statement.

Doing this removes 80% of the overhead in my tests. This is a very
synthetic example, but it's a low hanging fruit and might give a speed
boost to many pipelines when run on the Flink runner. I would like to
make this my first contribution to Beam, but as the guide [4] says, I
thought I'd ask here first to see if there is a reason not to do this.

Only objection I can see, is that it might break existing pipelines
which rely on the Flink runner saving them from not following the
immutability guarantee. I see this as a small loss as they are relying
on an implementation detail of the Flink runner.

I hope I have explained this adequately, and I eagerly await any feedback :)

Best regards,
Teodor Spæren

[1]: https://arxiv.org/abs/1907.08302
[2]:
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
[3]:
https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
[4]: https://beam.apache.org/contribute/



Re: Possible 80% reduction in overhead for flink runner, input needed

2020-10-27 Thread David Morávek
Hi Teodor,

Thanks for bringing this up. This is a known, long standing "issue".
Unfortunately there are a few things we need to consider:

- As you correctly noted, the *Beam model doesn't enforce immutability* of
input / output elements, so this is the price.
- We *cannot break* existing pipelines.
- Flink Runner needs to provide the *same guarantees as the Beam model*.

There are definitely some things we can do here, to make things faster:

- We can try a similar approach to HadoopIO
(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
types (KV, primitives, protobuf, other known internal immutable structures).
- *If the type is immutable, we can safely reuse it.* This should cover
most of the performance costs without breaking the guarantees Beam model
provides.
- We can enable registration of custom "immutable" types via pipeline
options? (this may be an unnecessary knob, so it needs further
discussion)

WDYT?

D.
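
To make the isKnownImmutable idea above concrete, here is a minimal, self-contained sketch of what such a whitelist check could look like. The class and method names are illustrative only - this is not actual Beam or Flink API, and a real implementation would live inside the runner's copy path (e.g. CoderTypeSerializer):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

class ImmutabilityCheck {
  // A small whitelist of JDK types that are safe to reuse without copying.
  private static final Set<Class<?>> KNOWN_IMMUTABLE = new HashSet<>(Arrays.asList(
      String.class, Integer.class, Long.class, Short.class, Byte.class,
      Boolean.class, Character.class, Double.class, Float.class, Void.class));

  // True if the value can be shared between operators without a deep copy.
  static boolean isKnownImmutable(Object value) {
    return value == null || KNOWN_IMMUTABLE.contains(value.getClass());
  }

  // Sketch of the copy path: reuse known-immutable values,
  // fall back to a caller-supplied deep copy for everything else.
  static <T> T copy(T value, UnaryOperator<T> deepCopy) {
    return isKnownImmutable(value) ? value : deepCopy.apply(value);
  }
}
```

A real version would also want to recognize KV of immutable components, protobuf messages, and other internal immutable structures, as suggested above.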


On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren 
wrote:

> Hey!
>
> I'm a student at the University of Oslo, and I'm writing a master thesis
> about the possibility of using Beam to benchmark stream processing
> systems. An important factor in this is the overhead associated with
> using Beam over writing code for the runner directly. [1] found that
> there was a large overhead associated with using Beam, but did not
> investigate where this overhead came from. I've done benchmarks and
> confirmed the findings there, where for simple chains of identity
> operators, Beam is 43x slower than the Flink equivalent.
>
> These are very simple pipelines, with custom sources that just output a
> series of integers. By profiling I've found that most of the overhead
> comes from serializing and deserializing. Specifically, the way
> TypeSerializer [2] is implemented in [3], where each object is
> serialized and then deserialized between every operator. Looking into
> the semantics of Beam, no operator should change the input, so we don't
> need to do a copy here. The function in [3] could potentially be changed
> to a single `return` statement.
>
> Doing this removes 80% of the overhead in my tests. This is a very
> synthetic example, but it's a low hanging fruit and might give a speed
> boost to many pipelines when run on the Flink runner. I would like to
> make this my first contribution to Beam, but as the guide [4] says, I
> thought I'd ask here first to see if there is a reason not to do this.
>
> Only objection I can see, is that it might break existing pipelines
> which rely on the Flink runner saving them from not following the
> immutability guarantee. I see this as a small loss as they are relying
> on an implementation detail of the Flink runner.
>
> I hope I have explained this adequately, and I eagerly await any feedback :)
>
> Best regards,
> Teodor Spæren
>
> [1]: https://arxiv.org/abs/1907.08302
> [2]:
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
> [3]:
> https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
> [4]: https://beam.apache.org/contribute/
>
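
As a rough, self-contained illustration of the two copy strategies discussed in this thread, the sketch below contrasts a serialize-then-deserialize deep copy (roughly what CoderTypeSerializer#copy [3] does today, with plain Java serialization standing in for Beam coders) against the proposed single-return identity copy:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CopyStrategies {
  // Roughly the current behaviour: encode the element to bytes, then decode
  // it again, producing a fresh object graph on every hop between operators.
  static <T extends Serializable> T deepCopy(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }

  // The proposed change: if elements are never mutated downstream,
  // the copy can be a single return statement, with no serde cost.
  static <T> T identityCopy(T value) {
    return value;
  }
}
```

The identity variant is only safe under the immutability assumption the thread debates; the opt-in flag discussed above would let users choose it knowingly.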