[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Title: Message Title Aljoscha Krettek assigned an issue to Jingsong Lee Beam / BEAM-843 Use New DoFn Directly in Flink Runner Change By: Aljoscha Krettek Assignee: Jingsong Lee Add Comment This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Aljoscha Krettek commented on BEAM-843 Re: Use New DoFn Directly in Flink Runner Jingsong Lee: I assigned the issue to you and am now merging your PR. In the future it would be good to comment on an issue before you start implementing, to avoid duplicate work.
[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner
Aljoscha Krettek commented on BEAM-1346 Re: Drop Late Data in ReduceFnRunner Kenneth Knowles, this is related to BEAM-241.
[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner
Aljoscha Krettek created an issue Beam / BEAM-1346 Drop Late Data in ReduceFnRunner Issue Type: Bug Affects Versions: 0.5.0 Assignee: Kenneth Knowles Components: runner-core Created: 30/Jan/17 11:39 Priority: Major Reporter: Aljoscha Krettek I think these two commits recently broke late-data dropping for the Flink Runner (and maybe for other runners as well): https://github.com/apache/beam/commit/2b26ec8 https://github.com/apache/beam/commit/8989473 It boils down to the LateDataDroppingDoFnRunner no longer being used, because DoFnRunners.lateDataDroppingRunner() is no longer called when a DoFn is a ReduceFnExecutor (that interface was removed). Maybe we should think about dropping late data in another place; my suggestion is ReduceFnRunner, but that's open for discussion.
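For context on what the removed code path did: LateDataDroppingDoFnRunner filters out elements whose window has already expired before they reach the reduce function. A minimal, hypothetical Python sketch of that filtering rule follows; the names and the flat timestamp model are illustrative only, not Beam's actual API (the real runner inspects each element's windows against the input watermark and allowed lateness, and increments a dropped-due-to-lateness counter).

```python
from collections import namedtuple

# Illustrative stand-in for a windowed element: the value plus the maximum
# timestamp of the window it belongs to.
WindowedValue = namedtuple("WindowedValue", ["value", "window_max_ts"])

def drop_late_data(elements, input_watermark, allowed_lateness):
    """Keep only elements whose window has not yet expired.

    An element is droppably late when the end of its window, extended by
    the allowed lateness, is already behind the input watermark.
    """
    kept = []
    for e in elements:
        if e.window_max_ts + allowed_lateness < input_watermark:
            # Window expired: drop (a real runner would also count/log this).
            continue
        kept.append(e)
    return kept
```

With a watermark of 50 and allowed lateness of 5, an element in a window ending at 10 is dropped while one in a window ending at 100 is kept.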
[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner
Aljoscha Krettek assigned an issue to Unassigned Beam / BEAM-1346 Drop Late Data in ReduceFnRunner Change By: Aljoscha Krettek Assignee: Kenneth Knowles
[2/2] beam git commit: This closes #1787
This closes #1787 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/582c4a8a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/582c4a8a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/582c4a8a Branch: refs/heads/master Commit: 582c4a8a4dd33d698beae2990d682848b593de21 Parents: 34b4a6d 1dcda72 Author: Aljoscha Krettek Authored: Mon Jan 30 13:48:19 2017 +0100 Committer: Aljoscha Krettek Committed: Mon Jan 30 13:48:19 2017 +0100 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + 3 files changed, 264 insertions(+), 104 deletions(-) --
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
ASF GitHub Bot commented on BEAM-843 Re: Use New DoFn Directly in Flink Runner Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1787
[1/2] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner
Repository: beam Updated Branches: refs/heads/master 34b4a6d9d -> 582c4a8a4 [BEAM-843] Use New DoFn Directly in Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1dcda726 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1dcda726 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1dcda726 Branch: refs/heads/master Commit: 1dcda72686fa0a9a6e2033939aa53f7a7a31d548 Parents: 34b4a6d Author: JingsongLi Authored: Wed Jan 18 11:34:06 2017 +0800 Committer: Aljoscha Krettek Committed: Mon Jan 30 12:04:09 2017 +0100 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + 3 files changed, 264 insertions(+), 104 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/1dcda726/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java new file mode 100644 index 000..cff6e00 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * {@link ReduceFnRunner}. 
+ */ +@SystemDoFnInternal +public class GroupAlsoByWindowViaWindowSetNewDoFn< +K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem> +extends DoFn> { + + private static final long serialVersionUID = 1L; + + public static + DoFn, KV> create( + WindowingStrategy strategy, + StateInternalsFactory stateInternalsFactory, + TimerInternalsFactory timerInternalsFactory, + SideInputReader sideInputReader, + SystemReduceFn reduceFn, + DoFnRunners.OutputManager outputManager, + TupleTag> mainTag) { +return new GroupAlsoByWindowViaWindowSetNewDoFn<>( +strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, +reduceFn, outputManager, mainTag); + } + + protected final Aggregator droppedDueToClosedWindow = + createAggregator( + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + protected final Aggregator droppedDueToLateness = + createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + private final WindowingStrategy windowingStrategy; + private SystemReduceFn reduceFn; + private transient StateInternalsFactory stateInternalsFactory; + private transient TimerInternalsFactory timerInternalsFactory; + private transient SideInput
[GitHub] beam pull request #1787: [BEAM-843] Flink DoFnOperator use new DoFn
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1787 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Aljoscha Krettek closed an issue as Fixed Beam / BEAM-843 Use New DoFn Directly in Flink Runner Change By: Aljoscha Krettek Resolution: Fixed Fix Version/s: 0.5.0 Status: Open Closed
[jira] (BEAM-1335) ValueState could use an initial/default value
Aljoscha Krettek commented on BEAM-1335 Re: ValueState could use an initial/default value In Flink, we actually have default values for StateDescriptors (roughly the equivalent of StateSpec), but we recently decided to deprecate that. My argument was that having a default value complicates the StateSpec, because we would have to keep a user object in there and make sure that it is serialized/deserialized using the correct Coder when the StateSpec itself is serialized using Java serialization. Just a data point to consider. I'm not against adding this for Beam if we think it makes sense.
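The complication described in the comment above can be made concrete: if a state spec carries a user-supplied default, the default has to round-trip through the state's own Coder rather than through the language's native serialization of the spec. A hypothetical sketch of one way to enforce that invariant; `ValueStateSpec`, `read_or_default`, and the encode/decode callables are illustrative names, not Beam or Flink API:

```python
import pickle

class ValueStateSpec:
    """Hypothetical state spec that carries a default value.

    The default is encoded eagerly with the state's coder, so serializing
    the spec never needs to natively serialize an arbitrary user object.
    """

    def __init__(self, name, coder_encode, coder_decode, default=None):
        self.name = name
        # Encode once, up front, with the state coder.
        self._encoded_default = coder_encode(default)
        self._decode = coder_decode

    def read_or_default(self, stored_bytes):
        """Decode the stored value, falling back to the encoded default."""
        payload = stored_bytes if stored_bytes is not None else self._encoded_default
        return self._decode(payload)
```

Here `pickle` merely plays the role of a Coder for the sketch; the point is that the spec only ever holds coder-encoded bytes, sidestepping the serialization concern raised in the comment.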
[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner
Kenneth Knowles commented on BEAM-1346 Re: Drop Late Data in ReduceFnRunner Yikes, that is a troubling lack of test coverage, that it could go away without being caught. I probably reviewed it. Previously, because GroupAlsoByWindowViaWindowSets followed the same code path as a user-defined DoFn, the way we would distinguish it was checking for ReduceFnExecutor (technically, I think ReduceFnExecutor only applied if the GABW took KeyedWorkItem as input, which was always true for streaming runners; kind of a random collection of dependencies). The right way to fix this, IMO, is for the Flink WindowDoFnOperator to explicitly create a lateDataDroppingRunner(), since that code already knows it is using GABW.
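The fix proposed in the comment above is a wrapping pattern: the operator that already knows it runs a GroupAlsoByWindow installs the late-data-dropping wrapper itself, rather than the framework inferring it from an instanceof check on a now-removed interface. A hypothetical sketch of that pattern; the class names are illustrative, and the real DoFnRunners.lateDataDroppingRunner() additionally consults windows, the watermark, and the windowing strategy:

```python
class RecordingRunner:
    """Stand-in for a plain DoFnRunner: just records what it processes."""

    def __init__(self):
        self.processed = []

    def process(self, element):
        self.processed.append(element)

class LateDataDroppingRunner:
    """Wrapper installed explicitly by the operator that owns the GABW."""

    def __init__(self, underlying, is_late):
        self._underlying = underlying
        self._is_late = is_late
        self.dropped = 0  # analogue of the droppedDueToLateness counter

    def process(self, element):
        if self._is_late(element):
            self.dropped += 1  # drop instead of forwarding
            return
        self._underlying.process(element)
```

The design point is that the wrapping decision lives at the call site (e.g. a WindowDoFnOperator), which is robust to interface removals elsewhere in the framework.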
[jira] (BEAM-1347) Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api
Luke Cwik created an issue Beam / BEAM-1347 Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api Issue Type: Improvement Assignee: Luke Cwik Components: beam-model-fn-api Created: 30/Jan/17 16:25 Priority: Major Reporter: Luke Cwik Create a basic Java harness capable of understanding process bundle requests and able to stream data over the Fn Api. Overview: https://s.apache.org/beam-fn-api
[jira] (BEAM-1348) Model the Fn Api
Luke Cwik created an issue Beam / BEAM-1348 Model the Fn Api Issue Type: Improvement Assignee: Luke Cwik Components: beam-model-fn-api Created: 30/Jan/17 16:26 Priority: Major Reporter: Luke Cwik Create a proto representation of the services and data types required to execute the Fn Api.
[jira] (BEAM-1346) Drop Late Data in ReduceFnRunner
Daniel Halperin commented on BEAM-1346 Re: Drop Late Data in ReduceFnRunner The commits referenced are in the 0.5.0-RC1 candidate – so I'm guessing this should be release blocking. Aljoscha Krettek can you please confirm & add this information to the vote thread?
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Daniel Halperin updated an issue Beam / BEAM-843 Use New DoFn Directly in Flink Runner Change By: Daniel Halperin Fix Version/s: 0.5.0 Fix Version/s: 0.6.0
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Daniel Halperin commented on BEAM-843 Re: Use New DoFn Directly in Flink Runner Aljoscha Krettek, is this needed for 0.5.0? You marked it as "FV 0.5.0" although if you merged it today, it will not be on the 0.5.0 release branch and so not in that release.
[jira] (BEAM-843) Use New DoFn Directly in Flink Runner
Daniel Halperin reopened an issue Reopen in order to set fix version Beam / BEAM-843 Use New DoFn Directly in Flink Runner Change By: Daniel Halperin Status: Closed Reopened Resolution: Fixed
[GitHub] beam-site pull request #135: Use PMC instead of PPMC
GitHub user jbonofre opened a pull request: https://github.com/apache/beam-site/pull/135 Use PMC instead of PPMC Minor change to reflect the graduation: we don't have PPMC anymore, but PMC. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jbonofre/beam-site PPMC_REMOVE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam-site/pull/135.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #135 commit f9110658c09958012bba1caf7d3e94d89988f548 Author: Jean-Baptiste Onofré Date: 2017-01-30T17:02:48Z Use PMC instead of PPMC
[2/3] beam-site git commit: Regenerate website
Regenerate website Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/38c9a1e6 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/38c9a1e6 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/38c9a1e6 Branch: refs/heads/asf-site Commit: 38c9a1e6f06e86bb0b8406c7b2688b4598ad29a7 Parents: f911065 Author: Davor Bonaci Authored: Mon Jan 30 09:14:01 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 09:14:01 2017 -0800 -- content/contribute/release-guide/index.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/38c9a1e6/content/contribute/release-guide/index.html -- diff --git a/content/contribute/release-guide/index.html b/content/contribute/release-guide/index.html index 624df88..5ef3dfd 100644 --- a/content/contribute/release-guide/index.html +++ b/content/contribute/release-guide/index.html @@ -580,7 +580,7 @@ The complete staging area is available for your review, which includes: * source code tag "v1.2.3-RC3" [5], * website pull request listing the release and publishing the API reference manual [6]. -The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PPMC affirmative votes. +The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Release Manager
[3/3] beam-site git commit: This closes #135
This closes #135 Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/fedd4ab9 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/fedd4ab9 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/fedd4ab9 Branch: refs/heads/asf-site Commit: fedd4ab9d0294de50f4a47cc91cebda9daf8db75 Parents: f1d931e 38c9a1e Author: Davor Bonaci Authored: Mon Jan 30 09:14:01 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 09:14:01 2017 -0800 -- content/contribute/release-guide/index.html | 2 +- src/contribute/release-guide.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) --
[GitHub] beam-site pull request #135: Use PMC instead of PPMC
Github user asfgit closed the pull request at: https://github.com/apache/beam-site/pull/135
[1/3] beam-site git commit: Use PMC instead of PPMC
Repository: beam-site Updated Branches: refs/heads/asf-site f1d931ea9 -> fedd4ab9d Use PMC instead of PPMC Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/f9110658 Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/f9110658 Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/f9110658 Branch: refs/heads/asf-site Commit: f9110658c09958012bba1caf7d3e94d89988f548 Parents: f1d931e Author: Jean-Baptiste Onofré Authored: Mon Jan 30 18:02:48 2017 +0100 Committer: Jean-Baptiste Onofré Committed: Mon Jan 30 18:02:48 2017 +0100 -- src/contribute/release-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam-site/blob/f9110658/src/contribute/release-guide.md -- diff --git a/src/contribute/release-guide.md b/src/contribute/release-guide.md index bb6a95f..c2fea8f 100644 --- a/src/contribute/release-guide.md +++ b/src/contribute/release-guide.md @@ -319,7 +319,7 @@ Start the review-and-vote thread on the dev@ mailing list. Here's an email tem * source code tag "v1.2.3-RC3" [5], * website pull request listing the release and publishing the API reference manual [6]. -The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PPMC affirmative votes. +The vote will be open for at least 72 hours. It is adopted by majority approval, with at least 3 PMC affirmative votes. Thanks, Release Manager
[GitHub] beam pull request #1869: [BEAM-886] Some performance improvements to NewDoFn
GitHub user sb2nov opened a pull request: https://github.com/apache/beam/pull/1869 [BEAM-886] Some performance improvements to NewDoFn - Add types to some of the variables for performance - Do minimal work in the process function by stashing the placeholders to be replaced. R: @robertwb PTAL CC: @aaltay --- Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/sb2nov/incubator-beam BEAM-886-Add-NewDoFn-class-1-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1869.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1869 commit 3427ee1da41ec1d42418615c1eac99be7ffe22a7 Author: Sourabh Bajaj Date: 2017-01-30T19:16:16Z Some performance improvements to NewDoFn
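The second bullet of the PR description describes a setup-vs-per-element split: rather than scanning the argument list for placeholders on every processed element, the placeholder positions can be computed once and only those slots substituted per element. A hypothetical sketch of that idea; the helper names here are illustrative, not the actual apache_beam implementation:

```python
class ArgumentPlaceholder:
    """Marker object standing in for a value resolved later (e.g. a side input)."""

def stash_placeholder_positions(args):
    """Done once at setup time: record where the placeholders sit."""
    return [i for i, a in enumerate(args) if isinstance(a, ArgumentPlaceholder)]

def fill_args(args, positions, values):
    """Done per element: substitute only at the stashed positions."""
    out = list(args)
    for pos, val in zip(positions, values):
        out[pos] = val
    return out
```

The per-element cost drops from an isinstance check on every argument to a direct index assignment for each placeholder, which matters in a tight process loop.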
[jira] (BEAM-886) Support new DoFn in Python SDK
ASF GitHub Bot commented on BEAM-886 Re: Support new DoFn in Python SDK GitHub user sb2nov opened a pull request: https://github.com/apache/beam/pull/1869
[GitHub] beam pull request #1870: Merge master into python-sdk
GitHub user aaltay opened a pull request: https://github.com/apache/beam/pull/1870 Merge master into python-sdk You can merge this pull request into a Git repository by running: $ git pull https://github.com/aaltay/incubator-beam merge2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1870.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1870 commit 1dcda72686fa0a9a6e2033939aa53f7a7a31d548 Author: JingsongLi Date: 2017-01-18T03:34:06Z [BEAM-843] Use New DoFn Directly in Flink Runner commit 582c4a8a4dd33d698beae2990d682848b593de21 Author: Aljoscha Krettek Date: 2017-01-30T12:48:19Z This closes #1787 commit c97dd6cbf05e5670eba2c7180c9b6e059418a2fd Author: Ahmet Altay Date: 2017-01-30T20:14:34Z Merge remote-tracking branch 'origin/master' into merge2 commit 408d9b6b8e8fe22637f88c7553980b068468b5f8 Author: Ahmet Altay Date: 2017-01-30T20:21:28Z Update the version.py file to match the latest beam version.
[2/3] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner
[BEAM-843] Use New DoFn Directly in Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb Branch: refs/heads/python-sdk Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278 Parents: 27cf68e Author: JingsongLi Authored: Wed Jan 18 11:34:06 2017 +0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:38 2017 -0800 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + 3 files changed, 264 insertions(+), 104 deletions(-) --
[3/3] beam git commit: This closes #1870
This closes #1870 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29527f6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29527f6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29527f6 Branch: refs/heads/python-sdk Commit: f29527f68b8de92caf18b183e3a7e97eb190f67e Parents: 27cf68e 38575a1 Author: Davor Bonaci Authored: Mon Jan 30 12:38:53 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:53 2017 -0800 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + sdks/python/apache_beam/version.py | 3 +- 4 files changed, 265 insertions(+), 106 deletions(-) --
[1/3] beam git commit: Update the version.py file to match the latest beam version.
Repository: beam Updated Branches: refs/heads/python-sdk 27cf68ee7 -> f29527f68 Update the version.py file to match the latest beam version. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38575a14 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38575a14 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38575a14 Branch: refs/heads/python-sdk Commit: 38575a14e2b17c93de2d0e27fe6213daa7101695 Parents: 4aaaf8f Author: Ahmet Altay Authored: Mon Jan 30 12:21:28 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:38 2017 -0800 -- sdks/python/apache_beam/version.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/38575a14/sdks/python/apache_beam/version.py -- diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 60d9634..12509fb 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -21,7 +21,7 @@ import re -__version__ = '0.3.0-incubating.dev' # TODO: PEP 440 and incubating suffix +__version__ = '0.6.0.dev' # The following utilities are legacy code from the Maven integration; @@ -40,7 +40,6 @@ def get_version_from_pom(): search = pattern.search(pom) version = search.group(1) version = version.replace("-SNAPSHOT", ".dev") -# TODO: PEP 440 and incubating suffix return version
[1/2] beam git commit: Updates places in SDK that creates thread pools.
Repository: beam
Updated Branches: refs/heads/python-sdk f29527f68 -> 475707f0f

Updates places in the SDK that create thread pools. Moves ThreadPool creation to a util function. Records and restores the logging level, since apitools resets it when used with a ThreadPool.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc

Branch: refs/heads/python-sdk
Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672
Parents: f29527f
Author: Chamikara Jayalath
Authored: Sat Jan 28 08:54:33 2017 -0800
Committer: Davor Bonaci
Committed: Mon Jan 30 12:43:37 2017 -0800
--
 sdks/python/apache_beam/internal/util.py      | 33 ++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++
 sdks/python/apache_beam/io/fileio.py          | 11 ++--
 3 files changed, 40 insertions(+), 21 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py
--
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 2d12d49..5b31e88 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -17,6 +17,11 @@
 """Utility functions used throughout the package."""
 
+import logging
+from multiprocessing.pool import ThreadPool
+import threading
+import weakref
+
 
 class ArgumentPlaceholder(object):
   """A place holder object replacing PValues in argument lists.
@@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values):
       (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
       for k, v in sorted(kwargs.iteritems()))
   return (new_args, new_kwargs)
+
+
+def run_using_threadpool(fn_to_execute, inputs, pool_size):
+  """Runs the given function on given inputs using a thread pool.
+
+  Args:
+    fn_to_execute: Function to execute
+    inputs: Inputs on which given function will be executed in parallel.
+    pool_size: Size of thread pool.
+
+  Returns:
+    Results retrieved after executing the given function on given inputs.
+  """
+
+  # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+  # from a child thread. (http://bugs.python.org/issue10015)
+  if not hasattr(threading.current_thread(), '_children'):
+    threading.current_thread()._children = weakref.WeakKeyDictionary()
+  pool = ThreadPool(min(pool_size, len(inputs)))
+  try:
+    # We record and reset logging level here since 'apitools' library Beam
+    # depends on updates the logging level when used with a threadpool -
+    # https://github.com/google/apitools/issues/141
+    # TODO: Remove this once above issue in 'apitools' is fixed.
+    old_level = logging.getLogger().level
+    return pool.map(fn_to_execute, inputs)
+  finally:
+    pool.terminate()
+    logging.getLogger().setLevel(old_level)

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 1bfde25..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,9 @@
 For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 import random
-import threading
-import weakref
-
-from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
+from apache_beam.internal import util
 from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
@@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource):
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
       if pattern is None:
-        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
-        # from a child thread. (http://bugs.python.org/issue10015)
-        if not hasattr(threading.current_thread(), '_children'):
-          threading.current_thread()._children = weakref.WeakKeyDictionary()
-        pool = ThreadPool(
-            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-        try:
-          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-        finally:
-          pool.terminate()
+        return util.run_using_threadpool(
+            fileio.ChannelFactory.size_in_bytes, file_names,
+            MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
       else:
         file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
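The helper this commit introduces is easy to exercise on its own. Below is a minimal, self-contained sketch of the same pattern (a standalone rewrite, not an import of the Beam module); the Python < 2.7.5 `_children` workaround from the patch is omitted here since it is unnecessary on modern interpreters:

```python
import logging
from multiprocessing.pool import ThreadPool


def run_using_threadpool(fn_to_execute, inputs, pool_size):
    """Runs fn_to_execute over inputs in parallel using a thread pool.

    The root logger level is recorded before the map and restored after it,
    because some libraries (e.g. apitools, google/apitools#141) reset the
    level when driven from a thread pool.
    """
    # Never create more workers than there are inputs.
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        old_level = logging.getLogger().level
        return pool.map(fn_to_execute, inputs)
    finally:
        pool.terminate()
        logging.getLogger().setLevel(old_level)


# Example: "size estimation" over several inputs at once, in the spirit of
# FileBasedSource; len stands in for fileio.ChannelFactory.size_in_bytes.
sizes = run_using_threadpool(len, ['ab', 'cdef', 'g'], pool_size=10)
# sizes == [2, 4, 1] -- pool.map preserves input order
```

Note that `pool.map` blocks until all inputs are processed, which is why the level restore in `finally` is safe to run immediately afterwards.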
[2/2] beam git commit: This closes #1866
This closes #1866 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0 Branch: refs/heads/python-sdk Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99 Parents: f29527f 51afc1c Author: Davor Bonaci Authored: Mon Jan 30 12:43:48 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:43:48 2017 -0800 -- sdks/python/apache_beam/internal/util.py | 33 ++ sdks/python/apache_beam/io/filebasedsource.py | 17 +++ sdks/python/apache_beam/io/fileio.py | 11 ++-- 3 files changed, 40 insertions(+), 21 deletions(-) --
[2/2] beam git commit: This closes #1863
This closes #1863 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1390699c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1390699c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1390699c Branch: refs/heads/python-sdk Commit: 1390699c37596ebe34a773627660b6c496375a8e Parents: 475707f e02ddac Author: Davor Bonaci Authored: Mon Jan 30 12:45:03 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:45:03 2017 -0800 -- sdks/python/apache_beam/io/bigquery_test.py | 26 1 file changed, 18 insertions(+), 8 deletions(-) --
[1/2] beam git commit: Add mock time to slow bigquery unit tests.
Repository: beam
Updated Branches: refs/heads/python-sdk 475707f0f -> 1390699c3

Add mock time to slow bigquery unit tests.

Unit tests that test retries do not need to use real time. This change reduces the total tox time for unit tests from 235 seconds to 73 seconds locally.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e02ddac3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e02ddac3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e02ddac3

Branch: refs/heads/python-sdk
Commit: e02ddac308b8b1ea0bd0cb0ae4f9ba4908a50595
Parents: 475707f
Author: Ahmet Altay
Authored: Fri Jan 27 17:35:24 2017 -0800
Committer: Davor Bonaci
Committed: Mon Jan 30 12:44:55 2017 -0800
--
 sdks/python/apache_beam/io/bigquery_test.py | 26
 1 file changed, 18 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/beam/blob/e02ddac3/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index b8682d1..14eb035 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -539,7 +539,8 @@ class TestBigQueryReader(unittest.TestCase):
 
 class TestBigQueryWriter(unittest.TestCase):
 
-  def test_no_table_and_create_never(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_never(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.side_effect = HttpError(
         response={'status': '404'}, url='', content='')
@@ -572,7 +573,9 @@ class TestBigQueryWriter(unittest.TestCase):
     self.assertTrue(client.tables.Get.called)
     self.assertTrue(client.tables.Insert.called)
 
-  def test_no_table_and_create_if_needed_and_no_schema(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_no_table_and_create_if_needed_and_no_schema(
+      self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.side_effect = HttpError(
         response={'status': '404'}, url='', content='')
@@ -587,7 +590,9 @@ class TestBigQueryWriter(unittest.TestCase):
         'Table project:dataset.table requires a schema. None can be inferred '
         'because the table does not exist.')
 
-  def test_table_not_empty_and_write_disposition_empty(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_table_not_empty_and_write_disposition_empty(
+      self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Get.return_value = bigquery.Table(
         tableReference=bigquery.TableReference(
@@ -712,7 +717,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_dataset_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_fail(self, patched_time_sleep):
     client = mock.Mock()
     client.datasets.Delete.side_effect = ValueError("Cannot delete")
     wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -730,7 +736,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_table_retries_fail(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_fail(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Delete.side_effect = ValueError("Cannot delete")
     wrapper = beam.io.bigquery.BigQueryWrapper(client)
@@ -738,7 +745,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_delete_dataset_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
     client.datasets.Delete.side_effect = [
         HttpError(
@@ -749,7 +757,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
-  def test_delete_table_retries_for_timeouts(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
     client.tables.Delete.side_effect = [
         HttpError(
@@ -760,7 +769,8 @@ class TestBigQueryWrapper(unittest.TestCase):
     wrapper._delete_table('', '', '')
     self.assertTrue(client.tables.Delete.called)
 
-  def test_temporary_dataset_is_unique(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_temporary_dataset_is_unique(self, patched_time_sleep):
     client = mock.Mock()
     c
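The mechanical change in this commit — decorating each test with `@mock.patch('time.sleep', return_value=None)` — applies to any retry/backoff code. A self-contained sketch of the technique (`fetch_with_retries` is an illustrative stand-in, not Beam's retry helper):

```python
import time
from unittest import mock


def fetch_with_retries(fetch, retries=3, backoff_secs=4.0):
    """Calls fetch(), sleeping with linear backoff between failed attempts."""
    for attempt in range(retries):
        try:
            return fetch()
        except IOError:
            if attempt == retries - 1:
                raise
            time.sleep(backoff_secs * (attempt + 1))


@mock.patch('time.sleep', return_value=None)
def test_retries_do_not_wait(patched_time_sleep):
    # First call fails, second succeeds; the backoff "sleep" is instant
    # because time.sleep is replaced by a mock for the test's duration.
    fetch = mock.Mock(side_effect=[IOError('transient'), 'ok'])
    assert fetch_with_retries(fetch) == 'ok'
    patched_time_sleep.assert_called_once_with(4.0)


test_retries_do_not_wait()
```

The decorator patches the `sleep` attribute on the `time` module itself, so any code that calls `time.sleep(...)` (rather than a locally bound `from time import sleep`) sees the mock.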
[3/6] beam git commit: A proposal for a portability framework to execute user definable functions.
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java -- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java new file mode 100644 index 000..92042d0 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.Serializer; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data + * to all consumers in the specified output map. + * + * Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s. + * For each request, call {@link #registerInputLocation()} to start and call + * {@link #blockTillReadFinishes()} to finish. 
+ */ +public class BeamFnDataReadRunner { + private static final Logger LOGGER = LoggerFactory.getLogger(BeamFnDataReadRunner.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Collection>> consumers; + private final Supplier processBundleInstructionIdSupplier; + private final BeamFnDataClient beamFnDataClientFactory; + private final Coder> coder; + private final BeamFnApi.Target inputTarget; + + private CompletableFuture readFuture; + + public BeamFnDataReadRunner( + BeamFnApi.FunctionSpec functionSpec, + Supplier processBundleInstructionIdSupplier, + BeamFnApi.Target inputTarget, + BeamFnApi.Coder coderSpec, + BeamFnDataClient beamFnDataClientFactory, + Map>>> outputMap) + throws IOException { +this.apiServiceDescriptor = functionSpec.getData().unpack(BeamFnApi.RemoteGrpcPort.class) +.getApiServiceDescriptor(); +this.inputTarget = inputTarget; +this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; +this.beamFnDataClientFactory = beamFnDataClientFactory; +this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values())); + +@SuppressWarnings("unchecked") +Coder> coder = Serializer.deserialize( +OBJECT_MAPPER.readValue( + coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(), +Map.class), +Coder.class); +this.coder = coder; + } + + public void registerInputLocation() { +this.readFuture = beamFnDataClientFactory.forInboundConsumer( +apiServiceDescriptor, +KV.of(processBundleInstructionIdSupplier.get(), inputTarget), +coder, +this::multiplexToConsumers); + } + + public void blockTillReadFinishes() throws Exception { +LOGGER.debug("Waiting for process bundle instruction {} and target {} to close.", +processBundleInstructionIdSupplier.get(), inputTarget); +readFuture.get(); + } + + private void multiplexToConsumers(WindowedValue value) throws Exception { +for (ThrowingConsumer> consumer 
: consumers) { + consumer.accept(value); +} + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java --
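In prose, `BeamFnDataReadRunner`'s contract is: register an inbound consumer with the data client, fan each received element out to every downstream consumer, then block on a future until the inbound stream is drained. A rough Python sketch of that flow under assumed names (`DataReadRunner`, `FakeDataClient`, and `for_inbound_consumer` are illustrative, not the harness API):

```python
from concurrent.futures import Future


class DataReadRunner:
    """Fans every element received for one bundle out to all consumers."""

    def __init__(self, consumers):
        self._consumers = list(consumers)
        self._read_future = None

    def register_input_location(self, data_client):
        # The client invokes _multiplex once per element and completes the
        # returned future when the inbound stream is closed.
        self._read_future = data_client.for_inbound_consumer(self._multiplex)

    def _multiplex(self, value):
        for consumer in self._consumers:
            consumer(value)

    def block_till_read_finishes(self):
        # .result() re-raises any failure recorded on the stream.
        self._read_future.result()


class FakeDataClient:
    """Delivers a fixed element list synchronously, then closes the stream."""

    def __init__(self, elements):
        self._elements = elements

    def for_inbound_consumer(self, consumer):
        future = Future()
        for element in self._elements:
            consumer(element)
        future.set_result(None)
        return future


out_a, out_b = [], []
runner = DataReadRunner([out_a.append, out_b.append])
runner.register_input_location(FakeDataClient(['x', 'y']))
runner.block_till_read_finishes()
# Both consumers observed every element: out_a == out_b == ['x', 'y']
```

As in the Java class, the runner holds no per-bundle state beyond the future, which is why it can be reused serially across bundle requests by repeating the register/block pair.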
[4/6] beam git commit: A proposal for a portability framework to execute user definable functions.
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java -- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java new file mode 100644 index 000..14e26f0 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness.control; + +import com.google.protobuf.Message; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnApi.RegisterResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A handler and datastore for types that be can be registered via the Fn API. 
+ * + * Allows for {@link org.apache.beam.fn.v1.BeamFnApi.RegisterRequest}s to occur in parallel with + * subsequent requests that may lookup registered values by blocking lookups until registration + * occurs. + */ +public class RegisterHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(RegisterHandler.class); + private final ConcurrentMap> idToObject; + + public RegisterHandler() { +idToObject = new ConcurrentHashMap<>(); + } + + public T getById(long id) { +try { + @SuppressWarnings("unchecked") + CompletableFuture returnValue = (CompletableFuture) computeIfAbsent(id); + /* + * TODO: Even though the register request instruction occurs before the process bundle + * instruction in the control stream, the instructions are being processed in parallel + * in the Java harness causing a data race which is why we use a future. This will block + * forever in the case of a runner having a bug sending the wrong ids. Instead of blocking + * forever, we could do a timed wait or come up with another way of ordering the instruction + * processing to remove the data race. 
+ */ + return returnValue.get(); +} catch (ExecutionException e) { + throw new RuntimeException(String.format("Failed to load %s", id), e); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(String.format("Failed to load %s", id), e); +} + } + + public BeamFnApi.InstructionResponse.Builder register(BeamFnApi.InstructionRequest request) { +BeamFnApi.InstructionResponse.Builder response = BeamFnApi.InstructionResponse.newBuilder() +.setRegister(RegisterResponse.getDefaultInstance()); + +BeamFnApi.RegisterRequest registerRequest = request.getRegister(); +for (BeamFnApi.ProcessBundleDescriptor processBundleDescriptor +: registerRequest.getProcessBundleDescriptorList()) { + LOGGER.debug("Registering {} with type {}", + processBundleDescriptor.getId(), + processBundleDescriptor.getClass()); + computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor); + for (BeamFnApi.Coder coder : processBundleDescriptor.getCodersList()) { +LOGGER.debug("Registering {} with type {}", +coder.getFunctionSpec().getId(), +coder.getClass()); +computeIfAbsent(coder.getFunctionSpec().getId()).complete(coder); + } +} + +return response; + } + + private CompletableFuture computeIfAbsent(long id) { +return idToObject.computeIfAbsent(id, (Long ignored) -> new CompletableFuture<>()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java -- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control
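The trick `RegisterHandler` uses — `computeIfAbsent` of a `CompletableFuture` keyed by id, so a lookup issued before the corresponding register request simply blocks until registration completes the future — is language-independent. A rough Python equivalent (names are illustrative; the `timeout` parameter is an addition in the spirit of the TODO above, not something the harness has):

```python
import threading
from concurrent.futures import Future


class RegisterHandler:
    """Blocks getters until the matching registration arrives."""

    def __init__(self):
        self._lock = threading.Lock()
        self._id_to_future = {}

    def _future_for(self, obj_id):
        # Equivalent of ConcurrentMap.computeIfAbsent: whichever of the
        # getter or the registrar arrives first creates the future.
        with self._lock:
            return self._id_to_future.setdefault(obj_id, Future())

    def register(self, obj_id, obj):
        self._future_for(obj_id).set_result(obj)

    def get_by_id(self, obj_id, timeout=None):
        # With timeout=None this blocks forever if the id is never
        # registered -- the same hazard the Java comment calls out.
        return self._future_for(obj_id).result(timeout)


handler = RegisterHandler()
result = []
getter = threading.Thread(target=lambda: result.append(handler.get_by_id(42)))
getter.start()                      # lookup may begin before registration...
handler.register(42, 'descriptor')  # ...and is unblocked here
getter.join()
# result == ['descriptor']
```

The future-per-id scheme is what lets register requests and process-bundle requests race safely on the control stream without explicit ordering.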
[5/6] beam git commit: A proposal for a portability framework to execute user definable functions.
A proposal for a portability framework to execute user definable functions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4b2bec Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4b2bec Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4b2bec Branch: refs/heads/master Commit: 0b4b2becb45b9f637ba31f599ebe8be0331bd633 Parents: 582c4a8 Author: Luke Cwik Authored: Thu Jan 19 15:16:55 2017 -0800 Committer: Luke Cwik Committed: Mon Jan 30 12:47:55 2017 -0800 -- pom.xml | 36 +- runners/apex/pom.xml| 2 +- sdks/common/fn-api/pom.xml | 111 +++ .../fn-api/src/main/proto/beam_fn_api.proto | 771 +++ sdks/common/pom.xml | 38 + .../src/main/resources/beam/findbugs-filter.xml | 32 +- sdks/java/harness/pom.xml | 167 .../org/apache/beam/fn/harness/FnHarness.java | 131 .../harness/channel/ManagedChannelFactory.java | 80 ++ .../harness/channel/SocketAddressFactory.java | 64 ++ .../beam/fn/harness/channel/package-info.java | 22 + .../fn/harness/control/BeamFnControlClient.java | 165 .../harness/control/ProcessBundleHandler.java | 334 .../fn/harness/control/RegisterHandler.java | 92 +++ .../beam/fn/harness/control/package-info.java | 22 + .../BeamFnDataBufferingOutboundObserver.java| 135 .../beam/fn/harness/data/BeamFnDataClient.java | 64 ++ .../fn/harness/data/BeamFnDataGrpcClient.java | 122 +++ .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 .../harness/data/BeamFnDataInboundObserver.java | 81 ++ .../beam/fn/harness/data/package-info.java | 22 + .../fn/harness/fake/FakeAggregatorFactory.java | 52 ++ .../beam/fn/harness/fake/FakeStepContext.java | 70 ++ .../beam/fn/harness/fake/package-info.java | 22 + .../harness/fn/CloseableThrowingConsumer.java | 23 + .../beam/fn/harness/fn/ThrowingBiFunction.java | 32 + .../beam/fn/harness/fn/ThrowingConsumer.java| 32 + .../beam/fn/harness/fn/ThrowingFunction.java| 32 + .../beam/fn/harness/fn/ThrowingRunnable.java| 30 + 
.../apache/beam/fn/harness/fn/package-info.java | 22 + .../fn/harness/logging/BeamFnLoggingClient.java | 308 .../beam/fn/harness/logging/package-info.java | 22 + .../apache/beam/fn/harness/package-info.java| 22 + .../beam/fn/harness/stream/AdvancingPhaser.java | 36 + .../harness/stream/BufferingStreamObserver.java | 166 .../fn/harness/stream/DirectStreamObserver.java | 71 ++ .../ForwardingClientResponseObserver.java | 63 ++ .../harness/stream/StreamObserverFactory.java | 91 +++ .../beam/fn/harness/stream/package-info.java| 22 + .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++ .../runners/core/BeamFnDataWriteRunner.java | 87 +++ .../beam/runners/core/BoundedSourceRunner.java | 105 +++ .../apache/beam/runners/core/package-info.java | 22 + .../apache/beam/fn/harness/FnHarnessTest.java | 130 .../channel/ManagedChannelFactoryTest.java | 74 ++ .../channel/SocketAddressFactoryTest.java | 56 ++ .../control/BeamFnControlClientTest.java| 182 + .../control/ProcessBundleHandlerTest.java | 674 .../fn/harness/control/RegisterHandlerTest.java | 80 ++ ...BeamFnDataBufferingOutboundObserverTest.java | 142 .../harness/data/BeamFnDataGrpcClientTest.java | 309 .../data/BeamFnDataGrpcMultiplexerTest.java | 96 +++ .../data/BeamFnDataInboundObserverTest.java | 116 +++ .../logging/BeamFnLoggingClientTest.java| 169 .../fn/harness/stream/AdvancingPhaserTest.java | 48 ++ .../stream/BufferingStreamObserverTest.java | 146 .../stream/DirectStreamObserverTest.java| 139 .../ForwardingClientResponseObserverTest.java | 60 ++ .../stream/StreamObserverFactoryTest.java | 84 ++ .../beam/fn/harness/test/TestExecutors.java | 85 ++ .../beam/fn/harness/test/TestExecutorsTest.java | 160 .../beam/fn/harness/test/TestStreams.java | 162 .../beam/fn/harness/test/TestStreamsTest.java | 84 ++ .../runners/core/BeamFnDataReadRunnerTest.java | 187 + .../runners/core/BeamFnDataWriteRunnerTest.java | 155 .../runners/core/BoundedSourceRunnerTest.java | 113 +++ sdks/java/pom.xml | 1 + sdks/pom.xml| 1 + 68 files 
changed, 7514 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/pom.xml -- diff --git a/pom.xml b/pom.xml index d09bf59..a53453b 100644 --- a/pom.xml +++ b/pom.xml @@ -11
[GitHub] beam pull request #1801: [BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harne...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1801 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[6/6] beam git commit: [BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harness and Proto Model
[BEAM-1347, BEAM-1348] Beam Fn API Basic Java Harness and Proto Model This closes #1801 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/343176c0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/343176c0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/343176c0 Branch: refs/heads/master Commit: 343176c008832f4d3776a6e591d36be84dfb022a Parents: 582c4a8 0b4b2be Author: Luke Cwik Authored: Mon Jan 30 12:48:35 2017 -0800 Committer: Luke Cwik Committed: Mon Jan 30 12:48:35 2017 -0800 -- pom.xml | 36 +- runners/apex/pom.xml| 2 +- sdks/common/fn-api/pom.xml | 111 +++ .../fn-api/src/main/proto/beam_fn_api.proto | 771 +++ sdks/common/pom.xml | 38 + .../src/main/resources/beam/findbugs-filter.xml | 32 +- sdks/java/harness/pom.xml | 167 .../org/apache/beam/fn/harness/FnHarness.java | 131 .../harness/channel/ManagedChannelFactory.java | 80 ++ .../harness/channel/SocketAddressFactory.java | 64 ++ .../beam/fn/harness/channel/package-info.java | 22 + .../fn/harness/control/BeamFnControlClient.java | 165 .../harness/control/ProcessBundleHandler.java | 334 .../fn/harness/control/RegisterHandler.java | 92 +++ .../beam/fn/harness/control/package-info.java | 22 + .../BeamFnDataBufferingOutboundObserver.java| 135 .../beam/fn/harness/data/BeamFnDataClient.java | 64 ++ .../fn/harness/data/BeamFnDataGrpcClient.java | 122 +++ .../harness/data/BeamFnDataGrpcMultiplexer.java | 140 .../harness/data/BeamFnDataInboundObserver.java | 81 ++ .../beam/fn/harness/data/package-info.java | 22 + .../fn/harness/fake/FakeAggregatorFactory.java | 52 ++ .../beam/fn/harness/fake/FakeStepContext.java | 70 ++ .../beam/fn/harness/fake/package-info.java | 22 + .../harness/fn/CloseableThrowingConsumer.java | 23 + .../beam/fn/harness/fn/ThrowingBiFunction.java | 32 + .../beam/fn/harness/fn/ThrowingConsumer.java| 32 + .../beam/fn/harness/fn/ThrowingFunction.java| 32 + .../beam/fn/harness/fn/ThrowingRunnable.java| 30 + 
.../apache/beam/fn/harness/fn/package-info.java | 22 + .../fn/harness/logging/BeamFnLoggingClient.java | 308 .../beam/fn/harness/logging/package-info.java | 22 + .../apache/beam/fn/harness/package-info.java| 22 + .../beam/fn/harness/stream/AdvancingPhaser.java | 36 + .../harness/stream/BufferingStreamObserver.java | 166 .../fn/harness/stream/DirectStreamObserver.java | 71 ++ .../ForwardingClientResponseObserver.java | 63 ++ .../harness/stream/StreamObserverFactory.java | 91 +++ .../beam/fn/harness/stream/package-info.java| 22 + .../beam/runners/core/BeamFnDataReadRunner.java | 104 +++ .../runners/core/BeamFnDataWriteRunner.java | 87 +++ .../beam/runners/core/BoundedSourceRunner.java | 105 +++ .../apache/beam/runners/core/package-info.java | 22 + .../apache/beam/fn/harness/FnHarnessTest.java | 130 .../channel/ManagedChannelFactoryTest.java | 74 ++ .../channel/SocketAddressFactoryTest.java | 56 ++ .../control/BeamFnControlClientTest.java| 182 + .../control/ProcessBundleHandlerTest.java | 674 .../fn/harness/control/RegisterHandlerTest.java | 80 ++ ...BeamFnDataBufferingOutboundObserverTest.java | 142 .../harness/data/BeamFnDataGrpcClientTest.java | 309 .../data/BeamFnDataGrpcMultiplexerTest.java | 96 +++ .../data/BeamFnDataInboundObserverTest.java | 116 +++ .../logging/BeamFnLoggingClientTest.java| 169 .../fn/harness/stream/AdvancingPhaserTest.java | 48 ++ .../stream/BufferingStreamObserverTest.java | 146 .../stream/DirectStreamObserverTest.java| 139 .../ForwardingClientResponseObserverTest.java | 60 ++ .../stream/StreamObserverFactoryTest.java | 84 ++ .../beam/fn/harness/test/TestExecutors.java | 85 ++ .../beam/fn/harness/test/TestExecutorsTest.java | 160 .../beam/fn/harness/test/TestStreams.java | 162 .../beam/fn/harness/test/TestStreamsTest.java | 84 ++ .../runners/core/BeamFnDataReadRunnerTest.java | 187 + .../runners/core/BeamFnDataWriteRunnerTest.java | 155 .../runners/core/BoundedSourceRunnerTest.java | 113 +++ sdks/java/pom.xml | 1 + sdks/pom.xml| 1 + 68 files 
changed, 7514 insertions(+), 4 deletions(-) --
[1/6] beam git commit: A proposal for a portability framework to execute user definable functions.
Repository: beam Updated Branches: refs/heads/master 582c4a8a4 -> 343176c00 http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java -- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java new file mode 100644 index 000..73860ef --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BoundedSourceRunnerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.runners.core; + +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.collection.IsEmptyCollection.empty; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BoundedSourceRunner}. 
*/ +@RunWith(JUnit4.class) +public class BoundedSourceRunnerTest { + @Test + public void testRunReadLoopWithMultipleSources() throws Exception { +List> out1ValuesA = new ArrayList<>(); +List> out1ValuesB = new ArrayList<>(); +List> out2Values = new ArrayList<>(); +Map>>> outputMap = ImmutableMap.of( +"out1", ImmutableList.of(out1ValuesA::add, out1ValuesB::add), +"out2", ImmutableList.of(out2Values::add)); + +BoundedSourceRunner, Long> runner = +new BoundedSourceRunner<>( +PipelineOptionsFactory.create(), +BeamFnApi.FunctionSpec.getDefaultInstance(), +outputMap); + +runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(2))); +runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(1))); + +assertThat(out1ValuesA, +contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); +assertThat(out1ValuesB, +contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); +assertThat(out2Values, +contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueInGlobalWindow(0L))); + } + + @Test + public void testRunReadLoopWithEmptySource() throws Exception { +List> out1Values = new ArrayList<>(); +Map>>> outputMap = ImmutableMap.of( +"out1", ImmutableList.of(out1Values::add)); + +BoundedSourceRunner, Long> runner = +new BoundedSourceRunner<>( +PipelineOptionsFactory.create(), +BeamFnApi.FunctionSpec.getDefaultInstance(), +outputMap); + +runner.runReadLoop(valueInGlobalWindow(CountingSource.upTo(0))); + +assertThat(out1Values, empty()); + } + + @Test + public void testStart() throws Exception { +List> outValues = new ArrayList<>(); +Map>>> outputMap = ImmutableMap.of( +"out", ImmutableList.of(outValues::add)); + +ByteString encodedSource = + ByteString.copyFrom(SerializableUtils.serializeToByteArray(CountingSource.upTo(3))); + +BoundedSourceRunner, Long> runner = +new BoundedSourceRunner<>( +PipelineOptionsFactory.create(), +BeamFnApi.FunctionSpec.newBuilder().setData( + 
Any.pack(BytesValue.newBuilder().setValue(encodedSource).build())).build(), +outputMap); + +runner.start(); + +assertThat(outValues, +contains(valueInGlobalWindow(0L), valueInGlobalWindow(1L), valueIn
[2/6] beam git commit: A proposal for a portability framework to execute user definable functions.
http://git-wip-us.apache.org/repos/asf/beam/blob/0b4b2bec/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java -- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java new file mode 100644 index 000..20566ea --- /dev/null +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/BeamFnDataGrpcClientTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness.data; + +import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; +import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.collection.IsEmptyCollection.empty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.StreamObserver; +import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.test.TestStreams; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.fn.v1.BeamFnDataGrpc; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.LengthPrefixCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link BeamFnDataGrpcClient}. 
*/
+@RunWith(JUnit4.class)
+public class BeamFnDataGrpcClientTest {
+  private static final Coder<WindowedValue<String>> CODER =
+      LengthPrefixCoder.of(
+          WindowedValue.getFullCoder(StringUtf8Coder.of(),
+              GlobalWindow.Coder.INSTANCE));
+  private static final KV<Long, BeamFnApi.Target> KEY_A = KV.of(
+      12L,
+      BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(34L).setName("targetA").build());
+  private static final KV<Long, BeamFnApi.Target> KEY_B = KV.of(
+      56L,
+      BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(78L).setName("targetB").build());
+
+  private static final BeamFnApi.Elements ELEMENTS_A_1;
+  private static final BeamFnApi.Elements ELEMENTS_A_2;
+  private static final BeamFnApi.Elements ELEMENTS_B_1;
+  static {
+    try {
+      ELEMENTS_A_1 = BeamFnApi.Elements.newBuilder()
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue())
+              .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("ABC")))
+                  .concat(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("DEF"))))))
+          .build();
+      ELEMENTS_A_2 = BeamFnApi.Elements.newBuilder()
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue())
+              .setData(ByteString.copyFrom(encodeToByteArray(CODER, valueInGlobalWindow("GHI")))))
+          .addData(BeamFnApi.Elements.Data.newBuilder()
+              .setInstructionReference(KEY_A.getKey())
+              .setTarget(KEY_A.getValue()))
+          .build();
+      ELEMENTS_B_1 = BeamFnApi.El
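The `CODER` in the test above wraps a windowed-value coder in a `LengthPrefixCoder`, and the `Elements` messages concatenate the length-prefixed encodings of several values into a single `ByteString`. A minimal Python sketch of protobuf-style base-128 varint length-prefix framing, illustrative only (the helper names are not Beam's; the real coder lives in the Java SDK):

```python
def encode_varint(n):
    """Encode a non-negative int as a protobuf-style base-128 varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)
            return bytes(out)

def length_prefix(payload):
    """Frame one payload with a varint length header."""
    return encode_varint(len(payload)) + payload

def split_frames(buf):
    """Recover individual payloads from a concatenation of frames."""
    payloads, i = [], 0
    while i < len(buf):
        shift = length = 0
        while True:  # decode the varint length header
            b = buf[i]
            i += 1
            length |= (b & 0x7F) << shift
            shift += 7
            if not (b & 0x80):
                break
        payloads.append(buf[i:i + length])
        i += length
    return payloads
```

With this framing, `split_frames(length_prefix(b"ABC") + length_prefix(b"DEF"))` recovers the two payloads, which is what allows the test to concatenate separately encoded elements into one `Data` blob.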
[jira] (BEAM-1347) Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api
ASF GitHub Bot commented on BEAM-1347 Re: Basic Java harness capable of understanding process bundle tasks and sending data over the Fn Api Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1801 This message was sent by Atlassian JIRA (v6.3.15#6346-sha1:dbc023d)
[GitHub] beam pull request #1871: Revert python-sdk only changes in travis, and clean...
GitHub user aaltay opened a pull request: https://github.com/apache/beam/pull/1871 Revert python-sdk only changes in travis, and clean incubator keywords. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/aaltay/incubator-beam travis Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1871.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1871 commit d888060153101a5339a2869440ee67aaf793d987 Author: Ahmet Altay Date: 2017-01-30T20:51:15Z Revert python-sdk only changes in travis, and clean incubator keywords. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] beam pull request #1866: [BEAM-1338] Moves ThreadPool creation to a util fun...
Github user chamikaramj closed the pull request at: https://github.com/apache/beam/pull/1866
[jira] (BEAM-1349) Merge python-sdk to master
Ahmet Altay created an issue Beam / BEAM-1349 Merge python-sdk to master Issue Type: Bug Assignee: Ahmet Altay Components: sdk-py Created: 30/Jan/17 20:54 Fix Versions: 0.6.0 Priority: Major Reporter: Ahmet Altay
[jira] (BEAM-1338) Move ThreadPool creation logic to a util function
ASF GitHub Bot commented on BEAM-1338 Re: Move ThreadPool creation logic to a util function Github user chamikaramj closed the pull request at: https://github.com/apache/beam/pull/1866
[GitHub] beam pull request #1863: [BEAM-1333] Add mock time to slow bigquery unit tes...
Github user aaltay closed the pull request at: https://github.com/apache/beam/pull/1863
[GitHub] beam pull request #1870: Merge master into python-sdk
Github user aaltay closed the pull request at: https://github.com/apache/beam/pull/1870
[jira] (BEAM-1333) Big query unit tests are slow
ASF GitHub Bot commented on BEAM-1333 Re: Big query unit tests are slow Github user aaltay closed the pull request at: https://github.com/apache/beam/pull/1863
[2/2] beam git commit: This closes #1815
This closes #1815 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/847e4e9f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/847e4e9f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/847e4e9f Branch: refs/heads/master Commit: 847e4e9f0b84efa4726692cc8b7c9a0610703888 Parents: 343176c 62f9e7b Author: Sela Authored: Mon Jan 30 22:53:56 2017 +0200 Committer: Sela Committed: Mon Jan 30 22:53:56 2017 +0200 -- .../apache/beam/runners/spark/SparkRunner.java | 21 - .../spark/aggregators/AccumulatorSingleton.java | 96 ++-- .../spark/aggregators/SparkAggregators.java | 20 +++- .../translation/streaming/CheckpointDir.java| 69 ++ .../SparkRunnerStreamingContextFactory.java | 44 ++--- .../ResumeFromCheckpointStreamingTest.java | 5 +- 6 files changed, 230 insertions(+), 25 deletions(-) --
[1/2] beam git commit: [BEAM-648] Persist and restore Aggergator values in case of recovery from failure
Repository: beam Updated Branches: refs/heads/master 343176c00 -> 847e4e9f0 [BEAM-648] Persist and restore Aggergator values in case of recovery from failure Added javadoc and minor refactor Moved creation of beam checkpoint dir Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62f9e7b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62f9e7b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62f9e7b1 Branch: refs/heads/master Commit: 62f9e7b1e1a8a8f2317e3508ccce615f2b30d4f6 Parents: 343176c Author: Aviem Zur Authored: Sun Jan 22 14:30:44 2017 +0200 Committer: Sela Committed: Mon Jan 30 22:53:34 2017 +0200 -- .../apache/beam/runners/spark/SparkRunner.java | 21 - .../spark/aggregators/AccumulatorSingleton.java | 96 ++-- .../spark/aggregators/SparkAggregators.java | 20 +++- .../translation/streaming/CheckpointDir.java| 69 ++ .../SparkRunnerStreamingContextFactory.java | 44 ++--- .../ResumeFromCheckpointStreamingTest.java | 5 +- 6 files changed, 230 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/62f9e7b1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java -- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 92c07bb..578ed21 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -18,12 +18,14 @@ package org.apache.beam.runners.spark; +import com.google.common.base.Optional; import com.google.common.collect.Iterables; import java.util.Collection; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; import 
org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.aggregators.SparkAggregators; import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource; @@ -32,6 +34,7 @@ import org.apache.beam.runners.spark.translation.SparkContextFactory; import org.apache.beam.runners.spark.translation.SparkPipelineTranslator; import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; +import org.apache.beam.runners.spark.translation.streaming.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -54,6 +57,7 @@ import org.apache.spark.SparkEnv$; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.metrics.MetricsSystem; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -130,7 +134,11 @@ public final class SparkRunner extends PipelineRunner { } private void registerMetrics(final SparkPipelineOptions opts, final JavaSparkContext jsc) { -final Accumulator accum = SparkAggregators.getNamedAggregators(jsc); +Optional maybeCheckpointDir = +opts.isStreaming() ? 
Optional.of(new CheckpointDir(opts.getCheckpointDir())) +: Optional.absent(); +final Accumulator accum = +SparkAggregators.getOrCreateNamedAggregators(jsc, maybeCheckpointDir); final NamedAggregators initialValue = accum.value(); if (opts.getEnableSparkMetricSinks()) { @@ -154,10 +162,17 @@ public final class SparkRunner extends PipelineRunner { detectTranslationMode(pipeline); if (mOptions.isStreaming()) { + CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir()); final SparkRunnerStreamingContextFactory contextFactory = - new SparkRunnerStreamingContextFactory(pipeline, mOptions); + new SparkRunnerStreamingContextFactory(pipeline, mOptions, checkpointDir); final JavaStreamingContext jssc = - JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory); + JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), + contextFactory); + + // Checkpoint aggregator values + jssc.addStreamingListener( + new JavaStreamingListenerWrapper( + new AccumulatorSingleton.AccumulatorCheckpointingSparkListener())); startPipeline = executorService.submit(new Runnable() { http
[jira] (BEAM-648) Persist and restore Aggergator values in case of recovery from failure
ASF GitHub Bot commented on BEAM-648 Re: Persist and restore Aggergator values in case of recovery from failure Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1815
[GitHub] beam pull request #1815: [BEAM-648] Persist and restore Aggergator values in...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/1815
[jira] (BEAM-648) Persist and restore Aggergator values in case of recovery from failure
Amit Sela resolved as Fixed Beam / BEAM-648 Persist and restore Aggergator values in case of recovery from failure Change By: Amit Sela Resolution: Fixed Fix Version/s: 0.6.0 Status: Open → Resolved
[1/2] beam git commit: Revert python-sdk only changes in travis, and clean incubator keywords.
Repository: beam Updated Branches: refs/heads/python-sdk 1390699c3 -> be0e32e36 Revert python-sdk only changes in travis, and clean incubator keywords. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b4ee73a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b4ee73a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b4ee73a Branch: refs/heads/python-sdk Commit: 0b4ee73a36f47a8f1b5c7ece775eae2c68af4245 Parents: 1390699 Author: Ahmet Altay Authored: Mon Jan 30 12:51:15 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 13:25:32 2017 -0800 -- .travis.yml | 15 +++ .../examples/cookbook/datastore_wordcount.py | 2 +- sdks/python/setup.py | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index cb6f790..a392f7d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -39,6 +39,20 @@ env: matrix: include: +# On OSX, run with default JDK only. +- os: osx + +# On Linux, run with specific JDKs only. +- os: linux + env: CUSTOM_JDK="oraclejdk8" MAVEN_OVERRIDE="$MAVEN_OVERRIDE $MAVEN_CONTAINER_OVERRIDE" +- os: linux + env: CUSTOM_JDK="oraclejdk7" MAVEN_OVERRIDE="$MAVEN_OVERRIDE $MAVEN_CONTAINER_OVERRIDE" +- os: linux + env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="$MAVEN_OVERRIDE $MAVEN_CONTAINER_OVERRIDE" +- os: linux + env: MAVEN_OVERRIDE="-Peclipse-jdt -DskipTests $MAVEN_OVERRIDE $MAVEN_CONTAINER_OVERRIDE" CUSTOM_JDK="oraclejdk8" + +# Python SDK tests. - os: osx env: TEST_PYTHON="1" - os: linux @@ -51,6 +65,7 @@ before_install: - cat ~/.mavenrc - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi + - export BEAM_SUREFIRE_ARGLINE="-Xmx512m" # Python SDK environment settings. 
- export TOX_ENV=py27 - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export TOX_HOME=$HOME/Library/Python/2.7/bin; fi http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index 25abb3e..067cb80 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -23,7 +23,7 @@ Cloud Datastore operations. See https://developers.google.com/datastore/ for more details on Google Cloud Datastore. -See http://beam.incubator.apache.org/get-started/quickstart on +See https://beam.apache.org/get-started/quickstart on how to run a Beam pipeline. Read-only Mode: In this mode, this example reads Cloud Datastore entities using http://git-wip-us.apache.org/repos/asf/beam/blob/0b4ee73a/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 37125c2..e75a583 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -37,10 +37,10 @@ def get_version(): PACKAGE_NAME = 'apache-beam-sdk' PACKAGE_VERSION = get_version() PACKAGE_DESCRIPTION = 'Apache Beam SDK for Python' -PACKAGE_URL = 'https://beam.incubator.apache.org' +PACKAGE_URL = 'https://beam.apache.org' PACKAGE_DOWNLOAD_URL = 'TBD' PACKAGE_AUTHOR = 'Apache Software Foundation' -PACKAGE_EMAIL = 'd...@beam.incubator.apache.org' +PACKAGE_EMAIL = 'd...@beam.apache.org' PACKAGE_KEYWORDS = 'apache beam' PACKAGE_LONG_DESCRIPTION = ''' TBD
[2/2] beam git commit: This closes #1871
This closes #1871 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be0e32e3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be0e32e3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be0e32e3 Branch: refs/heads/python-sdk Commit: be0e32e36313390ed04106d57f4c9dfeabb91b4d Parents: 1390699 0b4ee73 Author: Davor Bonaci Authored: Mon Jan 30 13:25:53 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 13:25:53 2017 -0800 -- .travis.yml | 15 +++ .../examples/cookbook/datastore_wordcount.py | 2 +- sdks/python/setup.py | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) --
[GitHub] beam pull request #1871: Revert python-sdk only changes in travis, and clean...
Github user aaltay closed the pull request at: https://github.com/apache/beam/pull/1871
[jira] (BEAM-1349) Merge python-sdk to master
Title: Message Title ASF GitHub Bot commented on BEAM-1349 Re: Merge python-sdk to master GitHub user aaltay opened a pull request: https://github.com/apache/beam/pull/1872 BEAM-1349 Merge python-sdk to master You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/beam python-sdk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1872.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1872 commit 0a66721b5528483536eac753d5fc8cf28844b2eb Author: Maria Garcia Herrero Date: 2016-11-22T17:57:50Z Merge remote-tracking branch 'origin/master' into merge_master commit cc706608b281c3beeebd2487084946c06bc83f30 Author: Mark Liu Date: 2016-11-17T17:53:01Z Support @ValidatesRunner(RunnableOnService) in Python [1/2] commit 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b Author: Thomas Groh Date: 2016-11-22T18:11:22Z This closes #1376 commit b4187bd91e9e53c1562ee845ffa87cc9e734006f Author: Kenneth Knowles Date: 2016-11-22T20:08:38Z This closes #1416 commit a6be102a9ebafd3e616ee6aca9a11fbb21c375d9 Author: Maria Garcia Herrero Date: 2016-11-22T21:24:08Z Remove tests for merge commit 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065 Author: Kenneth Knowles Date: 2016-11-22T21:56:37Z This closes #1384 commit 21f9c6d2cff052d662326ce73fdcf1fb08504dda Author: Kenneth Knowles Date: 2016-11-22T23:31:10Z This closes #1423 commit 2b69cce0f311a2ef40fdef4fe60d3e6fc13a8868 Author: Vikas Kedigehalli Date: 2016-11-16T00:41:24Z Add DatastoreIO to Python SDK commit 9b9d016c80b9a7e73a7485d3e579ead3ada18ac6 Author: Davor Bonaci Date: 2016-11-23T18:42:17Z This closes #1398 commit 4dd19782f2624bf8aed3df8484fa314f94904571 Author: Kenneth Knowles Date: 2016-11-16T05:33:13Z Reject stateful DoFn in SparkRunner commit 413a40243a30e059476395a2dcbfc98a94bb22f2 Author: Kenneth Knowles Date: 2016-11-16T05:33:28Z Reject stateful DoFn in 
FlinkRunner commit 255ad9a327133ab4f05ebbceca236d5fe0006028 Author: Kenneth Knowles Date: 2016-11-21T23:41:13Z Add JUnit category for stateful ParDo tests commit 1fc8d65a079e58d740a9b954da980963f20e9edf Author: Scott Wegner Date: 2016-11-22T00:33:07Z Update StarterPipeline
[GitHub] beam pull request #1872: [BEAM-1349] Merge python-sdk to master
GitHub user aaltay opened a pull request: https://github.com/apache/beam/pull/1872 [BEAM-1349] Merge python-sdk to master You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/beam python-sdk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1872.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1872 commit 0a66721b5528483536eac753d5fc8cf28844b2eb Author: Maria Garcia Herrero Date: 2016-11-22T17:57:50Z Merge remote-tracking branch 'origin/master' into merge_master commit cc706608b281c3beeebd2487084946c06bc83f30 Author: Mark Liu Date: 2016-11-17T17:53:01Z Support @ValidatesRunner(RunnableOnService) in Python [1/2] commit 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b Author: Thomas Groh Date: 2016-11-22T18:11:22Z This closes #1376 commit b4187bd91e9e53c1562ee845ffa87cc9e734006f Author: Kenneth Knowles Date: 2016-11-22T20:08:38Z This closes #1416 commit a6be102a9ebafd3e616ee6aca9a11fbb21c375d9 Author: Maria Garcia Herrero Date: 2016-11-22T21:24:08Z Remove tests for merge commit 3b5cd0efc5f5e4b8fad34ee0d976e5e6ba501065 Author: Kenneth Knowles Date: 2016-11-22T21:56:37Z This closes #1384 commit 21f9c6d2cff052d662326ce73fdcf1fb08504dda Author: Kenneth Knowles Date: 2016-11-22T23:31:10Z This closes #1423 commit 2b69cce0f311a2ef40fdef4fe60d3e6fc13a8868 Author: Vikas Kedigehalli Date: 2016-11-16T00:41:24Z Add DatastoreIO to Python SDK commit 9b9d016c80b9a7e73a7485d3e579ead3ada18ac6 Author: Davor Bonaci Date: 2016-11-23T18:42:17Z This closes #1398 commit 4dd19782f2624bf8aed3df8484fa314f94904571 Author: Kenneth Knowles Date: 2016-11-16T05:33:13Z Reject stateful DoFn in SparkRunner commit 413a40243a30e059476395a2dcbfc98a94bb22f2 Author: Kenneth Knowles Date: 2016-11-16T05:33:28Z Reject stateful DoFn in FlinkRunner commit 255ad9a327133ab4f05ebbceca236d5fe0006028 Author: Kenneth Knowles 
Date: 2016-11-21T23:41:13Z Add JUnit category for stateful ParDo tests commit 1fc8d65a079e58d740a9b954da980963f20e9edf Author: Scott Wegner Date: 2016-11-22T00:33:07Z Update StarterPipeline Convert StarterPipeline ParDo to MapElements. Use the new DoFn for non-outputting transforms. commit 796ba7ab75bc8d01a3a59efc29cdc17bcd26af4a Author: Kenneth Knowles Date: 2016-11-16T05:33:01Z Reject stateful DoFn in ApexRunner commit 6fa8f658abaac1d3a983bfc3b8c09422159af8aa Author: bchambers Date: 2016-11-22T19:37:23Z Simplify the API for managing MetricsEnvironment 1. setCurrentContainer returns the previous MetricsEnvironment 2. setCurrentContainer(null) resets the thread local 3. scopedCurrentContainer sets the container and returns a Closeable to reset the previous container. commit f03b4fe11cb605edf216903738a6c305b3a91066 Author: Thomas Groh Date: 2016-11-22T22:51:39Z Output Keyed Bundles in GroupAlsoByWindowEvaluator This allows reuse of keys for downstream serialization. commit dcd401ba0b5bd12343484b0df50b15b6ef10ace9 Author: Thomas Groh Date: 2016-11-23T00:14:29Z Add TransformHierarchyTest This tests basic features of TransformHierarchy commit 6f86af612f97ad57cf4ba2cae21ba232f7494ada Author: Kenneth Knowles Date: 2016-11-23T06:16:29Z Use more natural class to find class loader in ReflectHelpers commit 2e03bb8a136078064014a0a7101960f6d2019487 Author: Thomas Weise Date: 2016-11-22T19:38:00Z Update transitive dependencies for Apex 3.5.0 snapshot version. 
commit 3dbeb8edfdfe4c9e8987e4d8df4451fdb748dc07 Author: Davor Bonaci Date: 2016-11-24T00:02:41Z This closes #1432 commit d46203b7fcdc9895c9cee1d82710f48aba31a748 Author: Vikas Kedigehalli Date: 2016-11-23T22:09:09Z datastoreio write/delete ptransform update datastore_wordcount example to include writes commit 1530a17279d098ae7459f689ef02401f5116e54e Author: Dan Halperin Date: 2016-11-28T23:54:27Z Closes #1433 commit 7a059d37e71b62702e8cdeafec6956fc7e1e38c4 Author: Sourabh Bajaj Date: 2016-11-21T23:50:21Z Improve the speed of getting file sizes commit ad4dc87a472387b507545ab80dbd2fe42e02cea3 Author: Davor Bonaci Date: 2016-11-29T01:40:50Z This closes #1404 commit 6c8c17a1c1977ed69860d25dc8ab45640e7a1c53 Author: Vikas Kedigehalli Date: 2016-11-29T17:54:00Z Update googledatastore version commit 5ce75a2eae31dbab4d07d301716b4d7e3218b8b9 Author: Dan Halperin Date: 2016-11-29T22:01:50Z Closes #1453 commit 81e7a0f653864212a5c9d3d0802608f92bb34501 Author: Mark Liu Date: 2016-11-17T22:45:42Z Support ValidatesRunner Attribute in Python This is roughly equivalent to "RunnableOnService" in the Java SDK. See BEAM-655 commit 70c1de9b95e9c20e5efb277d9ad50ae6348
[jira] (BEAM-1350) BigQueryIO does not honor SQL dialect or result flattening in DirectRunner
Daniel Halperin created an issue Beam / BEAM-1350 BigQueryIO does not honor SQL dialect or result flattening in DirectRunner Issue Type: Bug Assignee: Daniel Halperin Components: sdk-java-gcp Created: 30/Jan/17 22:06 Priority: Major Reporter: Daniel Halperin BigQueryTableRowIterator is used when reading on some runners that do not perform initial splitting. It has a bug: it does not propagate configuration correctly, ignoring the useLegacySQL and flattenResults flags. https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java#L395 vs https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1148 This can cause failures on some runners, possibly including the DirectRunner.
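The fix pursued for this bug (see PR #1873 below) is to build the query configuration in exactly one place so both read paths consume the same object and a flag cannot be silently dropped on one path. A hedged sketch of that single-source-of-truth pattern, with illustrative names rather than Beam's actual Java API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class QueryConfig:
    """All query settings in one value object (hypothetical names)."""
    query: str
    use_legacy_sql: bool = True
    flatten_results: bool = True

def to_job_request(cfg: QueryConfig) -> dict:
    """Build the request both the export path and the direct row iterator
    must share. Because every field comes from one QueryConfig, a path
    cannot quietly fall back to a default for useLegacySql or flattenResults."""
    return {
        "query": cfg.query,
        "useLegacySql": cfg.use_legacy_sql,
        "flattenResults": cfg.flatten_results,
    }
```

Usage: both readers call `to_job_request(cfg)` instead of each assembling its own request, which is the shape of the refactoring described in the PR.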
[GitHub] beam pull request #1873: [BEAM-1350] BigQuery: refactor services so that all...
GitHub user dhalperi opened a pull request: https://github.com/apache/beam/pull/1873 [BEAM-1350] BigQuery: refactor services so that all queryConfig happens in BigQueryIO By putting all the configuration in the same place, we can avoid bugs that happen from mismatching code across files. Also made a few unnecessarily-public APIs package-private. And improved tests, removed a few dataflow references. Forward port from Dataflow. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhalperi/beam bigquery-io-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1873 commit e643a85cbf64687de024ca0dc0f34b9c6ae451f5 Author: Dan Halperin Date: 2017-01-30T22:04:32Z BigQuery: refactor services so that all queryConfig happens in BigQueryIO By putting all the configuration in the same place, we can avoid bugs that happen from mismatching code across files. Also made a few unnecessarily-public APIs package-private. And improved tests, removed a few dataflow references. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] (BEAM-1350) BigQueryIO does not honor SQL dialect or result flattening in DirectRunner
ASF GitHub Bot commented on BEAM-1350 Re: BigQueryIO does not honor SQL dialect or result flattening in DirectRunner GitHub user dhalperi opened a pull request: https://github.com/apache/beam/pull/1873 BEAM-1350 BigQuery: refactor services so that all queryConfig happens in BigQueryIO By putting all the configuration in the same place, we can avoid bugs that happen from mismatching code across files. Also made a few unnecessarily-public APIs package-private. And improved tests, removed a few dataflow references. Forward port from Dataflow. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhalperi/beam bigquery-io-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/1873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1873 commit e643a85cbf64687de024ca0dc0f34b9c6ae451f5 Author: Dan Halperin Date: 2017-01-30T22:04:32Z BigQuery: refactor services so that all queryConfig happens in BigQueryIO By putting all the configuration in the same place, we can avoid bugs that happen from mismatching code across files. Also made a few unnecessarily-public APIs package-private. And improved tests, removed a few dataflow references.
[jira] (BEAM-1351) Upgrade AutoValue 1.1->1.3
Pei He created an issue Beam / BEAM-1351 Upgrade AutoValue 1.1->1.3 Issue Type: Task Assignee: Pei He Components: sdk-java-core Created: 30/Jan/17 22:06 Priority: Major Reporter: Pei He Update to allow using AutoValue on an inherited class. I am using this feature in Beam FileSystem, and to define a file system specific CreateOptions. Additional benefits:
1. The classes in the autovalue jar are now shaded with a $ so they never appear in IDE autocompletion.
2. AutoValue now uses its own implementation of a subset of Apache Velocity, so there will no longer be problems with interference between the Velocity that was bundled with AutoValue and other versions that might be present.
3. Explicit check for nested @AutoValue classes being private, or not being static. Otherwise the compiler errors could be hard to understand, especially in IDEs.
4. An Eclipse bug that could occasionally lead to exceptions in the IDE has been fixed (GitHub issue #200).
5. Added logic to AutoValue to detect the confusing case where you think you are using JavaBeans conventions (like getFoo()) but you aren't because at least one method isn't.
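For readers unfamiliar with AutoValue: it generates field-wise equals/hashCode/toString for an abstract Java value class, and as of 1.3 that class may itself extend another class, which is the feature the issue needs for a file-system-specific CreateOptions. A rough Python analogue using frozen dataclasses, with hypothetical field names loosely inspired by the CreateOptions mentioned above (this is a conceptual sketch, not Beam's API):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CreateOptions:
    """Value type analogous to an @AutoValue class: field-wise equality,
    hashing, and a readable repr are all derived automatically."""
    mime_type: str = "application/octet-stream"

@dataclass(frozen=True)
class GcsCreateOptions(CreateOptions):
    """Subclass adding its own fields, mirroring the new
    AutoValue-on-an-inherited-class capability."""
    upload_buffer_size_bytes: int = 1024 * 1024
```

Two instances with the same field values compare equal and hash identically, exactly the behavior AutoValue generates so that hand-written equals/hashCode cannot drift out of sync with the fields.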
[jira] (BEAM-1352) io/google-cloud-platform should not depend on runners/dataflow for testing
Pei He created an issue Beam / BEAM-1352 io/google-cloud-platform should not depend on runners/dataflow for testing Issue Type: Task Assignee: Pei He Components: sdk-java-core Created: 30/Jan/17 22:29 Priority: Major Reporter: Pei He dataflow-runner needs to depend on io/google-cloud-platform to specialize configurations. Currently, it is done by putting GcsUtil in sdk.util. https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java#L189 This will no longer be possible after the FileSystem refactoring, given that GcsFileSystem and its configuration will be in io/google-cloud-platform.
[06/50] [abbrv] beam git commit: Closes #1810
Closes #1810 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/894461e6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/894461e6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/894461e6 Branch: refs/heads/master Commit: 894461e64b09e6d719ba3eef282cd36cea550f7b Parents: d0474ab f68c9dc Author: Robert Bradshaw Authored: Mon Jan 23 09:48:55 2017 -0800 Committer: Robert Bradshaw Committed: Mon Jan 23 09:48:55 2017 -0800 -- sdks/python/apache_beam/runners/common.pxd | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) --
[39/50] [abbrv] beam git commit: Update the version.py file to match the latest beam version.
Update the version.py file to match the latest beam version. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38575a14 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38575a14 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38575a14 Branch: refs/heads/master Commit: 38575a14e2b17c93de2d0e27fe6213daa7101695 Parents: 4aaaf8f Author: Ahmet Altay Authored: Mon Jan 30 12:21:28 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:38 2017 -0800 -- sdks/python/apache_beam/version.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/38575a14/sdks/python/apache_beam/version.py -- diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 60d9634..12509fb 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -21,7 +21,7 @@ import re -__version__ = '0.3.0-incubating.dev' # TODO: PEP 440 and incubating suffix +__version__ = '0.6.0.dev' # The following utilities are legacy code from the Maven integration; @@ -40,7 +40,6 @@ def get_version_from_pom(): search = pattern.search(pom) version = search.group(1) version = version.replace("-SNAPSHOT", ".dev") -# TODO: PEP 440 and incubating suffix return version
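The version.py logic in the commit above derives a PEP 440 version from the Maven pom with one rule: the `-SNAPSHOT` qualifier becomes a `.dev` suffix (so `0.6.0-SNAPSHOT` maps to `0.6.0.dev`). A small sketch of that transformation; the regex here is an approximation of what `get_version_from_pom` does, not a copy of it:

```python
import re

def maven_to_pep440(maven_version):
    """Map a Maven-style version to a PEP 440 one the way version.py does:
    '-SNAPSHOT' becomes a '.dev' suffix; release versions pass through."""
    return maven_version.replace("-SNAPSHOT", ".dev")

def extract_pom_version(pom_text):
    """Pull the first <version> element out of pom.xml text, mirroring
    get_version_from_pom's regex-based approach (pattern is illustrative)."""
    match = re.search(r"<version>([^<]+)</version>", pom_text)
    return match.group(1) if match else None
```

So `maven_to_pep440(extract_pom_version(pom))` yields a version string the Python packaging tools accept, which is why the legacy Maven-integration helpers in version.py apply this replacement.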
[13/50] [abbrv] beam git commit: Revert "Remove dataflow_test.py"
Revert "Remove dataflow_test.py" This reverts commit d5b90d8383e662e803ea79b31661250a043bcfd2. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96fcc7d3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96fcc7d3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96fcc7d3 Branch: refs/heads/master Commit: 96fcc7d31c2540f867c3a73903c2aa99183a6b8b Parents: af49908 Author: Robert Bradshaw Authored: Tue Jan 24 09:28:38 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 09:28:38 2017 -0800 -- sdks/python/apache_beam/dataflow_test.py| 418 +++ .../apache_beam/transforms/ptransform_test.py | 67 --- .../apache_beam/transforms/sideinputs_test.py | 208 + 3 files changed, 419 insertions(+), 274 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/96fcc7d3/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py new file mode 100644 index 000..f410230 --- /dev/null +++ b/sdks/python/apache_beam/dataflow_test.py @@ -0,0 +1,418 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Integration tests for the dataflow package.""" + +from __future__ import absolute_import + +import logging +import re +import unittest + +import apache_beam as beam +from apache_beam.pvalue import AsDict +from apache_beam.pvalue import AsIter as AllOf +from apache_beam.pvalue import AsList +from apache_beam.pvalue import AsSingleton +from apache_beam.pvalue import EmptySideInput +from apache_beam.pvalue import SideOutputValue +from apache_beam.test_pipeline import TestPipeline +from apache_beam.transforms import Create +from apache_beam.transforms import DoFn +from apache_beam.transforms import FlatMap +from apache_beam.transforms import GroupByKey +from apache_beam.transforms import Map +from apache_beam.transforms import ParDo +from apache_beam.transforms import WindowInto +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to +from apache_beam.transforms.window import IntervalWindow +from apache_beam.transforms.window import WindowFn +from nose.plugins.attrib import attr + + +class DataflowTest(unittest.TestCase): + """Dataflow integration tests.""" + + SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10 + SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)] + + @beam.ptransform_fn + def Count(pcoll): # pylint: disable=invalid-name, no-self-argument +"""A Count transform: v, ... 
=> (v, n), ...""" +return (pcoll +| 'AddCount' >> Map(lambda x: (x, 1)) +| 'GroupCounts' >> GroupByKey() +| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones + + @attr('ValidatesRunner') + def test_word_count(self): +pipeline = TestPipeline() +lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) +result = ( +(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) +.apply('CountWords', DataflowTest.Count)) +assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) +pipeline.run() + + @attr('ValidatesRunner') + def test_map(self): +pipeline = TestPipeline() +lines = pipeline | 'input' >> Create(['a', 'b', 'c']) +result = (lines + | 'upper' >> Map(str.upper) + | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) +assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) +pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_side_input_as_arg(self): +pipeline = TestPipeline() +words_list = ['aa', 'bb', 'cc'] +words = pipeline | 'SomeWords' >> Create(words_list) +prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in +suffix = 'zyx' +result = words | FlatMap( +'DecorateWords', +lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], +AsSingleton(prefix), suffix) +assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list
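The restored `Count` transform in the test above is the classic map/group/sum pipeline: pair each element with 1, group the ones by key, then sum each group. A plain-Python sketch of the same semantics (not Beam code, just the computation the `ValidatesRunner` test checks):

```python
# Pure-Python equivalent of the Count transform:
#   'AddCount'    -> pair each element with 1
#   'GroupCounts' -> group the ones by key
#   'AddCounts'   -> sum each group
from collections import defaultdict

def count(elements):
    pairs = [(x, 1) for x in elements]          # 'AddCount'
    grouped = defaultdict(list)
    for key, one in pairs:                      # 'GroupCounts'
        grouped[key].append(one)
    return {k: sum(ones) for k, ones in grouped.items()}  # 'AddCounts'

# Same sample data as DataflowTest.SAMPLE_DATA, pre-split into words.
sample = 'aa bb cc aa bb aa'.split() * 10
print(sorted(count(sample).items()))  # [('aa', 30), ('bb', 20), ('cc', 10)]
```

This also explains the expected `SAMPLE_RESULT` in the test: ten copies of the line contribute 30 `aa`, 20 `bb`, and 10 `cc`.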
[36/50] [abbrv] beam git commit: Merge remote-tracking branch 'origin/master' into python-sdk.
Merge remote-tracking branch 'origin/master' into python-sdk. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2859a55 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2859a55 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2859a55 Branch: refs/heads/master Commit: c2859a55f89c9807a037adfde9f7e8f506c108ce Parents: 1bc6859 34b4a6d Author: Ahmet Altay Authored: Fri Jan 27 16:57:44 2017 -0800 Committer: Ahmet Altay Committed: Fri Jan 27 16:57:44 2017 -0800 -- .jenkins/common_job_properties.groovy |9 +- ...job_beam_PostCommit_Java_MavenInstall.groovy |2 +- .../job_beam_PreCommit_Java_MavenInstall.groovy |2 +- .../job_beam_Release_NightlySnapshot.groovy |2 +- .jenkins/job_seed.groovy|2 +- .travis/README.md |2 +- DISCLAIMER | 10 - NOTICE |4 +- README.md | 46 +- examples/java/README.md | 16 +- examples/java/pom.xml | 21 +- .../beam/examples/DebuggingWordCount.java |4 +- .../org/apache/beam/examples/WordCount.java |6 +- .../beam/examples/complete/AutoComplete.java|2 +- .../org/apache/beam/examples/complete/README.md | 14 +- .../apache/beam/examples/complete/TfIdf.java|2 +- .../examples/complete/TopWikipediaSessions.java |2 +- .../examples/complete/TrafficMaxLaneFlow.java |2 +- .../beam/examples/complete/TrafficRoutes.java |2 +- .../examples/cookbook/BigQueryTornadoes.java|2 +- .../cookbook/CombinePerKeyExamples.java |2 +- .../org/apache/beam/examples/cookbook/README.md | 14 +- .../beam/examples/cookbook/TriggerExample.java |4 +- .../beam/examples/WindowedWordCountIT.java | 16 +- examples/java8/pom.xml |2 +- .../beam/examples/complete/game/GameStats.java |7 +- .../examples/complete/game/LeaderBoard.java |5 +- .../beam/examples/complete/game/UserScore.java |2 +- examples/pom.xml| 16 +- pom.xml | 41 +- runners/apex/README.md |4 +- runners/apex/pom.xml|3 +- .../beam/runners/apex/ApexPipelineOptions.java |7 +- .../apache/beam/runners/apex/ApexRunner.java| 43 +- 
.../beam/runners/apex/ApexYarnLauncher.java | 23 +- .../translation/CreateValuesTranslator.java | 18 +- .../FlattenPCollectionTranslator.java | 28 +- .../apex/translation/GroupByKeyTranslator.java |2 +- .../translation/ParDoBoundMultiTranslator.java | 27 +- .../apex/translation/ParDoBoundTranslator.java |4 +- .../apex/translation/TranslationContext.java| 27 +- .../apex/translation/WindowBoundTranslator.java |8 +- .../operators/ApexGroupByKeyOperator.java |4 +- .../operators/ApexParDoOperator.java|6 +- .../ApexReadUnboundedInputOperator.java | 17 +- .../beam/runners/apex/ApexRunnerTest.java | 75 ++ .../beam/runners/apex/ApexYarnLauncherTest.java |9 +- .../runners/apex/examples/WordCountTest.java|2 +- .../translation/ParDoBoundTranslatorTest.java |6 +- .../translation/ReadUnboundTranslatorTest.java |8 +- .../utils/ApexStateInternalsTest.java |2 +- .../test/resources/beam-runners-apex.properties | 20 + runners/core-java/pom.xml |2 +- .../beam/runners/core/AssignWindowsDoFn.java|3 +- .../apache/beam/runners/core/DoFnAdapters.java | 343 ++ .../apache/beam/runners/core/DoFnRunner.java| 21 - .../apache/beam/runners/core/DoFnRunners.java | 138 +-- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 10 +- .../runners/core/GroupAlsoByWindowsDoFn.java|5 +- .../beam/runners/core/KeyedWorkItemCoder.java |4 +- .../core/LateDataDroppingDoFnRunner.java|1 - .../apache/beam/runners/core/NonEmptyPanes.java |2 +- .../org/apache/beam/runners/core/OldDoFn.java | 472 .../runners/core/PerKeyCombineFnRunner.java | 70 -- .../runners/core/PerKeyCombineFnRunners.java| 101 -- .../beam/runners/core/SimpleDoFnRunner.java | 63 - .../beam/runners/core/SimpleOldDoFnRunner.java |7 +- .../beam/runners/core/SplittableParDo.java |7 - .../core/UnboundedReadFromBoundedSource.java| 14 +- .../AfterDelayFromFirstElementStateMachine.java |2 +- .../core/triggers/AfterPaneStateMachine.java|2 +- .../core/DoFnDelegatingAggregatorTest.java | 144 +++ .../core/GroupAlsoByWindowsProperties.java |2 +- 
.../runners/core/KeyedWorkItemCoderTest.java|6 + .../core
[25/50] [abbrv] beam git commit: Revert "Revert "Remove dataflow_test.py""
Revert "Revert "Remove dataflow_test.py"" This reverts commit 96fcc7d31c2540f867c3a73903c2aa99183a6b8b. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2aa7d47e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2aa7d47e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2aa7d47e Branch: refs/heads/master Commit: 2aa7d47e1491e0601b7b4d1476a8f182b2a14dc3 Parents: 4e1028b Author: Robert Bradshaw Authored: Tue Jan 24 16:33:55 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:18:09 2017 -0800 -- sdks/python/apache_beam/dataflow_test.py| 418 --- .../apache_beam/transforms/ptransform_test.py | 67 +++ .../apache_beam/transforms/sideinputs_test.py | 208 - 3 files changed, 274 insertions(+), 419 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/2aa7d47e/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py deleted file mode 100644 index f410230..000 --- a/sdks/python/apache_beam/dataflow_test.py +++ /dev/null @@ -1,418 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Integration tests for the dataflow package.""" - -from __future__ import absolute_import - -import logging -import re -import unittest - -import apache_beam as beam -from apache_beam.pvalue import AsDict -from apache_beam.pvalue import AsIter as AllOf -from apache_beam.pvalue import AsList -from apache_beam.pvalue import AsSingleton -from apache_beam.pvalue import EmptySideInput -from apache_beam.pvalue import SideOutputValue -from apache_beam.test_pipeline import TestPipeline -from apache_beam.transforms import Create -from apache_beam.transforms import DoFn -from apache_beam.transforms import FlatMap -from apache_beam.transforms import GroupByKey -from apache_beam.transforms import Map -from apache_beam.transforms import ParDo -from apache_beam.transforms import WindowInto -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to -from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import WindowFn -from nose.plugins.attrib import attr - - -class DataflowTest(unittest.TestCase): - """Dataflow integration tests.""" - - SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10 - SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)] - - @beam.ptransform_fn - def Count(pcoll): # pylint: disable=invalid-name, no-self-argument -"""A Count transform: v, ... 
=> (v, n), ...""" -return (pcoll -| 'AddCount' >> Map(lambda x: (x, 1)) -| 'GroupCounts' >> GroupByKey() -| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones - - @attr('ValidatesRunner') - def test_word_count(self): -pipeline = TestPipeline() -lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) -result = ( -(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) -.apply('CountWords', DataflowTest.Count)) -assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) -pipeline.run() - - @attr('ValidatesRunner') - def test_map(self): -pipeline = TestPipeline() -lines = pipeline | 'input' >> Create(['a', 'b', 'c']) -result = (lines - | 'upper' >> Map(str.upper) - | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) -assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) -pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_side_input_as_arg(self): -pipeline = TestPipeline() -words_list = ['aa', 'bb', 'cc'] -words = pipeline | 'SomeWords' >> Create(words_list) -prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in -suffix = 'zyx' -result = words | FlatMap( -'DecorateWords', -lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], -AsSingleton(prefix), suffix) -assert_that(result, equal_to(['xyz-%s-zyx' % x for x
[14/50] [abbrv] beam git commit: Closes #1831
Closes #1831 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0dc1f37 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0dc1f37 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0dc1f37 Branch: refs/heads/master Commit: d0dc1f375982bab747eda8ea26f4a41b15a1ec01 Parents: af49908 96fcc7d Author: Robert Bradshaw Authored: Tue Jan 24 10:06:03 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 10:06:03 2017 -0800 -- sdks/python/apache_beam/dataflow_test.py| 418 +++ .../apache_beam/transforms/ptransform_test.py | 67 --- .../apache_beam/transforms/sideinputs_test.py | 208 + 3 files changed, 419 insertions(+), 274 deletions(-) --
[21/50] [abbrv] beam git commit: Closes #1811
Closes #1811 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59242205 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59242205 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59242205 Branch: refs/heads/master Commit: 592422059e21bf72fc7b4842d6fd6df000a7d2a7 Parents: 9540cf1 61d8d3f Author: Robert Bradshaw Authored: Wed Jan 25 12:38:03 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 12:38:03 2017 -0800 -- sdks/python/apache_beam/pipeline_test.py | 57 ++- 1 file changed, 21 insertions(+), 36 deletions(-) --
[11/50] [abbrv] beam git commit: Closes #1812
Closes #1812 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af49908b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af49908b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af49908b Branch: refs/heads/master Commit: af49908b8fb5bb34428343218461660d41ead399 Parents: deb2aea 6cb2f37 Author: Robert Bradshaw Authored: Mon Jan 23 14:37:45 2017 -0800 Committer: Robert Bradshaw Committed: Mon Jan 23 14:37:45 2017 -0800 -- sdks/python/apache_beam/runners/common.py | 34 ++ 1 file changed, 19 insertions(+), 15 deletions(-) --
[40/50] [abbrv] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner
[BEAM-843] Use New DoFn Directly in Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb Branch: refs/heads/master Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278 Parents: 27cf68e Author: JingsongLi Authored: Wed Jan 18 11:34:06 2017 +0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:38 2017 -0800 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + 3 files changed, 264 insertions(+), 104 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java new file mode 100644 index 000..cff6e00 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.SystemDoFnInternal; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the + * {@link ReduceFnRunner}. 
+ */ +@SystemDoFnInternal +public class GroupAlsoByWindowViaWindowSetNewDoFn< +K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem> +extends DoFn> { + + private static final long serialVersionUID = 1L; + + public static + DoFn, KV> create( + WindowingStrategy strategy, + StateInternalsFactory stateInternalsFactory, + TimerInternalsFactory timerInternalsFactory, + SideInputReader sideInputReader, + SystemReduceFn reduceFn, + DoFnRunners.OutputManager outputManager, + TupleTag> mainTag) { +return new GroupAlsoByWindowViaWindowSetNewDoFn<>( +strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader, +reduceFn, outputManager, mainTag); + } + + protected final Aggregator droppedDueToClosedWindow = + createAggregator( + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs()); + protected final Aggregator droppedDueToLateness = + createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs()); + private final WindowingStrategy windowingStrategy; + private SystemReduceFn reduceFn; + private transient StateInternalsFactory stateInternalsFactory; + private transient TimerInternalsFactory timerInternalsFactory; + private transient SideInputReader sideInputReader; + private transient DoFnRunners.OutputManager outputManager
[02/50] [abbrv] beam git commit: Closes #1809
Closes #1809 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/946135f6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/946135f6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/946135f6 Branch: refs/heads/master Commit: 946135f6a955d9e27e7553c4cefef354ecd2535d Parents: c03e6f3 56512ab Author: Robert Bradshaw Authored: Sat Jan 21 00:30:35 2017 -0800 Committer: Robert Bradshaw Committed: Sat Jan 21 00:30:35 2017 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 6 -- sdks/python/apache_beam/runners/dataflow_runner_test.py | 5 ++--- 2 files changed, 6 insertions(+), 5 deletions(-) --
[18/50] [abbrv] beam git commit: Closes #1820
Closes #1820 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43cb4d70 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43cb4d70 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43cb4d70 Branch: refs/heads/master Commit: 43cb4d70980af758bfea9a3c65530ca53a6239ec Parents: f983123 52fc95d Author: Robert Bradshaw Authored: Tue Jan 24 16:31:03 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 16:31:03 2017 -0800 -- sdks/python/apache_beam/io/fileio.py| 542 +- sdks/python/apache_beam/io/fileio_test.py | 729 +-- .../runners/direct/transform_evaluator.py | 5 - 3 files changed, 3 insertions(+), 1273 deletions(-) --
[05/50] [abbrv] beam git commit: Add some typing to prevent speed regression for old_dofn.
Add some typing to prevent speed regression for old_dofn. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f68c9dc8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f68c9dc8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f68c9dc8 Branch: refs/heads/master Commit: f68c9dc8d17881c43c31922375fec9593265cc5d Parents: d0474ab Author: Robert Bradshaw Authored: Sat Jan 21 20:52:02 2017 -0800 Committer: Robert Bradshaw Committed: Sat Jan 21 20:52:02 2017 -0800 -- sdks/python/apache_beam/runners/common.pxd | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/f68c9dc8/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 06fe434..10d1f96 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -36,15 +36,17 @@ cdef class DoFnRunner(Receiver): cdef object tagged_receivers cdef LoggingContext logging_context cdef object step_name - cdef object is_new_dofn + cdef bint is_new_dofn cdef object args - cdef object kwargs + cdef dict kwargs cdef object side_inputs cdef bint has_windowed_side_inputs cdef Receiver main_receivers cpdef process(self, WindowedValue element) + cdef old_dofn_process(self, WindowedValue element) + cdef new_dofn_process(self, WindowedValue element) @cython.locals(windowed_value=WindowedValue) cpdef _process_outputs(self, WindowedValue element, results)
[24/50] [abbrv] beam git commit: Use a temp directory for requirements cache in test_with_requirements_file
Use a temp directory for requirements cache in test_with_requirements_file The test fails if there are leftover files in the default folder for requirements cache either from earlier tests, or from the previous workspaces. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5787e817 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5787e817 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5787e817 Branch: refs/heads/master Commit: 5787e817a7eda4859963d535df21f2fa00edf8af Parents: 5924220 Author: Ahmet Altay Authored: Wed Jan 25 09:57:18 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:16:52 2017 -0800 -- .../python/apache_beam/utils/dependency_test.py | 47 +++- 1 file changed, 27 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/5787e817/sdks/python/apache_beam/utils/dependency_test.py -- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index a484d60..75a89e2 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -106,27 +106,34 @@ class SetupTest(unittest.TestCase): dependency.stage_job_resources(options)) def test_with_requirements_file(self): -staging_dir = tempfile.mkdtemp() -source_dir = tempfile.mkdtemp() +try: + staging_dir = tempfile.mkdtemp() + requirements_cache_dir = tempfile.mkdtemp() + source_dir = tempfile.mkdtemp() -options = PipelineOptions() -options.view_as(GoogleCloudOptions).staging_location = staging_dir -self.update_options(options) -options.view_as(SetupOptions).requirements_file = os.path.join( -source_dir, dependency.REQUIREMENTS_FILE) -self.create_temp_file( -os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') -self.assertEqual( -sorted([dependency.REQUIREMENTS_FILE, -'abc.txt', 'def.txt']), -sorted(dependency.stage_job_resources( -options, 
-populate_requirements_cache=self.populate_requirements_cache))) -self.assertTrue( -os.path.isfile( -os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) -self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) -self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).requirements_cache = requirements_cache_dir + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, dependency.REQUIREMENTS_FILE) + self.create_temp_file( + os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') + self.assertEqual( + sorted([dependency.REQUIREMENTS_FILE, + 'abc.txt', 'def.txt']), + sorted(dependency.stage_job_resources( + options, + populate_requirements_cache=self.populate_requirements_cache))) + self.assertTrue( + os.path.isfile( + os.path.join(staging_dir, dependency.REQUIREMENTS_FILE))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'abc.txt'))) + self.assertTrue(os.path.isfile(os.path.join(staging_dir, 'def.txt'))) +finally: + shutil.rmtree(staging_dir) + shutil.rmtree(requirements_cache_dir) + shutil.rmtree(source_dir) def test_requirements_file_not_present(self): staging_dir = tempfile.mkdtemp()
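The fix above wraps the test body in `try`/`finally` so every scratch directory (staging, requirements cache, source) is removed even when an assertion fails, which is what stops leftover cache files from leaking into later tests or other workspaces. A minimal sketch of the pattern, with illustrative names rather than the real `dependency_test.py` helpers:

```python
# Create temp directories up front, do the work, and remove them in a
# finally block so cleanup happens even if the middle section raises.
import os
import shutil
import tempfile

def stage_with_scratch_dirs():
    staging_dir = tempfile.mkdtemp()
    cache_dir = tempfile.mkdtemp()
    try:
        # Stand-in for the staging logic under test.
        path = os.path.join(cache_dir, 'requirements.txt')
        with open(path, 'w') as f:
            f.write('nothing')
        return os.path.isfile(path)
    finally:
        # Guaranteed cleanup: no leftover files for the next test run.
        shutil.rmtree(staging_dir)
        shutil.rmtree(cache_dir)

print(stage_with_scratch_dirs())  # True
```

In modern Python the same guarantee is usually spelled `with tempfile.TemporaryDirectory() as d:`, but the explicit `try`/`finally` matches the diff above.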
[29/50] [abbrv] beam git commit: Refactoring metrics infrastructure
Refactoring metrics infrastructure Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b148f5cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b148f5cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b148f5cc Branch: refs/heads/master Commit: b148f5cc9f3e414b9cd1f605b25d50e21f626b7a Parents: e3849af Author: Pablo Authored: Mon Jan 23 17:50:21 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 15:28:49 2017 -0800 -- sdks/python/apache_beam/metrics/execution.pxd | 31 + sdks/python/apache_beam/metrics/execution.py| 70 sdks/python/apache_beam/runners/common.pxd | 2 + sdks/python/apache_beam/runners/common.py | 11 ++- .../apache_beam/runners/direct/executor.py | 12 ++-- .../runners/direct/transform_evaluator.py | 54 --- sdks/python/setup.py| 1 + 7 files changed, 125 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.pxd -- diff --git a/sdks/python/apache_beam/metrics/execution.pxd b/sdks/python/apache_beam/metrics/execution.pxd new file mode 100644 index 000..d89004f --- /dev/null +++ b/sdks/python/apache_beam/metrics/execution.pxd @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +cimport cython + + +cdef class MetricsContainer(object): + cdef object step_name + cdef public object counters + cdef public object distributions + + +cdef class ScopedMetricsContainer(object): + cpdef enter(self) + cpdef exit(self) + cdef list _stack + cdef MetricsContainer _container http://git-wip-us.apache.org/repos/asf/beam/blob/b148f5cc/sdks/python/apache_beam/metrics/execution.py -- diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py index 8f04b7b..3ba1735 100644 --- a/sdks/python/apache_beam/metrics/execution.py +++ b/sdks/python/apache_beam/metrics/execution.py @@ -98,36 +98,49 @@ class MetricResult(object): self.key, self.committed, self.attempted) -class MetricsEnvironment(object): +class _MetricsEnvironment(object): """Holds the MetricsContainer for every thread and other metric information. This class is not meant to be instantiated, instead being used to keep track of global state. 
""" - METRICS_SUPPORTED = False - _METRICS_SUPPORTED_LOCK = threading.Lock() - - PER_THREAD = threading.local() + def __init__(self): +self.METRICS_SUPPORTED = False +self._METRICS_SUPPORTED_LOCK = threading.Lock() +self.PER_THREAD = threading.local() +self.set_container_stack() + + def set_container_stack(self): +if not hasattr(self.PER_THREAD, 'container'): + self.PER_THREAD.container = [] + + def container_stack(self): +self.set_container_stack() +return self.PER_THREAD.container + + def set_metrics_supported(self, supported): +self.set_container_stack() +with self._METRICS_SUPPORTED_LOCK: + self.METRICS_SUPPORTED = supported + + def current_container(self): +self.set_container_stack() +index = len(self.PER_THREAD.container) - 1 +if index < 0: + return None +else: + return self.PER_THREAD.container[index] - @classmethod - def set_metrics_supported(cls, supported): -with cls._METRICS_SUPPORTED_LOCK: - cls.METRICS_SUPPORTED = supported + def set_current_container(self, container): +self.set_container_stack() +self.PER_THREAD.container.append(container) - @classmethod - def current_container(cls): -try: - return cls.PER_THREAD.container -except AttributeError: - return None + def unset_current_container(self): +self.set_container_stack() +self.PER_THREAD.container.pop() - @classmethod - def set_current_container(cls, container): -cls.PER_THREAD.container = container - @classmethod - def unset_current_conta
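The refactor above turns `MetricsEnvironment` from a class with classmethod-managed global state into an instantiable `_MetricsEnvironment` whose thread-local slot holds a *stack* of containers instead of a single value, so containers can be nested and restored on exit. A hedged sketch of that stack pattern (names mirror the diff, but this is illustrative, not the Beam implementation):

```python
# Per-thread stack of metrics containers: set_current_container pushes,
# unset_current_container pops, current_container peeks at the top.
import threading

class MetricsEnvironment(object):
    def __init__(self):
        self._per_thread = threading.local()

    def _stack(self):
        # threading.local attributes must be initialized lazily per thread.
        if not hasattr(self._per_thread, 'container'):
            self._per_thread.container = []
        return self._per_thread.container

    def current_container(self):
        stack = self._stack()
        return stack[-1] if stack else None

    def set_current_container(self, container):
        self._stack().append(container)

    def unset_current_container(self):
        self._stack().pop()

env = MetricsEnvironment()
env.set_current_container('outer')
env.set_current_container('inner')
print(env.current_container())  # inner
env.unset_current_container()
print(env.current_container())  # outer
```

The stack (rather than a single thread-local slot) is what lets a nested scope temporarily install its own container and have the enclosing one reappear when the scope exits, which is the behavior `ScopedMetricsContainer` in the accompanying `.pxd` relies on.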
[50/50] [abbrv] beam git commit: This closes #1872
This closes #1872 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3b97a28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3b97a28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3b97a28 Branch: refs/heads/master Commit: c3b97a2878a6ccb7b380cb7724ee0719a1d25d2e Parents: 847e4e9 2d7ce32 Author: Davor Bonaci Authored: Mon Jan 30 14:59:03 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 14:59:03 2017 -0800 -- .gitignore | 10 + .travis.yml | 21 +- pom.xml | 20 + sdks/pom.xml|2 + sdks/python/.pylintrc | 164 + sdks/python/MANIFEST.in | 19 + sdks/python/README.md | 372 ++ sdks/python/apache_beam/__init__.py | 82 + sdks/python/apache_beam/coders/__init__.py | 19 + sdks/python/apache_beam/coders/coder_impl.pxd | 143 + sdks/python/apache_beam/coders/coder_impl.py| 597 +++ sdks/python/apache_beam/coders/coders.py| 707 +++ sdks/python/apache_beam/coders/coders_test.py | 115 + .../apache_beam/coders/coders_test_common.py| 355 ++ .../apache_beam/coders/fast_coders_test.py | 37 + sdks/python/apache_beam/coders/observable.py| 38 + .../apache_beam/coders/observable_test.py | 57 + .../coders/proto2_coder_test_messages_pb2.py| 318 ++ .../apache_beam/coders/slow_coders_test.py | 39 + sdks/python/apache_beam/coders/slow_stream.py | 154 + .../apache_beam/coders/standard_coders.yaml | 67 + .../apache_beam/coders/standard_coders_test.py | 136 + sdks/python/apache_beam/coders/stream.pxd | 63 + sdks/python/apache_beam/coders/stream.pyx | 215 + sdks/python/apache_beam/coders/stream_test.py | 169 + sdks/python/apache_beam/coders/typecoders.py| 186 + .../apache_beam/coders/typecoders_test.py | 117 + sdks/python/apache_beam/error.py| 42 + sdks/python/apache_beam/examples/__init__.py| 16 + .../apache_beam/examples/complete/__init__.py | 16 + .../examples/complete/autocomplete.py | 89 + .../examples/complete/autocomplete_test.py | 52 + .../examples/complete/estimate_pi.py| 125 + 
.../examples/complete/estimate_pi_test.py | 52 + .../examples/complete/juliaset/__init__.py | 16 + .../complete/juliaset/juliaset/__init__.py | 16 + .../complete/juliaset/juliaset/juliaset.py | 123 + .../complete/juliaset/juliaset/juliaset_test.py | 86 + .../examples/complete/juliaset/juliaset_main.py | 58 + .../examples/complete/juliaset/setup.py | 116 + .../apache_beam/examples/complete/tfidf.py | 208 + .../apache_beam/examples/complete/tfidf_test.py | 92 + .../examples/complete/top_wikipedia_sessions.py | 180 + .../complete/top_wikipedia_sessions_test.py | 62 + .../apache_beam/examples/cookbook/__init__.py | 16 + .../examples/cookbook/bigquery_schema.py| 130 + .../examples/cookbook/bigquery_side_input.py| 123 + .../cookbook/bigquery_side_input_test.py| 54 + .../examples/cookbook/bigquery_tornadoes.py | 99 + .../cookbook/bigquery_tornadoes_test.py | 44 + .../apache_beam/examples/cookbook/bigshuffle.py | 95 + .../examples/cookbook/bigshuffle_test.py| 63 + .../apache_beam/examples/cookbook/coders.py | 101 + .../examples/cookbook/coders_test.py| 49 + .../examples/cookbook/combiners_test.py | 74 + .../examples/cookbook/custom_ptransform.py | 134 + .../examples/cookbook/custom_ptransform_test.py | 53 + .../examples/cookbook/datastore_wordcount.py| 256 ++ .../apache_beam/examples/cookbook/filters.py| 107 + .../examples/cookbook/filters_test.py | 69 + .../examples/cookbook/group_with_coder.py | 122 + .../examples/cookbook/group_with_coder_test.py | 89 + .../examples/cookbook/mergecontacts.py | 133 + .../examples/cookbook/mergecontacts_test.py | 125 + .../examples/cookbook/multiple_output_pardo.py | 181 + .../cookbook/multiple_output_pardo_test.py | 72 + .../apache_beam/examples/snippets/__init__.py | 16 + .../apache_beam/examples/snippets/snippets.py | 1142 + .../examples/snippets/snippets_test.py | 758 .../apache_beam/examples/streaming_wordcap.py | 64 + .../apache_beam/examples/streaming_wordcount.py | 74 + sdks/python/apache_beam/examples/wordcount.py | 109 + 
.../apache_beam/examples/wordcount_debugging.py | 166 + .../examples/wordcount_debugging_test.py| 59 + .../apache_beam/examples/wordcount_it_test.py | 59 + .../apache_beam/examples/wordcount_minimal.py | 121
[19/50] [abbrv] beam git commit: Install test dependencies in the post commit script.
Install test dependencies in the post commit script. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19789db9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19789db9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19789db9 Branch: refs/heads/master Commit: 19789db9cab031e0891cb67c4ab6b8b03c6a8c09 Parents: 43cb4d7 Author: Ahmet Altay Authored: Tue Jan 24 15:13:03 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 16:32:36 2017 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/beam/blob/19789db9/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 2e419a5..3756075 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -70,6 +70,10 @@ python setup.py sdist SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz) +# Install test dependencies for ValidatesRunner tests. +echo "pyhamcrest" > postcommit_requirements.txt +echo "mock" >> postcommit_requirements.txt + # Run ValidatesRunner tests on Google Cloud Dataflow service echo ">>> RUNNING DATAFLOW RUNNER VALIDATESRUNNER TESTS" python setup.py nosetests \ @@ -80,6 +84,7 @@ python setup.py nosetests \ --temp_location=$GCS_LOCATION/temp-validatesrunner-test \ --sdk_location=$SDK_LOCATION \ --job_name=$JOBNAME_VR_TEST \ +--requirements_file=postcommit_requirements.txt \ --num_workers=1" # Run wordcount on the Google Cloud Dataflow service
[44/50] [abbrv] beam git commit: Add mock time to slow bigquery unit tests.
Add mock time to slow bigquery unit tests. Unit tests, testing retries does not need to use real time. This change reduces the total tox time for unit tests from 235 seconds to 73 seconds locally. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e02ddac3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e02ddac3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e02ddac3 Branch: refs/heads/master Commit: e02ddac308b8b1ea0bd0cb0ae4f9ba4908a50595 Parents: 475707f Author: Ahmet Altay Authored: Fri Jan 27 17:35:24 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:44:55 2017 -0800 -- sdks/python/apache_beam/io/bigquery_test.py | 26 1 file changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/e02ddac3/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index b8682d1..14eb035 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -539,7 +539,8 @@ class TestBigQueryReader(unittest.TestCase): class TestBigQueryWriter(unittest.TestCase): - def test_no_table_and_create_never(self): + @mock.patch('time.sleep', return_value=None) + def test_no_table_and_create_never(self, patched_time_sleep): client = mock.Mock() client.tables.Get.side_effect = HttpError( response={'status': '404'}, url='', content='') @@ -572,7 +573,9 @@ class TestBigQueryWriter(unittest.TestCase): self.assertTrue(client.tables.Get.called) self.assertTrue(client.tables.Insert.called) - def test_no_table_and_create_if_needed_and_no_schema(self): + @mock.patch('time.sleep', return_value=None) + def test_no_table_and_create_if_needed_and_no_schema( + self, patched_time_sleep): client = mock.Mock() client.tables.Get.side_effect = HttpError( response={'status': '404'}, url='', content='') @@ -587,7 +590,9 @@ class 
TestBigQueryWriter(unittest.TestCase): 'Table project:dataset.table requires a schema. None can be inferred ' 'because the table does not exist.') - def test_table_not_empty_and_write_disposition_empty(self): + @mock.patch('time.sleep', return_value=None) + def test_table_not_empty_and_write_disposition_empty( + self, patched_time_sleep): client = mock.Mock() client.tables.Get.return_value = bigquery.Table( tableReference=bigquery.TableReference( @@ -712,7 +717,8 @@ class TestBigQueryWrapper(unittest.TestCase): wrapper._delete_dataset('', '') self.assertTrue(client.datasets.Delete.called) - def test_delete_dataset_retries_fail(self): + @mock.patch('time.sleep', return_value=None) + def test_delete_dataset_retries_fail(self, patched_time_sleep): client = mock.Mock() client.datasets.Delete.side_effect = ValueError("Cannot delete") wrapper = beam.io.bigquery.BigQueryWrapper(client) @@ -730,7 +736,8 @@ class TestBigQueryWrapper(unittest.TestCase): wrapper._delete_table('', '', '') self.assertTrue(client.tables.Delete.called) - def test_delete_table_retries_fail(self): + @mock.patch('time.sleep', return_value=None) + def test_delete_table_retries_fail(self, patched_time_sleep): client = mock.Mock() client.tables.Delete.side_effect = ValueError("Cannot delete") wrapper = beam.io.bigquery.BigQueryWrapper(client) @@ -738,7 +745,8 @@ class TestBigQueryWrapper(unittest.TestCase): wrapper._delete_table('', '', '') self.assertTrue(client.tables.Delete.called) - def test_delete_dataset_retries_for_timeouts(self): + @mock.patch('time.sleep', return_value=None) + def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep): client = mock.Mock() client.datasets.Delete.side_effect = [ HttpError( @@ -749,7 +757,8 @@ class TestBigQueryWrapper(unittest.TestCase): wrapper._delete_dataset('', '') self.assertTrue(client.datasets.Delete.called) - def test_delete_table_retries_for_timeouts(self): + @mock.patch('time.sleep', return_value=None) + def 
test_delete_table_retries_for_timeouts(self, patched_time_sleep): client = mock.Mock() client.tables.Delete.side_effect = [ HttpError( @@ -760,7 +769,8 @@ class TestBigQueryWrapper(unittest.TestCase): wrapper._delete_table('', '', '') self.assertTrue(client.tables.Delete.called) - def test_temporary_dataset_is_unique(self): + @mock.patch('time.sleep', return_value=None) + def test_temporary_dataset_is_unique(self, patched_time_sleep): client = mock.Mock() client.datasets.Get.return_value = bigquery.Dataset( datasetReference=bigquery.D
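The change above speeds up retry tests by patching `time.sleep` so that backoff between attempts costs no wall-clock time. A minimal self-contained sketch of the same pattern follows; `FlakyClient` and `delete_with_retries` are illustrative stand-ins, not code from the Beam repository:

```python
import time
from unittest import mock


class FlakyClient:
    """Hypothetical client whose delete() fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def delete(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ValueError('Cannot delete')


def delete_with_retries(client, attempts=4, backoff=1.0):
    """Retry client.delete(), sleeping between failed attempts."""
    for attempt in range(attempts):
        try:
            client.delete()
            return client.calls
        except ValueError:
            if attempt == attempts - 1:
                raise
            time.sleep(backoff)  # real waiting -- this is what makes the tests slow


@mock.patch('time.sleep', return_value=None)
def test_retries_without_waiting(patched_time_sleep):
    # The decorator replaces time.sleep with a no-op mock for the call's duration,
    # so two retries complete instantly while the retry logic is still exercised.
    client = FlakyClient(failures=2)
    calls = delete_with_retries(client)
    assert calls == 3                       # two failures, then success
    assert patched_time_sleep.call_count == 2
    return calls
```

Because `delete_with_retries` looks up `time.sleep` through the `time` module at call time, patching the module attribute is enough; no test-specific hooks are needed in the production code.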
[16/50] [abbrv] beam git commit: Closes #1832
Closes #1832 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9831236 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9831236 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9831236 Branch: refs/heads/master Commit: f9831236c4a94af35a0a40a649323b578a3d92e4 Parents: d0dc1f3 9052366 Author: Robert Bradshaw Authored: Tue Jan 24 13:48:12 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 13:48:12 2017 -0800 -- sdks/python/apache_beam/runners/common.py | 3 +++ 1 file changed, 3 insertions(+) --
[03/50] [abbrv] beam git commit: Implement annotation-based NewDoFn in the Python SDK
Implement Annotation based NewDoFn in python SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e272ecf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e272ecf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e272ecf Branch: refs/heads/master Commit: 9e272ecf639b7b13f23a83868fd101a437159c1c Parents: 946135f Author: Sourabh Bajaj Authored: Fri Jan 20 17:17:25 2017 -0800 Committer: Robert Bradshaw Committed: Sat Jan 21 20:37:07 2017 -0800 -- sdks/python/apache_beam/pipeline_test.py| 100 - sdks/python/apache_beam/runners/common.pxd | 4 + sdks/python/apache_beam/runners/common.py | 221 +-- .../runners/direct/transform_evaluator.py | 15 +- sdks/python/apache_beam/transforms/core.py | 113 +- sdks/python/apache_beam/typehints/decorators.py | 2 +- sdks/python/apache_beam/typehints/typecheck.py | 145 7 files changed, 531 insertions(+), 69 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/9e272ecf/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 336bf54..93b68d1 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -24,15 +24,23 @@ import unittest from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor +from apache_beam.pvalue import AsSingleton from apache_beam.runners.dataflow.native_io.iobase import NativeSource from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Map +from apache_beam.transforms import NewDoFn +from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform from apache_beam.transforms import Read -from apache_beam.transforms.util import 
assert_that, equal_to +from apache_beam.transforms import WindowInto +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to +from apache_beam.transforms.window import IntervalWindow +from apache_beam.transforms.window import WindowFn +from apache_beam.utils.timestamp import MIN_TIMESTAMP class FakeSource(NativeSource): @@ -241,6 +249,96 @@ class PipelineTest(unittest.TestCase): self.assertEqual([1, 4, 9], p | Create([1, 2, 3]) | Map(lambda x: x*x)) +class NewDoFnTest(unittest.TestCase): + + def setUp(self): +self.runner_name = 'DirectRunner' + + def test_element(self): +class TestDoFn(NewDoFn): + def process(self, element): +yield element + 10 + +pipeline = TestPipeline(runner=self.runner_name) +pcoll = pipeline | 'Create' >> Create([1, 2]) | 'Do' >> ParDo(TestDoFn()) +assert_that(pcoll, equal_to([11, 12])) +pipeline.run() + + def test_context_param(self): +class TestDoFn(NewDoFn): + def process(self, element, context=NewDoFn.ContextParam): +yield context.element + 10 + +pipeline = TestPipeline(runner=self.runner_name) +pcoll = pipeline | 'Create' >> Create([1, 2])| 'Do' >> ParDo(TestDoFn()) +assert_that(pcoll, equal_to([11, 12])) +pipeline.run() + + def test_side_input_no_tag(self): +class TestDoFn(NewDoFn): + def process(self, element, prefix, suffix): +return ['%s-%s-%s' % (prefix, element, suffix)] + +pipeline = TestPipeline() +words_list = ['aa', 'bb', 'cc'] +words = pipeline | 'SomeWords' >> Create(words_list) +prefix = 'zyx' +suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in +result = words | 'DecorateWordsDoFnNoTag' >> ParDo( +TestDoFn(), prefix, suffix=AsSingleton(suffix)) +assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) +pipeline.run() + + def test_side_input_tagged(self): +class TestDoFn(NewDoFn): + def process(self, element, prefix, suffix=NewDoFn.SideInputParam): +return ['%s-%s-%s' % (prefix, element, suffix)] + +pipeline = TestPipeline() +words_list = ['aa', 'bb', 'cc'] 
+words = pipeline | 'SomeWords' >> Create(words_list) +prefix = 'zyx' +suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in +result = words | 'DecorateWordsDoFnNoTag' >> ParDo( +TestDoFn(), prefix, suffix=AsSingleton(suffix)) +assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) +pipeline.run() + + def test_window_param(self): +class TestDoFn(NewDoFn): + def process(self, element, window=NewDoFn.WindowParam): +yield (float(window.start)
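The tests above rely on NewDoFn's parameter annotations: `process()` declares sentinel default values such as `NewDoFn.WindowParam`, and the runner inspects the signature to decide which extra values to inject. A rough sketch of that default-sentinel dispatch technique follows; `_Param` and `invoke_process` are illustrative and deliberately simplified, not the SDK's implementation:

```python
import inspect


class _Param(object):
    """Marker object used as a default value to request runner-provided data."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return '<%s>' % self.name


WindowParam = _Param('WindowParam')
TimestampParam = _Param('TimestampParam')


def invoke_process(process_fn, element, window, timestamp):
    """Call process_fn, injecting values for any sentinel-defaulted parameters."""
    extras = {}
    for name, param in inspect.signature(process_fn).parameters.items():
        if param.default is WindowParam:
            extras[name] = window
        elif param.default is TimestampParam:
            extras[name] = timestamp
    return list(process_fn(element, **extras))


def process(element, window=WindowParam):
    # A user-defined process(): asking for the window just by annotating a default.
    yield (element, window)


# invoke_process(process, 'a', (0, 10), 5) → [('a', (0, 10))]
```

The signature inspection happens once per DoFn in the real SDK (see the `common.py` changes in this commit), not on every element, which keeps the per-record overhead low.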
[31/50] [abbrv] beam git commit: Updating dataflow client protos to add new metrics.
http://git-wip-us.apache.org/repos/asf/beam/blob/901a14c4/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py -- diff --git a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py index 178a542..a42154e 100644 --- a/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py +++ b/sdks/python/apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py @@ -24,6 +24,7 @@ and continuous computation. from apitools.base.protorpclite import messages as _messages from apitools.base.py import encoding +from apitools.base.py import extra_types package = 'dataflow' @@ -193,6 +194,7 @@ class CounterMetadata(_messages.Message): AND: Aggregated value represents the logical 'and' of all contributed values. SET: Aggregated value is a set of unique contributed values. + DISTRIBUTION: Aggregated value captures statistics about a distribution. """ INVALID = 0 SUM = 1 @@ -202,6 +204,7 @@ class CounterMetadata(_messages.Message): OR = 5 AND = 6 SET = 7 +DISTRIBUTION = 8 class StandardUnitsValueValuesEnum(_messages.Enum): """System defined Units, see above enum. @@ -308,6 +311,7 @@ class CounterUpdate(_messages.Message): aggregate value accumulated since the worker started working on this WorkItem. By default this is false, indicating that this counter is reported as a delta. +distribution: Distribution data floatingPoint: Floating point value for Sum, Max, Min. floatingPointList: List of floating point numbers, for Set. floatingPointMean: Floating point mean aggregation value for Mean. 
@@ -326,34 +330,38 @@ class CounterUpdate(_messages.Message): boolean = _messages.BooleanField(1) cumulative = _messages.BooleanField(2) - floatingPoint = _messages.FloatField(3) - floatingPointList = _messages.MessageField('FloatingPointList', 4) - floatingPointMean = _messages.MessageField('FloatingPointMean', 5) - integer = _messages.MessageField('SplitInt64', 6) - integerList = _messages.MessageField('IntegerList', 7) - integerMean = _messages.MessageField('IntegerMean', 8) - internal = _messages.MessageField('extra_types.JsonValue', 9) - nameAndKind = _messages.MessageField('NameAndKind', 10) - shortId = _messages.IntegerField(11) - stringList = _messages.MessageField('StringList', 12) - structuredNameAndMetadata = _messages.MessageField('CounterStructuredNameAndMetadata', 13) + distribution = _messages.MessageField('DistributionUpdate', 3) + floatingPoint = _messages.FloatField(4) + floatingPointList = _messages.MessageField('FloatingPointList', 5) + floatingPointMean = _messages.MessageField('FloatingPointMean', 6) + integer = _messages.MessageField('SplitInt64', 7) + integerList = _messages.MessageField('IntegerList', 8) + integerMean = _messages.MessageField('IntegerMean', 9) + internal = _messages.MessageField('extra_types.JsonValue', 10) + nameAndKind = _messages.MessageField('NameAndKind', 11) + shortId = _messages.IntegerField(12) + stringList = _messages.MessageField('StringList', 13) + structuredNameAndMetadata = _messages.MessageField('CounterStructuredNameAndMetadata', 14) class CreateJobFromTemplateRequest(_messages.Message): - """Request to create a Dataflow job. + """A request to create a Cloud Dataflow job from a template. Messages: -ParametersValue: Dynamic parameterization of the job's runtime - environment. +ParametersValue: The runtime parameters to pass to the job. Fields: -gcsPath: A path to the serialized JSON representation of the job. -parameters: Dynamic parameterization of the job's runtime environment. 
+environment: The runtime environment for the job. +gcsPath: Required. A Cloud Storage path to the template from which to + create the job. Must be a valid Cloud Storage URL, beginning with + `gs://`. +jobName: Required. The job name to use for the created job. +parameters: The runtime parameters to pass to the job. """ @encoding.MapUnrecognizedFields('additionalProperties') class ParametersValue(_messages.Message): -"""Dynamic parameterization of the job's runtime environment. +"""The runtime parameters to pass to the job. Messages: AdditionalProperty: An additional property for a ParametersValue object. @@ -375,8 +383,10 @@ class CreateJobFromTemplateRequest(_messages.Message): additionalProperties = _messages.MessageField('AdditionalProperty', 1, repeated=True) - gcsPath = _messages.StringField(1) - parameters = _messages.MessageField('ParametersValue', 2) + environment = _messages.MessageField('RuntimeEnvironment', 1) + gcsPath = _messages.StringField(2) + jobName = _messages.StringField(3) + parameters = _messages.MessageField('Paramet
[47/50] [abbrv] beam git commit: This closes #1871
This closes #1871 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/be0e32e3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/be0e32e3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/be0e32e3 Branch: refs/heads/master Commit: be0e32e36313390ed04106d57f4c9dfeabb91b4d Parents: 1390699 0b4ee73 Author: Davor Bonaci Authored: Mon Jan 30 13:25:53 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 13:25:53 2017 -0800 -- .travis.yml | 15 +++ .../examples/cookbook/datastore_wordcount.py | 2 +- sdks/python/setup.py | 4 ++-- 3 files changed, 18 insertions(+), 3 deletions(-) --
[42/50] [abbrv] beam git commit: Updates places in SDK that create thread pools.
Updates places in SDK that creates thread pools. Moves ThreadPool creation to a util function. Records and resets logging level due to this being reset by apitools when used with a ThreadPool. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc Branch: refs/heads/master Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672 Parents: f29527f Author: Chamikara Jayalath Authored: Sat Jan 28 08:54:33 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:43:37 2017 -0800 -- sdks/python/apache_beam/internal/util.py | 33 ++ sdks/python/apache_beam/io/filebasedsource.py | 17 +++ sdks/python/apache_beam/io/fileio.py | 11 ++-- 3 files changed, 40 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py -- diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py index 2d12d49..5b31e88 100644 --- a/sdks/python/apache_beam/internal/util.py +++ b/sdks/python/apache_beam/internal/util.py @@ -17,6 +17,11 @@ """Utility functions used throughout the package.""" +import logging +from multiprocessing.pool import ThreadPool +import threading +import weakref + class ArgumentPlaceholder(object): """A place holder object replacing PValues in argument lists. @@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values): (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v) for k, v in sorted(kwargs.iteritems())) return (new_args, new_kwargs) + + +def run_using_threadpool(fn_to_execute, inputs, pool_size): + """Runs the given function on given inputs using a thread pool. + + Args: +fn_to_execute: Function to execute +inputs: Inputs on which given function will be executed in parallel. +pool_size: Size of thread pool. 
+ Returns: +Results retrieved after executing the given function on given inputs. + """ + + # ThreadPool crashes in old versions of Python (< 2.7.5) if created + # from a child thread. (http://bugs.python.org/issue10015) + if not hasattr(threading.current_thread(), '_children'): +threading.current_thread()._children = weakref.WeakKeyDictionary() + pool = ThreadPool(min(pool_size, len(inputs))) + try: +# We record and reset logging level here since 'apitools' library Beam +# depends on updates the logging level when used with a threadpool - +# https://github.com/google/apitools/issues/141 +# TODO: Remove this once above issue in 'apitools' is fixed. +old_level = logging.getLogger().level +return pool.map(fn_to_execute, inputs) + finally: +pool.terminate() +logging.getLogger().setLevel(old_level) http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 1bfde25..582d673 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``. """ import random -import threading -import weakref -from multiprocessing.pool import ThreadPool from apache_beam.internal import pickler +from apache_beam.internal import util from apache_beam.io import concat_source from apache_beam.io import fileio from apache_beam.io import iobase @@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource): return [fileio.ChannelFactory.size_in_bytes(file_names[0])] else: if pattern is None: -# ThreadPool crashes in old versions of Python (< 2.7.5) if created -# from a child thread. 
(http://bugs.python.org/issue10015) -if not hasattr(threading.current_thread(), '_children'): - threading.current_thread()._children = weakref.WeakKeyDictionary() -pool = ThreadPool( -min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names))) -try: - return pool.map(fileio.ChannelFactory.size_in_bytes, file_names) -finally: - pool.terminate() +return util.run_using_threadpool( +fileio.ChannelFactory.size_in_bytes, file_names, +MAX_NUM_THREADS_FOR_SIZE_ESTIMATION) else: file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern, file_names) http://git-wip-us.apache.org/repos/asf/beam/blob
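The new `run_using_threadpool` helper centralizes a save-and-restore pattern: the root logger's level is captured before `pool.map` and reinstated afterwards, because old apitools versions reset it when used from worker threads. A condensed sketch of the same pattern (mirroring the diff above, minus the Python < 2.7.5 child-thread workaround):

```python
import logging
from multiprocessing.pool import ThreadPool


def run_using_threadpool(fn, inputs, pool_size):
    """Map fn over inputs with a bounded thread pool, preserving log level."""
    pool = ThreadPool(min(pool_size, len(inputs)))
    # Record the level now: a library called inside fn may clobber it.
    old_level = logging.getLogger().level
    try:
        # pool.map blocks until every input is processed and keeps input order.
        return pool.map(fn, inputs)
    finally:
        pool.terminate()
        logging.getLogger().setLevel(old_level)


# run_using_threadpool(lambda x: x * x, [1, 2, 3, 4], 3) → [1, 4, 9, 16]
```

Sizing the pool as `min(pool_size, len(inputs))` avoids spawning idle threads when there are fewer inputs than the configured maximum.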
[10/50] [abbrv] beam git commit: Remove dataflow_test.py
Remove dataflow_test.py Many of these tests were redundant with tests elsewhere, and the ones that weren't were put closer to similar tests. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5b90d83 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5b90d83 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5b90d83 Branch: refs/heads/master Commit: d5b90d8383e662e803ea79b31661250a043bcfd2 Parents: 01b3628 Author: Robert Bradshaw Authored: Sat Jan 21 21:53:42 2017 -0800 Committer: Robert Bradshaw Committed: Mon Jan 23 14:36:55 2017 -0800 -- sdks/python/apache_beam/dataflow_test.py| 418 --- .../apache_beam/transforms/ptransform_test.py | 67 +++ .../apache_beam/transforms/sideinputs_test.py | 208 - 3 files changed, 274 insertions(+), 419 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/d5b90d83/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py deleted file mode 100644 index f410230..000 --- a/sdks/python/apache_beam/dataflow_test.py +++ /dev/null @@ -1,418 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -"""Integration tests for the dataflow package.""" - -from __future__ import absolute_import - -import logging -import re -import unittest - -import apache_beam as beam -from apache_beam.pvalue import AsDict -from apache_beam.pvalue import AsIter as AllOf -from apache_beam.pvalue import AsList -from apache_beam.pvalue import AsSingleton -from apache_beam.pvalue import EmptySideInput -from apache_beam.pvalue import SideOutputValue -from apache_beam.test_pipeline import TestPipeline -from apache_beam.transforms import Create -from apache_beam.transforms import DoFn -from apache_beam.transforms import FlatMap -from apache_beam.transforms import GroupByKey -from apache_beam.transforms import Map -from apache_beam.transforms import ParDo -from apache_beam.transforms import WindowInto -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to -from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import WindowFn -from nose.plugins.attrib import attr - - -class DataflowTest(unittest.TestCase): - """Dataflow integration tests.""" - - SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10 - SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)] - - @beam.ptransform_fn - def Count(pcoll): # pylint: disable=invalid-name, no-self-argument -"""A Count transform: v, ... 
=> (v, n), ...""" -return (pcoll -| 'AddCount' >> Map(lambda x: (x, 1)) -| 'GroupCounts' >> GroupByKey() -| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones - - @attr('ValidatesRunner') - def test_word_count(self): -pipeline = TestPipeline() -lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) -result = ( -(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) -.apply('CountWords', DataflowTest.Count)) -assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) -pipeline.run() - - @attr('ValidatesRunner') - def test_map(self): -pipeline = TestPipeline() -lines = pipeline | 'input' >> Create(['a', 'b', 'c']) -result = (lines - | 'upper' >> Map(str.upper) - | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) -assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) -pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_side_input_as_arg(self): -pipeline = TestPipeline() -words_list = ['aa', 'bb', 'cc'] -words = pipeline | 'SomeWords' >> Create(words_list) -prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in -suffix = 'zyx' -result = words | FlatMap( -'DecorateWords', -lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], -AsSingleton(prefix), suffix) -assert_that(resu
[33/50] [abbrv] beam git commit: Closes #1857
Closes #1857 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52d97e2f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52d97e2f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52d97e2f Branch: refs/heads/master Commit: 52d97e2fc2e383a58969447addd45ebe3eed4f5f Parents: 3d6f20d 901a14c Author: Robert Bradshaw Authored: Fri Jan 27 12:00:25 2017 -0800 Committer: Robert Bradshaw Committed: Fri Jan 27 12:00:25 2017 -0800 -- .../clients/dataflow/dataflow_v1b3_client.py| 578 .../clients/dataflow/dataflow_v1b3_messages.py | 931 +-- 2 files changed, 1075 insertions(+), 434 deletions(-) --
[23/50] [abbrv] beam git commit: Closes #1844
Closes #1844 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e1028b3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e1028b3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e1028b3 Branch: refs/heads/master Commit: 4e1028b3dfeaf02e51eb9f3b5d1a5e78c1cfcbb9 Parents: 5924220 5787e81 Author: Robert Bradshaw Authored: Wed Jan 25 16:16:52 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 16:16:52 2017 -0800 -- .../python/apache_beam/utils/dependency_test.py | 47 +++- 1 file changed, 27 insertions(+), 20 deletions(-) --
[17/50] [abbrv] beam git commit: Removes Dataflow native text source and sink from Beam SDK.
Removes Dataflow native text source and sink from Beam SDK. Users should be using Beam text source and sink available in module 'textio.py' instead of this. Also removes Dataflow native file source/sink that is only used by native text source/sink. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52fc95dd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52fc95dd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52fc95dd Branch: refs/heads/master Commit: 52fc95ddebceaaf27897c4f6d5b97d08bd4b3a1e Parents: f983123 Author: Chamikara Jayalath Authored: Mon Jan 23 13:23:45 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 16:31:02 2017 -0800 -- sdks/python/apache_beam/io/fileio.py| 542 +- sdks/python/apache_beam/io/fileio_test.py | 729 +-- .../runners/direct/transform_evaluator.py | 5 - 3 files changed, 3 insertions(+), 1273 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/52fc95dd/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index ebc4fed..52f31c6 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -34,12 +34,10 @@ import weakref from apache_beam import coders from apache_beam.io import gcsio from apache_beam.io import iobase -from apache_beam.io import range_trackers -from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.transforms.display import DisplayDataItem -__all__ = ['TextFileSource', 'TextFileSink'] +__all__ = ['TextFileSink'] DEFAULT_SHARD_NAME_TEMPLATE = '-S-of-N' @@ -111,326 +109,6 @@ class CompressionTypes(object): return cls.UNCOMPRESSED -class NativeFileSource(dataflow_io.NativeSource): - """A source implemented by Dataflow service from a GCS or local file or files. 
- - This class is to be only inherited by sources natively implemented by Cloud - Dataflow service, hence should not be sub-classed by users. - """ - - def __init__(self, - file_path, - start_offset=None, - end_offset=None, - coder=coders.BytesCoder(), - compression_type=CompressionTypes.AUTO, - mime_type='application/octet-stream'): -"""Initialize a NativeFileSource. - -Args: - file_path: The file path to read from as a local file path or a GCS -gs:// path. The path can contain glob characters (*, ?, and [...] -sets). - start_offset: The byte offset in the source file that the reader -should start reading. By default is 0 (beginning of file). - end_offset: The byte offset in the file that the reader should stop -reading. By default it is the end of the file. - compression_type: Used to handle compressed input files. Typical value - is CompressionTypes.AUTO, in which case the file_path's extension will - be used to detect the compression. - coder: Coder used to decode each record. - -Raises: - TypeError: if file_path is not a string. - -If the file_path contains glob characters then the start_offset and -end_offset must not be specified. - -The 'start_offset' and 'end_offset' pair provide a mechanism to divide the -file into multiple pieces for individual sources. Because the offset -is measured by bytes, some complication arises when the offset splits in -the middle of a record. To avoid the scenario where two adjacent sources -each get a fraction of a line we adopt the following rules: - -If start_offset falls inside a record (any character except the first one) -then the source will skip the record and start with the next one. - -If end_offset falls inside a record (any character except the first one) -then the source will contain that entire record. 
-""" -if not isinstance(file_path, basestring): - raise TypeError('%s: file_path must be a string; got %r instead' % - (self.__class__.__name__, file_path)) - -self.file_path = file_path -self.start_offset = start_offset -self.end_offset = end_offset -self.compression_type = compression_type -self.coder = coder -self.mime_type = mime_type - - def display_data(self): -return {'file_pattern': DisplayDataItem(self.file_path, -label="File Pattern"), -'compression': DisplayDataItem(str(self.compression_type), - label='Compression')} - - def __eq__(self, other): -return (self.file_path == other.file_path and -self.start_offset == o
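The offset-splitting rules described in the removed `NativeFileSource` docstring above (skip a record that `start_offset` falls inside; read whole a record that `end_offset` falls inside) reduce to a single ownership rule: a split owns exactly the records whose *first byte* lies in `[start_offset, end_offset)`. A stdlib sketch of that rule, with a hypothetical helper name, not part of the Beam API:

```python
def read_records(data, start_offset, end_offset, delim=b'\n'):
    """Return the delimiter-separated records owned by the split [start, end).

    A record is owned iff its first byte lies in [start_offset, end_offset):
    this skips any record that start_offset splits, and keeps whole any
    record that end_offset splits, matching the docstring's two rules.
    """
    pos = 0
    records = []
    for raw in data.split(delim):
        rec_start = pos
        pos += len(raw) + len(delim)
        if not raw:
            continue  # trailing empty chunk after the final delimiter
        if start_offset <= rec_start < end_offset:
            records.append(raw)
    return records
```

Because each record's first byte belongs to exactly one split, adjacent splits partition the file with no record duplicated or lost, which is the point of the two rules.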
[48/50] [abbrv] beam git commit: Remove sdks/python/LICENSE
Remove sdks/python/LICENSE Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0ff9973 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0ff9973 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0ff9973 Branch: refs/heads/master Commit: e0ff9973940c5585376ac2beb6edd1e20de962ac Parents: be0e32e Author: Davor Bonaci Authored: Mon Jan 30 14:41:32 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 14:41:32 2017 -0800 -- sdks/python/LICENSE | 202 --- 1 file changed, 202 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/e0ff9973/sdks/python/LICENSE -- diff --git a/sdks/python/LICENSE b/sdks/python/LICENSE deleted file mode 100644 index d645695..000 --- a/sdks/python/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - - Apache License - Version 2.0, January 2004 -http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. 
- - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. 
For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the -
[22/50] [abbrv] beam git commit: Cleanup tests in pipeline_test.
Cleanup tests in pipeline_test. Notably, the runner_name parameter has been obsolete since the removal of DiskCachedRunnerPipelineTest and is an inferior version of what TestPipeline provides. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61d8d3f0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61d8d3f0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61d8d3f0 Branch: refs/heads/master Commit: 61d8d3f0690142f6dc87b1484d3ebd148a706837 Parents: 9540cf1 Author: Robert Bradshaw Authored: Sat Jan 21 21:07:39 2017 -0800 Committer: Robert Bradshaw Committed: Wed Jan 25 12:38:03 2017 -0800 -- sdks/python/apache_beam/pipeline_test.py | 57 ++- 1 file changed, 21 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/61d8d3f0/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 93b68d1..833293f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -38,8 +38,8 @@ from apache_beam.transforms import Read from apache_beam.transforms import WindowInto from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to -from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import WindowFn +from apache_beam.transforms.window import SlidingWindows +from apache_beam.transforms.window import TimestampedValue from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -70,9 +70,6 @@ class FakeSource(NativeSource): class PipelineTest(unittest.TestCase): - def setUp(self): -self.runner_name = 'DirectRunner' - @staticmethod def custom_callable(pcoll): return pcoll | '+1' >> FlatMap(lambda x: [x + 1]) @@ -103,7 +100,7 @@ class PipelineTest(unittest.TestCase): self.leave_composite.append(transform_node) def test_create(self): -pipeline = 
TestPipeline(runner=self.runner_name) +pipeline = TestPipeline() pcoll = pipeline | 'label1' >> Create([1, 2, 3]) assert_that(pcoll, equal_to([1, 2, 3])) @@ -114,19 +111,19 @@ class PipelineTest(unittest.TestCase): pipeline.run() def test_create_singleton_pcollection(self): -pipeline = TestPipeline(runner=self.runner_name) +pipeline = TestPipeline() pcoll = pipeline | 'label' >> Create([[1, 2, 3]]) assert_that(pcoll, equal_to([[1, 2, 3]])) pipeline.run() def test_read(self): -pipeline = TestPipeline(runner=self.runner_name) +pipeline = TestPipeline() pcoll = pipeline | 'read' >> Read(FakeSource([1, 2, 3])) assert_that(pcoll, equal_to([1, 2, 3])) pipeline.run() def test_visit_entire_graph(self): -pipeline = Pipeline(self.runner_name) +pipeline = Pipeline() pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3]) pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1]) pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1]) @@ -145,14 +142,14 @@ class PipelineTest(unittest.TestCase): self.assertEqual(visitor.leave_composite[0].transform, transform) def test_apply_custom_transform(self): -pipeline = TestPipeline(runner=self.runner_name) +pipeline = TestPipeline() pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) result = pcoll | PipelineTest.CustomTransform() assert_that(result, equal_to([2, 3, 4])) pipeline.run() def test_reuse_custom_transform_instance(self): -pipeline = Pipeline(self.runner_name) +pipeline = Pipeline() pcoll1 = pipeline | 'pcoll1' >> Create([1, 2, 3]) pcoll2 = pipeline | 'pcoll2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() @@ -167,7 +164,7 @@ class PipelineTest(unittest.TestCase): 'pvalue | "label" >> transform') def test_reuse_cloned_custom_transform_instance(self): -pipeline = TestPipeline(runner=self.runner_name) +pipeline = TestPipeline() pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3]) pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() @@ -240,7 +237,7 @@ class PipelineTest(unittest.TestCase): def 
raise_exception(exn): raise exn with self.assertRaises(ValueError): - with Pipeline(self.runner_name) as p: + with Pipeline() as p: # pylint: disable=expression-not-assigned p | Create([ValueError]) | Map(raise_exception) @@ -251,15 +248,12 @@ class PipelineTest(unittest.TestCase): class NewDoFnTest(unittest.TestCase): - def setUp(self): -self.runner_name = 'DirectRunner' - def test_element(self): class TestDoFn(NewDoFn): def process(self, element): yield elemen
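The `NewDoFn` tests above exercise the generator-style `process` contract: `process(element)` may yield zero or more outputs per input, and the runner flattens the results. A minimal plain-Python sketch of that contract (hypothetical classes and helper, not the real Beam API):

```python
class SketchDoFn(object):
    """Stand-in for a NewDoFn subclass: process() is a generator."""

    def process(self, element):
        # Yield any number of outputs for one input element.
        yield element + 1


def apply_do_fn(do_fn, elements):
    """Flatten the per-element generators, roughly as a runner would."""
    out = []
    for element in elements:
        out.extend(do_fn.process(element))
    return out
```

This is the shape `test_element` checks: each yielded value becomes one element of the output PCollection.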
[45/50] [abbrv] beam git commit: This closes #1863
This closes #1863 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1390699c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1390699c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1390699c Branch: refs/heads/master Commit: 1390699c37596ebe34a773627660b6c496375a8e Parents: 475707f e02ddac Author: Davor Bonaci Authored: Mon Jan 30 12:45:03 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:45:03 2017 -0800 -- sdks/python/apache_beam/io/bigquery_test.py | 26 1 file changed, 18 insertions(+), 8 deletions(-) --
[27/50] [abbrv] beam git commit: Fix read/write display data
Fix read/write display data Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e4eda3c3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e4eda3c3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e4eda3c3 Branch: refs/heads/master Commit: e4eda3c335b5767bdaf40b56b2dd5d67d7348f20 Parents: c6420df Author: Pablo Authored: Fri Jan 13 11:25:36 2017 -0800 Committer: Robert Bradshaw Committed: Thu Jan 26 14:51:56 2017 -0800 -- sdks/python/apache_beam/io/avroio_test.py | 6 sdks/python/apache_beam/io/fileio.py | 10 ++- sdks/python/apache_beam/io/fileio_test.py | 2 -- sdks/python/apache_beam/io/iobase.py | 38 +- sdks/python/apache_beam/io/textio.py | 25 + sdks/python/apache_beam/io/textio_test.py | 30 6 files changed, 47 insertions(+), 64 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/avroio_test.py -- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index aed468d..d2fb1d1 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -196,9 +196,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d.end'), DisplayDataItemMatcher( -'shards', -0), -DisplayDataItemMatcher( 'codec', 'null'), DisplayDataItemMatcher( @@ -219,9 +216,6 @@ class TestAvro(unittest.TestCase): 'file_pattern', 'some_avro_sink-%(shard_num)05d-of-%(num_shards)05d'), DisplayDataItemMatcher( -'shards', -0), -DisplayDataItemMatcher( 'codec', 'deflate'), DisplayDataItemMatcher( http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 52f31c6..f67dca9 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -547,7 +547,8 @@ class FileSink(iobase.Sink): def 
display_data(self): return {'shards': -DisplayDataItem(self.num_shards, label='Number of Shards'), +DisplayDataItem(self.num_shards, +label='Number of Shards').drop_if_default(0), 'compression': DisplayDataItem(str(self.compression_type)), 'file_pattern': @@ -787,6 +788,13 @@ class TextFileSink(FileSink): '\'textio.WriteToText()\' instead of directly ' 'instantiating a TextFileSink object.') + def display_data(self): +dd_parent = super(TextFileSink, self).display_data() +dd_parent['append_newline'] = DisplayDataItem( +self.append_trailing_newlines, +label='Append Trailing New Lines') +return dd_parent + def write_encoded_record(self, file_handle, encoded_value): """Writes a single encoded record.""" file_handle.write(encoded_value) http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/fileio_test.py -- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index ad77dc5..6c33f53 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -142,8 +142,6 @@ class TestFileSink(unittest.TestCase): dd = DisplayData.create_from(sink) expected_items = [ DisplayDataItemMatcher( -'shards', 0), -DisplayDataItemMatcher( 'compression', 'auto'), DisplayDataItemMatcher( 'file_pattern', http://git-wip-us.apache.org/repos/asf/beam/blob/e4eda3c3/sdks/python/apache_beam/io/iobase.py -- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 12af3b6..1266ed3 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -759,16 +759,15 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (keyed_pcoll | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | 'WriteBundles' >> core.Map( - _write_keyed_bundle, self.sink, + | 'WriteBundles' >> core.ParDo( +
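The fix above replaces an unconditional `'shards'` display-data item with one marked `.drop_if_default(0)`, which is why the `('shards', 0)` matchers disappear from the test expectations. A minimal stdlib sketch of that behavior (hypothetical `Item` class, not Beam's real `DisplayDataItem`):

```python
class Item(object):
    """Sketch of a display-data item that can be dropped at its default."""

    def __init__(self, value, label=None):
        self.value = value
        self.label = label
        self._default = None
        self._has_default = False

    def drop_if_default(self, default):
        # Record the default and return self so calls can be chained
        # inside a dict literal, as in the diff above.
        self._default = default
        self._has_default = True
        return self

    def should_drop(self):
        return self._has_default and self.value == self._default


def display_data_dict(items):
    """Materialize display data, omitting items sitting at their default."""
    return {k: v.value for k, v in items.items() if not v.should_drop()}
```

The design choice: unset/default values (here, `num_shards == 0`, meaning runner-chosen sharding) add noise to monitoring UIs, so they are filtered out at collection time rather than in every consumer.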
[37/50] [abbrv] beam git commit: Update pom.xml for sdks/python.
Update pom.xml for sdks/python. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b8679c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b8679c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b8679c Branch: refs/heads/master Commit: f1b8679c4af283d1e751043e2e765b7f295af0b2 Parents: c2859a5 Author: Ahmet Altay Authored: Fri Jan 27 17:04:21 2017 -0800 Committer: Ahmet Altay Committed: Fri Jan 27 17:04:21 2017 -0800 -- sdks/python/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/f1b8679c/sdks/python/pom.xml -- diff --git a/sdks/python/pom.xml b/sdks/python/pom.xml index cc90969..615ddc5 100644 --- a/sdks/python/pom.xml +++ b/sdks/python/pom.xml @@ -22,7 +22,7 @@ org.apache.beam beam-sdks-parent -0.5.0-incubating-SNAPSHOT +0.6.0-SNAPSHOT ../pom.xml
[43/50] [abbrv] beam git commit: This closes #1866
This closes #1866 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0 Branch: refs/heads/master Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99 Parents: f29527f 51afc1c Author: Davor Bonaci Authored: Mon Jan 30 12:43:48 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:43:48 2017 -0800 -- sdks/python/apache_beam/internal/util.py | 33 ++ sdks/python/apache_beam/io/filebasedsource.py | 17 +++ sdks/python/apache_beam/io/fileio.py | 11 ++-- 3 files changed, 40 insertions(+), 21 deletions(-) --
[08/50] [abbrv] beam git commit: Closes #1818
Closes #1818 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01b36280 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01b36280 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01b36280 Branch: refs/heads/master Commit: 01b362807724b03969775c3a17af0854bb4b29a6 Parents: 894461e 1811458 Author: Robert Bradshaw Authored: Mon Jan 23 14:36:00 2017 -0800 Committer: Robert Bradshaw Committed: Mon Jan 23 14:36:00 2017 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[38/50] [abbrv] beam git commit: Closes #1861
Closes #1861 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27cf68ee Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27cf68ee Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27cf68ee Branch: refs/heads/master Commit: 27cf68ee72bd58475c170712f7afe20102601606 Parents: 1bc6859 f1b8679 Author: Dan Halperin Authored: Sun Jan 29 08:21:18 2017 -0800 Committer: Dan Halperin Committed: Sun Jan 29 08:21:18 2017 -0800 -- .jenkins/common_job_properties.groovy |9 +- ...job_beam_PostCommit_Java_MavenInstall.groovy |2 +- .../job_beam_PreCommit_Java_MavenInstall.groovy |2 +- .../job_beam_Release_NightlySnapshot.groovy |2 +- .jenkins/job_seed.groovy|2 +- .travis/README.md |2 +- DISCLAIMER | 10 - NOTICE |4 +- README.md | 46 +- examples/java/README.md | 16 +- examples/java/pom.xml | 21 +- .../beam/examples/DebuggingWordCount.java |4 +- .../org/apache/beam/examples/WordCount.java |6 +- .../beam/examples/complete/AutoComplete.java|2 +- .../org/apache/beam/examples/complete/README.md | 14 +- .../apache/beam/examples/complete/TfIdf.java|2 +- .../examples/complete/TopWikipediaSessions.java |2 +- .../examples/complete/TrafficMaxLaneFlow.java |2 +- .../beam/examples/complete/TrafficRoutes.java |2 +- .../examples/cookbook/BigQueryTornadoes.java|2 +- .../cookbook/CombinePerKeyExamples.java |2 +- .../org/apache/beam/examples/cookbook/README.md | 14 +- .../beam/examples/cookbook/TriggerExample.java |4 +- .../beam/examples/WindowedWordCountIT.java | 16 +- examples/java8/pom.xml |2 +- .../beam/examples/complete/game/GameStats.java |7 +- .../examples/complete/game/LeaderBoard.java |5 +- .../beam/examples/complete/game/UserScore.java |2 +- examples/pom.xml| 16 +- pom.xml | 41 +- runners/apex/README.md |4 +- runners/apex/pom.xml|3 +- .../beam/runners/apex/ApexPipelineOptions.java |7 +- .../apache/beam/runners/apex/ApexRunner.java| 43 +- .../beam/runners/apex/ApexYarnLauncher.java | 23 +- 
.../translation/CreateValuesTranslator.java | 18 +- .../FlattenPCollectionTranslator.java | 28 +- .../apex/translation/GroupByKeyTranslator.java |2 +- .../translation/ParDoBoundMultiTranslator.java | 27 +- .../apex/translation/ParDoBoundTranslator.java |4 +- .../apex/translation/TranslationContext.java| 27 +- .../apex/translation/WindowBoundTranslator.java |8 +- .../operators/ApexGroupByKeyOperator.java |4 +- .../operators/ApexParDoOperator.java|6 +- .../ApexReadUnboundedInputOperator.java | 17 +- .../beam/runners/apex/ApexRunnerTest.java | 75 ++ .../beam/runners/apex/ApexYarnLauncherTest.java |9 +- .../runners/apex/examples/WordCountTest.java|2 +- .../translation/ParDoBoundTranslatorTest.java |6 +- .../translation/ReadUnboundTranslatorTest.java |8 +- .../utils/ApexStateInternalsTest.java |2 +- .../test/resources/beam-runners-apex.properties | 20 + runners/core-java/pom.xml |2 +- .../beam/runners/core/AssignWindowsDoFn.java|3 +- .../apache/beam/runners/core/DoFnAdapters.java | 343 ++ .../apache/beam/runners/core/DoFnRunner.java| 21 - .../apache/beam/runners/core/DoFnRunners.java | 138 +-- .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 10 +- .../runners/core/GroupAlsoByWindowsDoFn.java|5 +- .../beam/runners/core/KeyedWorkItemCoder.java |4 +- .../core/LateDataDroppingDoFnRunner.java|1 - .../apache/beam/runners/core/NonEmptyPanes.java |2 +- .../org/apache/beam/runners/core/OldDoFn.java | 472 .../runners/core/PerKeyCombineFnRunner.java | 70 -- .../runners/core/PerKeyCombineFnRunners.java| 101 -- .../beam/runners/core/SimpleDoFnRunner.java | 63 - .../beam/runners/core/SimpleOldDoFnRunner.java |7 +- .../beam/runners/core/SplittableParDo.java |7 - .../core/UnboundedReadFromBoundedSource.java| 14 +- .../AfterDelayFromFirstElementStateMachine.java |2 +- .../core/triggers/AfterPaneStateMachine.java|2 +- .../core/DoFnDelegatingAggregatorTest.java | 144 +++ .../core/GroupAlsoByWindowsProperties.java |2 +- .../runners/core/KeyedWorkItemCoderTest.java|6 + 
.../core/LateDataDroppingDoFnRunnerTest.java|2
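Among the runners-core files touched here is `LateDataDroppingDoFnRunner`, the component BEAM-1346 (above) reports as no longer wired in. Its job is to drop elements whose window has expired relative to the input watermark. A rough stdlib sketch of that policy, with hypothetical names and plain integers standing in for Beam's timestamp and window types (the exact strict-vs-inclusive comparison is an assumption here):

```python
def drop_expired(elements, watermark, allowed_lateness):
    """Partition (value, window_end) pairs into kept and dropped.

    A window is treated as expired once
    window_end + allowed_lateness <= watermark; elements in expired
    windows are dropped instead of being handed to the reduce function.
    """
    kept, dropped = [], []
    for value, window_end in elements:
        if window_end + allowed_lateness <= watermark:
            dropped.append((value, window_end))
        else:
            kept.append((value, window_end))
    return kept, dropped
```

Dropping before the reduce step is what the JIRA discussion is about: if no runner invokes this filter, expired data flows into `ReduceFnRunner` and late-data dropping silently stops happening.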
[35/50] [abbrv] beam git commit: This closes #1807
This closes #1807 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc68598 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc68598 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc68598 Branch: refs/heads/master Commit: 1bc685980092e0922504858fa6c08adc8c44acaa Parents: 52d97e2 e5d8810 Author: Dan Halperin Authored: Fri Jan 27 14:30:28 2017 -0800 Committer: Dan Halperin Committed: Fri Jan 27 14:30:28 2017 -0800 -- sdks/python/apache_beam/coders/coders.py | 1 - sdks/python/run_pylint.sh| 44 --- 2 files changed, 12 insertions(+), 33 deletions(-) --
[20/50] [abbrv] beam git commit: Closes #1836
Closes #1836 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9540cf17 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9540cf17 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9540cf17 Branch: refs/heads/master Commit: 9540cf1762d8595126a1f96301c35524b0a804c2 Parents: 43cb4d7 19789db Author: Robert Bradshaw Authored: Tue Jan 24 16:32:37 2017 -0800 Committer: Robert Bradshaw Committed: Tue Jan 24 16:32:37 2017 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 5 insertions(+) --
[41/50] [abbrv] beam git commit: This closes #1870
This closes #1870 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f29527f6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f29527f6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f29527f6 Branch: refs/heads/master Commit: f29527f68b8de92caf18b183e3a7e97eb190f67e Parents: 27cf68e 38575a1 Author: Davor Bonaci Authored: Mon Jan 30 12:38:53 2017 -0800 Committer: Davor Bonaci Committed: Mon Jan 30 12:38:53 2017 -0800 -- .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++ .../wrappers/streaming/DoFnOperator.java| 69 .../wrappers/streaming/WindowDoFnOperator.java | 143 + sdks/python/apache_beam/version.py | 3 +- 4 files changed, 265 insertions(+), 106 deletions(-) --