[jira] [Commented] (BEAM-1146) Decrease spark runner startup overhead
[ https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15767378#comment-15767378 ]

ASF GitHub Bot commented on BEAM-1146:
--------------------------------------

GitHub user aviemzur opened a pull request:

    https://github.com/apache/incubator-beam/pull/1674

    [BEAM-1146] Decrease spark runner startup overhead

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:

     - [ ] Make sure the PR title is formatted like:
       `[BEAM-] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `` in the title with the actual Jira issue
       number, if there is one.
     - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

    Replace finding all `Source` and `Coder` implementations for serialization
    registration with wrapper classes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aviemzur/incubator-beam decrease-spark-runner-startup-overhead

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1674.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1674

commit 8501cdc88ee9c89f643120e34381ec9bc2562965
Author: Aviem Zur
Date:   2016-12-21T15:49:34Z

    [BEAM-1146] Decrease spark runner startup overhead

    Replace finding all `Source` and `Coder` implementations for serialization
    registration with wrapper classes.
> Decrease spark runner startup overhead
> --------------------------------------
>
>                 Key: BEAM-1146
>                 URL: https://issues.apache.org/jira/browse/BEAM-1146
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Aviem Zur
>
> BEAM-921 introduced a lazy singleton, instantiated once on each machine
> (driver & executors), which uses reflection to find all subclasses of
> Source and Coder.
> While this is beneficial in its own right, the change added about one minute
> of overhead to Spark runner startup time (which causes the first job/stage to
> take up to a minute).
> The change is in class {{BeamSparkRunnerRegistrator}}.
> The reason reflection (specifically the Reflections library) was used here is
> that there is currently no way of knowing all the Source and Coder classes
> at runtime.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
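The wrapper-class idea from the pull request description can be illustrated with plain JDK classes. This is only a sketch, not the code in the pull request: `JavaSerializedWrapper` and `WrapperDemo` are hypothetical names, and no Kryo, Spark, or Beam API is used. The assumption is that a runner would register a single wrapper type up front, and that the wrapper falls back to Java serialization for whatever it holds, so no classpath scan for subclasses is needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical wrapper: the only type a runner would have to know about in advance.
final class JavaSerializedWrapper<T extends Serializable> implements Serializable {
    private static final long serialVersionUID = 1L;
    private final T wrapped;

    JavaSerializedWrapper(T wrapped) {
        this.wrapped = wrapped;
    }

    T get() {
        return wrapped;
    }

    // Serialize the wrapper, and the value inside it, with plain Java serialization.
    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        return bytes.toByteArray();
    }

    // Rebuild a wrapper from bytes produced by toBytes().
    @SuppressWarnings("unchecked")
    static <T extends Serializable> JavaSerializedWrapper<T> fromBytes(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (JavaSerializedWrapper<T>) in.readObject();
        }
    }
}

final class WrapperDemo {
    public static void main(String[] args) throws Exception {
        // A String stands in for a Source or Coder instance here.
        JavaSerializedWrapper<String> original = new JavaSerializedWrapper<>("stand-in for a Coder");
        JavaSerializedWrapper<String> copy = JavaSerializedWrapper.fromBytes(original.toBytes());
        System.out.println(copy.get());
    }
}
```

The trade-off in this sketch is that the wrapped value pays the cost of Java serialization, in exchange for not having to discover every implementation at startup.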
[jira] [Commented] (BEAM-1146) Decrease spark runner startup overhead
[ https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15760560#comment-15760560 ]

Amit Sela commented on BEAM-1146:
---------------------------------

As for {{Spark}} and {{Kryo}}: we can't easily change Spark's dependencies, since Kryo is used via Twitter's {{Chill}} (Kryo in Scala).

I think this is a larger question about serialization in Beam:
# User data - Using Coders (explicit) avoids serialization issues that might occur with different types of user data (e.g., a runner using Kryo and a pipeline processing data that is not Kryo-serializable), and provides better performance than (implicit) reflection-based serialization frameworks (knowing ahead of time what you serialize is always more efficient, right?).
# The "closure" - This is everything that is sent to the worker to process "bundles", such as Coders, Sources, etc. I'm suggesting we "tag" these in Beam so it's clear that they are expected to be "shipped" as part of the processing task.

It seems that while user data is handled properly in Beam, the rest is still somewhat undecided. That might be fine, since it integrates with runners (not with pipelines, as user data does), but I wonder if we can do more here. WDYT?

[~aviemzur] suggested that the Spark runner wrap Sources and Coders so that it always knows to serialize them with the {{JavaSerializer}}, which is a great solution for the Spark runner.

> Decrease spark runner startup overhead
> --------------------------------------
>
>                 Key: BEAM-1146
>                 URL: https://issues.apache.org/jira/browse/BEAM-1146
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
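The point above about explicit coders versus implicit, reflection-based serialization can be made concrete with a small JDK-only comparison. This is a sketch under stated assumptions: `ExplicitCoder` is a hypothetical stand-in and not Beam's `Coder` class, and Java serialization stands in for a generic framework. It only compares encoded sizes and says nothing about Kryo's actual performance.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for an explicit coder; this is not Beam's Coder class.
interface ExplicitCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
}

final class CoderSizeDemo {
    // The coder knows the value is an int, so it writes exactly four bytes.
    static final ExplicitCoder<Integer> INT_CODER = (value, out) -> out.writeInt(value);

    // Encode a value with a coder that was chosen ahead of time.
    static <T> byte[] encodeExplicitly(ExplicitCoder<T> coder, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            coder.encode(value, out);
        }
        return bytes.toByteArray();
    }

    // Generic Java serialization writes a stream header and class metadata as well as the value.
    static byte[] encodeImplicitly(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("explicit coder bytes: " + encodeExplicitly(INT_CODER, 42).length);
        System.out.println("java serialization bytes: " + encodeImplicitly(42).length);
    }
}
```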
[jira] [Commented] (BEAM-1146) Decrease spark runner startup overhead
[ https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745479#comment-15745479 ]

Amit Sela commented on BEAM-1146:
---------------------------------

[~davor] do you have thoughts about this?
I know there are ideas about how to deal with serialization in {{Beam}} in general, but this is actually a good example of the problems we could face (and {{Apex}} has been suffering from {{Kryo}} issues lately too).

The Spark runner uses {{Kryo}} because it is faster than Java serialization, but for data, the runner tries to use coders and shuffle bytes (preventing {{Kryo}} from encoding/decoding).
Spark classes that need to be passed to workers are clearly Kryo-serializable; Beam classes, however, are not always.

The way I see it, we can either:
# Look up all SDK classes that are shipped to the cluster (this adds startup overhead).
# Use annotations in Beam to "tag" classes that are supposed to ship to workers (e.g., {{Sources}} and {{Coders}}).
# Have the Spark runner (and other runners that use {{Kryo}}, for instance) provide a {{Kryo}} registration mechanism for users to register any {{Source}}/{{Coder}} they add.

Option 1 is too slow, and option 3 is not very elegant. I was wondering about option 2, and how you'd feel about annotating classes in the SDK as {{@ShipsToCluster}} or some much better name ;)

> Decrease spark runner startup overhead
> --------------------------------------
>
>                 Key: BEAM-1146
>                 URL: https://issues.apache.org/jira/browse/BEAM-1146
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
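Option 2 above, tagging classes with an annotation, could look roughly like the following. This is a sketch only: `@ShipsToCluster` is the placeholder name floated in the comment and does not exist in Beam, and `ExampleCoder`, `LocalOnlyHelper`, and `classesToRegister` are hypothetical. How the candidate classes get enumerated in the first place is deliberately left open, since that is the costly part the issue is about.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation; it must be retained at runtime to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ShipsToCluster {}

// Hypothetical class that is expected to be sent to workers.
@ShipsToCluster
final class ExampleCoder {}

// Hypothetical class that never leaves the driver.
final class LocalOnlyHelper {}

final class ShipsToClusterDemo {
    // Keep only the candidates that carry the marker annotation.
    static List<Class<?>> classesToRegister(Class<?>... candidates) {
        List<Class<?>> tagged = new ArrayList<>();
        for (Class<?> candidate : candidates) {
            if (candidate.isAnnotationPresent(ShipsToCluster.class)) {
                tagged.add(candidate);
            }
        }
        return tagged;
    }

    public static void main(String[] args) {
        // Only ExampleCoder carries the annotation, so only it is selected.
        System.out.println(classesToRegister(ExampleCoder.class, LocalOnlyHelper.class));
    }
}
```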
[jira] [Commented] (BEAM-1146) Decrease spark runner startup overhead
[ https://issues.apache.org/jira/browse/BEAM-1146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15745427#comment-15745427 ]

Aviem Zur commented on BEAM-1146:
---------------------------------

Possible solutions:
# Limit the Reflections search criteria to a few specific packages. This cuts the time down to 1 second. However, custom user Coders and Sources may not fall within these packages, and could then encounter Kryo serialization errors.
# Add an annotation to all Coders and Sources, so we can register them specifically. Solutions to similar problems exist in Beam code today and use ServiceLoader and annotations (for example, to find implementations of {{IOChannelFactoryRegistrar}}).
# Some combination of the previous two solutions.

> Decrease spark runner startup overhead
> --------------------------------------
>
>                 Key: BEAM-1146
>                 URL: https://issues.apache.org/jira/browse/BEAM-1146
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Amit Sela
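The ServiceLoader approach mentioned in the second solution can be sketched with the JDK's `java.util.ServiceLoader`. The names here are assumptions for illustration: `SerializableClassRegistrar` and `RegistrarLookupDemo` are hypothetical and are not the Beam interfaces referred to above. The idea is that each module declaring Sources or Coders would ship a provider listed under `META-INF/services`, so the runner reads a short list instead of scanning the whole classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical service interface: each provider lists the classes it wants registered.
interface SerializableClassRegistrar {
    List<Class<?>> classesToRegister();
}

final class RegistrarLookupDemo {
    // Merge the class lists of all given registrars into one list.
    static List<Class<?>> collect(Iterable<SerializableClassRegistrar> registrars) {
        List<Class<?>> classes = new ArrayList<>();
        for (SerializableClassRegistrar registrar : registrars) {
            classes.addAll(registrar.classesToRegister());
        }
        return classes;
    }

    // Find providers declared in META-INF/services files on the classpath.
    // With no provider file present, this returns an empty list.
    static List<Class<?>> discover() {
        return collect(ServiceLoader.load(SerializableClassRegistrar.class));
    }

    public static void main(String[] args) {
        System.out.println("classes found via ServiceLoader: " + discover().size());
    }
}
```

A user-defined Coder or Source would be picked up by adding one more provider entry, which addresses the concern in the first solution that custom classes may live outside the scanned packages.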