[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747054#comment-15747054 ]

ASF GitHub Bot commented on BEAM-757:

Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1578

> The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's
> own.
>
>                 Key: BEAM-757
>                 URL: https://issues.apache.org/jira/browse/BEAM-757
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Amit Sela
>
> The SDK now provides DoFnRunner implementations, and so to avoid maintaining
> against the SDK, the runner should leverage the runner API instead.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15741480#comment-15741480 ]

ASF GitHub Bot commented on BEAM-757:

GitHub user amitsela opened a pull request:

    https://github.com/apache/incubator-beam/pull/1578

    [BEAM-757, BEAM-807]

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam new-do-fn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1578.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1578

commit 5748fe9cfd9c36d92b2870acde5e071ea696ac78
Author: Sela
Date:   2016-12-11T12:30:24Z

    Use DoFnRunner in the implementation of FlatMapFunction. Migrate to new DoFn.

commit 7f73c991426e0602d318a47cd9a38a3eebd979bf
Author: Sela
Date:   2016-12-11T12:31:59Z

    Implement AggregatorFactory for Spark runner, to be used by DoFnRunner.

commit b2b0d463b0cf2105248d02411a190f6406f26c69
Author: Sela
Date:   2016-12-11T12:32:49Z

    Migrate to new DoFn.

commit 00219617355e1d77d0bba6c272c0a2b3595eac61
Author: Sela
Date:   2016-12-11T19:09:47Z

    Add a custom AssignWindows implementation.

commit cd574d62825d636bb30ab0fd13172d2f8bb5cbb7
Author: Sela
Date:   2016-12-12T09:33:58Z

    Setup and teardown DoFn.

commit 9355b1ef4943b29e7d26735484aad2e63bb1d1eb
Author: Sela
Date:   2016-12-12T09:34:17Z

    Add implementation for GroupAlsoByWindow via flatMap.
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602812#comment-15602812 ]

Kenneth Knowles commented on BEAM-757:

For the {{DoFn}} switch, you could open a new issue for SparkRunner or just reference BEAM-498. I suspect it is one hopefully-easy PR.
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602671#comment-15602671 ]

Amit Sela commented on BEAM-757:

Yeah, I've noticed that using {{KeyedWorkItem}} means that the input is not wrapped in {{WindowedValue}} anymore (the elements are, but internally), so my {{DoFnFunction}}'s signature breaks. Flink seems to handle this by implementing separate {{DoFnOperator}} and {{WindowDoFnOperator}} classes. I was going for separate {{DoFnRunner}}s (hence my comment in the PR), but it definitely makes sense to implement this directly.

I'm actually going the way you suggested, but stopped when I hit the {{DoFnFunction}} issue. Now I'm thinking of opening a separate issue to replace {{OldDoFn}} with {{DoFn}} first, WDYT?
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602638#comment-15602638 ]

Kenneth Knowles commented on BEAM-757:

Oh, and by the way the PR that merged that also fixed the issue with setup/teardown, and hopefully the others that you encountered. I'm hoping that its use in the DirectRunner is enough to give you a smooth path.
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602623#comment-15602623 ]

Kenneth Knowles commented on BEAM-757:

Also related is BEAM-788 for executing {{ReduceFn}} directly.
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602617#comment-15602617 ]

Kenneth Knowles commented on BEAM-757:

I'd say you can probably go ahead with it now. Then when I break things I'll have to fix them for the Spark runner :-)

I've added a {{SimpleDoFnRunner}} for new {{DoFn}}, and you can use {{DoFnRunners.createDefault(Serializable, ...)}} to make the runner agnostic about new vs old {{DoFn}}.

See also [the porting of the direct runner|https://github.com/apache/incubator-beam/pull/1157/commits/1919d8b3a850bd146137652546da851ee461cd28] which now works only with new {{DoFn}}, by adapting old ones. You might be able to go straight to that, without the detour through the above method.
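The "adapt old ones so the runner only works with the new kind" idea in the comment above can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: `NewStyleFn`, `OldStyleFn`, `FnAdapters` and `SingleShapeExecutor` are hypothetical names, and none of them are Beam's `DoFn`, `OldDoFn` or `DoFnRunners` APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical "new-style" function: emits through a callback. Not Beam's DoFn. */
interface NewStyleFn<I, O> {
    void process(I input, Consumer<O> output);
}

/** Hypothetical "old-style" function: returns its outputs as a list. Not Beam's OldDoFn. */
interface OldStyleFn<I, O> {
    List<O> apply(I input);
}

final class FnAdapters {
    private FnAdapters() {}

    /** Wraps an old-style function so callers only ever see NewStyleFn. */
    static <I, O> NewStyleFn<I, O> adapt(OldStyleFn<I, O> old) {
        return (input, output) -> old.apply(input).forEach(output);
    }
}

/** The executor knows a single function shape, whichever style the user wrote. */
final class SingleShapeExecutor {
    private SingleShapeExecutor() {}

    static <I, O> List<O> run(NewStyleFn<I, O> fn, List<I> inputs) {
        List<O> results = new ArrayList<>();
        for (I input : inputs) {
            // Each emitted value is appended in emission order.
            fn.process(input, results::add);
        }
        return results;
    }
}
```

With this shape, supporting the old style costs one adapter at the boundary, and the execution path itself has no old-vs-new branching.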
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602527#comment-15602527 ]

Amit Sela commented on BEAM-757:

[~kenn] should I wait for the refactor to finish, or use {{SimpleOldDoFnRunner}} for now? I guess I'm asking how far along you are.
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15584706#comment-15584706 ]

Amit Sela commented on BEAM-757:

I have something (working!) using the {{SimpleDoFnRunner}} here: https://github.com/amitsela/incubator-beam/blob/BEAM-757-WIP/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkDoFnRunner.java

I had to expose OldDoFn and OutputManager for that (https://github.com/amitsela/incubator-beam/blob/BEAM-757-WIP/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java).

As for OldDoFn: I had to call setup() and teardown() myself because the DoFnRunner didn't seem to do that. Maybe it should? (Spark might need to override this anyway for teardown after finishBundle, but still.)

OutputManager needed to be exposed to allow the runner to access the output and, in Spark's case, to clear it in between element processing, because Spark partitions (~bundles) can be quite big.

Other than that, it was pretty straightforward for me to use it. I'll PR once pending PRs are merged.
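The two points raised in the comment above, calling setup/teardown around a partition and clearing the output between elements so a large partition is never buffered whole, can be sketched roughly as follows. This is only an illustration under stated assumptions: `ElementFn` and `DrainingIterator` are hypothetical names, not the `SparkDoFnRunner`, `OutputManager` or `DoFnRunner` classes linked above, and bundle-level hooks such as finishBundle are left out.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a user function with a lifecycle; not Beam's DoFn. */
interface ElementFn<I, O> {
    void setup();
    void processElement(I input, Consumer<O> output);
    void teardown();
}

/**
 * Lazily maps a partition through an ElementFn. Outputs are buffered for one
 * input element at a time and drained before the next input is read, so the
 * buffer never holds the output of a whole partition.
 */
final class DrainingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> inputs;
    private final ElementFn<I, O> fn;
    private final Queue<O> buffer = new ArrayDeque<>(); // note: rejects null outputs
    private boolean started = false;
    private boolean finished = false;

    DrainingIterator(Iterator<I> inputs, ElementFn<I, O> fn) {
        this.inputs = inputs;
        this.fn = fn;
    }

    @Override
    public boolean hasNext() {
        if (!started) {
            fn.setup();
            started = true;
        }
        // Read the next input only once the previous outputs are drained.
        while (buffer.isEmpty() && inputs.hasNext()) {
            fn.processElement(inputs.next(), buffer::add);
        }
        // Teardown runs once, and only when the input is fully consumed.
        if (buffer.isEmpty() && !finished) {
            fn.teardown();
            finished = true;
        }
        return !buffer.isEmpty();
    }

    @Override
    public O next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return buffer.remove();
    }
}
```

One limitation of this sketch: if the consumer abandons the iterator early, teardown is never invoked, so a real runner would need an explicit cleanup path as well.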
[jira] [Commented] (BEAM-757) The SparkRunner should utilize the SDK's DoFnRunner instead of writing it's own.
[ https://issues.apache.org/jira/browse/BEAM-757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15583565#comment-15583565 ]

Kenneth Knowles commented on BEAM-757:

Just a note here that I'm going to have to refactor them a bit to run {{DoFn}} directly instead of via wrapping to {{OldDoFn}}. So if there are things you see in {{DoFnRunner}} that seem like leftovers from before Beam, or other concerns, let me know.