[GitHub] beam pull request #3819: JStorm-runner: Performance improvement

2017-09-07 Thread bastiliu
GitHub user bastiliu opened a pull request:

https://github.com/apache/beam/pull/3819

JStorm-runner: Performance improvement

1. remove some logs on critical path
2. register "TimestampedValue" in Kryo to reduce the serialized size of 
event value
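For context, class registration in serializers such as Kryo replaces the fully qualified class name written per record with a small integer id. The size effect can be sketched in plain Java I/O (a toy model, not Kryo itself; the class name and the id `7` are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy model of serializer framing: an unregistered class is tagged with its
// full name on every record, a registered one with a compact integer id.
public class RegistrationDemo {
    static int taggedSize(byte[] payload, String className) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(className); // full class name written per record
        out.write(payload);
        return bos.size();
    }

    static int registeredSize(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(7);        // hypothetical registration id
        out.write(payload);
        return bos.size();
    }

    public static void main(String[] args) throws IOException {
        byte[] value = {1, 2, 3, 4}; // stand-in for an encoded element
        System.out.println("unregistered: "
                + taggedSize(value, "org.apache.beam.sdk.values.TimestampedValue")
                + " bytes, registered: " + registeredSize(value) + " bytes");
    }
}
```

The per-record saving is roughly the length of the class name, which adds up quickly on a high-throughput critical path.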

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bastiliu/beam jstorm-runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3819


commit 43492000a49e81b6d9a2420148fb2df1735301b0
Author: basti.lj 
Date:   2017-09-08T04:19:49Z

JStorm-runner: Performance improvement
1. remove some logs on critical path
2. register "TimestampedValue" in Kryo to reduce the serialized size of 
event value




---


[GitHub] beam pull request #3818: [BEAM-2866] Key StateTable off ID

2017-09-07 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/3818

[BEAM-2866] Key StateTable off ID

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam StateTag-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3818


commit 2d6799ad9b9fcdd924269656ea845d23f1169f65
Author: Kenneth Knowles 
Date:   2017-09-08T02:36:17Z

Add StateInternalsTest for bad coder equality

commit badf6205fc3334329bcc7cb640f1548850c339b5
Author: Kenneth Knowles 
Date:   2017-09-08T02:48:18Z

Key StateTable off id, not full StateTag




---


[jira] [Commented] (BEAM-2866) StateTable should key off state id, not StateTag, since coders may have bad equality

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16158053#comment-16158053
 ] 

ASF GitHub Bot commented on BEAM-2866:
--

GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/3818

[BEAM-2866] Key StateTable off ID

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam StateTag-key

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3818


commit 2d6799ad9b9fcdd924269656ea845d23f1169f65
Author: Kenneth Knowles 
Date:   2017-09-08T02:36:17Z

Add StateInternalsTest for bad coder equality

commit badf6205fc3334329bcc7cb640f1548850c339b5
Author: Kenneth Knowles 
Date:   2017-09-08T02:48:18Z

Key StateTable off id, not full StateTag




> StateTable should key off state id, not StateTag, since coders may have bad 
> equality
> 
>
> Key: BEAM-2866
> URL: https://issues.apache.org/jira/browse/BEAM-2866
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
> Fix For: 2.2.0
>
>
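The hazard described above can be sketched in plain Java (the names StateTag and BadCoder are illustrative, not the actual runner-core types): a map keyed off an object whose equality delegates to a coder with identity-only equals() loses entries that a map keyed off the stable string id would find.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative sketch, not runner-core code: why keying a state table off a
// tag that embeds a coder with broken equality makes state unreachable.
public class StateKeyDemo {
    // Stands in for a coder that never overrides equals()/hashCode().
    static class BadCoder {}

    static class StateTag {
        final String id;
        final BadCoder coder;
        StateTag(String id, BadCoder coder) { this.id = id; this.coder = coder; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof StateTag)) return false;
            StateTag t = (StateTag) o;
            return id.equals(t.id) && coder.equals(t.coder); // coder equality is identity-only
        }
        @Override public int hashCode() { return Objects.hash(id, coder); }
    }

    static String lookupByTag() {
        Map<StateTag, String> table = new HashMap<>();
        table.put(new StateTag("counter", new BadCoder()), "state");
        // Logically the same tag, but built with a fresh coder instance:
        return table.get(new StateTag("counter", new BadCoder()));
    }

    static String lookupById() {
        Map<String, String> table = new HashMap<>();
        table.put("counter", "state");
        return table.get("counter");
    }

    public static void main(String[] args) {
        System.out.println(lookupByTag()); // null: the entry cannot be found again
        System.out.println(lookupById());  // state
    }
}
```

Keying off the id alone sidesteps any misbehaving equals()/hashCode() in the coder.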




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3817: [BEAM-2865] Introduces FileIO.write() and uses it i...

2017-09-07 Thread jkff
GitHub user jkff opened a pull request:

https://github.com/apache/beam/pull/3817

[BEAM-2865] Introduces FileIO.write() and uses it in AvroIO.

https://issues.apache.org/jira/browse/BEAM-2865

http://s.apache.org/fileio-write

This needs more docs / checkstyle / tests, but it already passes all tests 
of AvroIO, both via AvroIO.write() and via new methods AvroIO.sink().

R: @robertwb 
CC: @reuvenlax 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkff/incubator-beam fileio-write

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3817


commit 90ddc0d1171b3850d55e9ddbbee45abe48ba4086
Author: Eugene Kirpichov 
Date:   2017-09-06T05:11:34Z

Introduces PCollectionView.get()

commit c8f5aa336ae2b2ee1191a611bef2b0cf451c95f3
Author: Eugene Kirpichov 
Date:   2017-09-07T06:24:18Z

[BEAM-2865] Introduces FileIO.write() and uses it in AvroIO.

http://s.apache.org/fileio-write




---


[jira] [Commented] (BEAM-2865) Implement FileIO.write()

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157997#comment-16157997
 ] 

ASF GitHub Bot commented on BEAM-2865:
--

GitHub user jkff opened a pull request:

https://github.com/apache/beam/pull/3817

[BEAM-2865] Introduces FileIO.write() and uses it in AvroIO.

https://issues.apache.org/jira/browse/BEAM-2865

http://s.apache.org/fileio-write

This needs more docs / checkstyle / tests, but it already passes all tests 
of AvroIO, both via AvroIO.write() and via new methods AvroIO.sink().

R: @robertwb 
CC: @reuvenlax 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkff/incubator-beam fileio-write

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3817


commit 90ddc0d1171b3850d55e9ddbbee45abe48ba4086
Author: Eugene Kirpichov 
Date:   2017-09-06T05:11:34Z

Introduces PCollectionView.get()

commit c8f5aa336ae2b2ee1191a611bef2b0cf451c95f3
Author: Eugene Kirpichov 
Date:   2017-09-07T06:24:18Z

[BEAM-2865] Introduces FileIO.write() and uses it in AvroIO.

http://s.apache.org/fileio-write




> Implement FileIO.write()
> 
>
> Key: BEAM-2865
> URL: https://issues.apache.org/jira/browse/BEAM-2865
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.2.0
>
>
> Design doc: http://s.apache.org/fileio-write
> Discussion: 
> https://lists.apache.org/thread.html/cc543556cc709a44ed92262207215eaa0e43a0f573c630b6360d4edc@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2865) Implement FileIO.write()

2017-09-07 Thread Eugene Kirpichov (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov updated BEAM-2865:
---
Issue Type: New Feature  (was: Bug)

> Implement FileIO.write()
> 
>
> Key: BEAM-2865
> URL: https://issues.apache.org/jira/browse/BEAM-2865
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Eugene Kirpichov
> Fix For: 2.2.0
>
>
> Design doc: http://s.apache.org/fileio-write
> Discussion: 
> https://lists.apache.org/thread.html/cc543556cc709a44ed92262207215eaa0e43a0f573c630b6360d4edc@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2865) Implement FileIO.write()

2017-09-07 Thread Eugene Kirpichov (JIRA)
Eugene Kirpichov created BEAM-2865:
--

 Summary: Implement FileIO.write()
 Key: BEAM-2865
 URL: https://issues.apache.org/jira/browse/BEAM-2865
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Eugene Kirpichov
Assignee: Eugene Kirpichov
 Fix For: 2.2.0


Design doc: http://s.apache.org/fileio-write

Discussion: 
https://lists.apache.org/thread.html/cc543556cc709a44ed92262207215eaa0e43a0f573c630b6360d4edc@%3Cdev.beam.apache.org%3E



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is still unstable: beam_PostCommit_Java_MavenInstall #4741

2017-09-07 Thread Apache Jenkins Server
See 




[beam-site] branch asf-site updated (b74cc36 -> 8e42ee6)

2017-09-07 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from b74cc36  Prepare repository for deployment.
 new 279fe3c  Document stale PR policy
 new dd17e33  Regenerate website
 new 8e42ee6  This closes #309

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/contribute/contribution-guide/index.html | 5 +
 src/contribute/contribution-guide.md | 4 
 2 files changed, 9 insertions(+)

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[beam-site] 03/03: This closes #309

2017-09-07 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 8e42ee6403269acf3c67f46f4a8cde9712200307
Merge: b74cc36 dd17e33
Author: Ahmet Altay 
AuthorDate: Thu Sep 7 17:40:43 2017 -0700

This closes #309

 content/contribute/contribution-guide/index.html | 5 +
 src/contribute/contribution-guide.md | 4 
 2 files changed, 9 insertions(+)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 02/03: Regenerate website

2017-09-07 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit dd17e3309f726b7e7ba8d43e82072407af1dc143
Author: Ahmet Altay 
AuthorDate: Thu Sep 7 17:40:42 2017 -0700

Regenerate website
---
 content/contribute/contribution-guide/index.html | 5 +
 1 file changed, 5 insertions(+)

diff --git a/content/contribute/contribution-guide/index.html 
b/content/contribute/contribution-guide/index.html
index db9f7b7..b90d061 100644
--- a/content/contribute/contribution-guide/index.html
+++ b/content/contribute/contribution-guide/index.html
@@ -189,6 +189,7 @@
   Code Review and Revision
   LGTM
   Deleting your branch
+  Stale pull requests
 
   
   Commit (committers only)
@@ -573,6 +574,10 @@ $ git push GitHub_user --delete my-branch
 
 
 
+Stale pull requests
+
+The community will close stale pull requests in order to keep the project 
healthy. A pull request becomes stale after its author fails to respond to 
actionable comments for 60 days.  Author of a closed pull request is welcome to 
reopen the same pull request again in the future. The associated JIRAs will be 
unassigned from the author but will stay open.
+
 Commit (committers only)
 Once the code has been peer reviewed by a committer, the next step is for 
the committer to merge it into the authoritative Apache 
repository (https://git-wip-us.apache.org/repos/asf/beam.git), not the read-only GitHub mirror. (In the case that the author 
is also a committer, it is acceptable for either the author of the change or 
committer who reviewed the change to do the merge. Just be explicit about whose 
job it is!)
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] 01/03: Document stale PR policy

2017-09-07 Thread altay
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 279fe3c297dbaf5a4887a281a995f1e38ab42981
Author: Ahmet Altay 
AuthorDate: Wed Aug 30 15:19:08 2017 -0700

Document stale PR policy
---
 src/contribute/contribution-guide.md | 4 
 1 file changed, 4 insertions(+)

diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index b37ece6..4989596 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -301,6 +301,10 @@ From another local branch, run:
$ git branch -d 
$ git push  --delete 
 
+### Stale pull requests
+
+The community will close stale pull requests in order to keep the project 
healthy. A pull request becomes stale after its author fails to respond to 
actionable comments for 60 days.  Author of a closed pull request is welcome to 
reopen the same pull request again in the future. The associated JIRAs will be 
unassigned from the author but will stay open.
+
 ## Commit (committers only)
 Once the code has been peer reviewed by a committer, the next step is for the 
committer to merge it into the [authoritative Apache 
repository](https://git-wip-us.apache.org/repos/asf/beam.git), not the 
read-only GitHub mirror. (In the case that the author is also a committer, it 
is acceptable for either the author of the change or committer who reviewed the 
change to do the merge. Just be explicit about whose job it is!)
 

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[jira] [Commented] (BEAM-2858) temp file garbage collection in BigQuery sink should be in a separate DoFn

2017-09-07 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157905#comment-16157905
 ] 

Chamikara Jayalath commented on BEAM-2858:
--

I tried this by manually deleting the first file of the gcsURIs passed in at 
the following location.

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L157

Attached the diff.


> temp file garbage collection in BigQuery sink should be in a separate DoFn
> --
>
> Key: BEAM-2858
> URL: https://issues.apache.org/jira/browse/BEAM-2858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Reuven Lax
>Assignee: Chamikara Jayalath
> Fix For: 2.2.0
>
> Attachments: delete_file_diff.txt
>
>
> Currently the WriteTables transform deletes the set of input files as soon as 
> the load() job completes. However this is incorrect - if the task fails 
> partially through deleting files (e.g. if the worker crashes), the task will 
> be retried. If the write disposition is WRITE_TRUNCATE, bad things could 
> result.
> The resulting behavior will depend on what BQ does if one of input files is 
> missing (because we had previously deleted it). In the best case, BQ will 
> fail the load. In this case the step will keep failing until the runner 
> finally fails the entire job. If however BQ ignores the missing file, the 
> load will overwrite the previously-written table with the smaller set of 
> files and the job will succeed. This is the worst-case scenario, as it will 
> result in data loss.
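The worst-case sequence in the last paragraph can be simulated in a few lines of plain Java (a toy model, not Beam or BigQuery code): a WRITE_TRUNCATE-style load that is retried after cleanup has partially run silently rewrites the table from the surviving files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model, not Beam/BigQuery code: deleting load inputs inside the same
// retried step means a retry after a partial delete truncates the table
// down to whatever files are left.
public class RetryHazardDemo {
    static List<String> table = new ArrayList<>();

    // WRITE_TRUNCATE semantics: the table becomes exactly the loaded files.
    static void loadTruncate(Set<String> files) {
        table = new ArrayList<>(files);
        Collections.sort(table);
    }

    public static void main(String[] args) {
        Set<String> tempFiles = new TreeSet<>(Arrays.asList("f1", "f2", "f3"));

        loadTruncate(tempFiles);   // attempt 1: load succeeds with all files
        tempFiles.remove("f1");    // cleanup deletes one file...
        // ...then the worker crashes, and the whole step is retried:
        loadTruncate(tempFiles);   // attempt 2 only sees f2 and f3

        System.out.println(table); // [f2, f3] -- rows from f1 are gone
    }
}
```

Moving the delete into a separate downstream step means a retry of the load step always sees the full input set.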



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2858) temp file garbage collection in BigQuery sink should be in a separate DoFn

2017-09-07 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2858:
-
Attachment: delete_file_diff.txt

> temp file garbage collection in BigQuery sink should be in a separate DoFn
> --
>
> Key: BEAM-2858
> URL: https://issues.apache.org/jira/browse/BEAM-2858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Reuven Lax
>Assignee: Chamikara Jayalath
> Fix For: 2.2.0
>
> Attachments: delete_file_diff.txt
>
>
> Currently the WriteTables transform deletes the set of input files as soon as 
> the load() job completes. However this is incorrect - if the task fails 
> partially through deleting files (e.g. if the worker crashes), the task will 
> be retried. If the write disposition is WRITE_TRUNCATE, bad things could 
> result.
> The resulting behavior will depend on what BQ does if one of input files is 
> missing (because we had previously deleted it). In the best case, BQ will 
> fail the load. In this case the step will keep failing until the runner 
> finally fails the entire job. If however BQ ignores the missing file, the 
> load will overwrite the previously-written table with the smaller set of 
> files and the job will succeed. This is the worst-case scenario, as it will 
> result in data loss.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2858) temp file garbage collection in BigQuery sink should be in a separate DoFn

2017-09-07 Thread Reuven Lax (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157856#comment-16157856
 ] 

Reuven Lax commented on BEAM-2858:
--

I asked the BigQuery team, and they said the load job should fail. How did you 
delete one of the files? These are the temp files generated from within the 
Beam job.

> temp file garbage collection in BigQuery sink should be in a separate DoFn
> --
>
> Key: BEAM-2858
> URL: https://issues.apache.org/jira/browse/BEAM-2858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Reuven Lax
>Assignee: Chamikara Jayalath
> Fix For: 2.2.0
>
>
> Currently the WriteTables transform deletes the set of input files as soon as 
> the load() job completes. However this is incorrect - if the task fails 
> partially through deleting files (e.g. if the worker crashes), the task will 
> be retried. If the write disposition is WRITE_TRUNCATE, bad things could 
> result.
> The resulting behavior will depend on what BQ does if one of input files is 
> missing (because we had previously deleted it). In the best case, BQ will 
> fail the load. In this case the step will keep failing until the runner 
> finally fails the entire job. If however BQ ignores the missing file, the 
> load will overwrite the previously-written table with the smaller set of 
> files and the job will succeed. This is the worst-case scenario, as it will 
> result in data loss.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build became unstable: beam_PostCommit_Java_MavenInstall #4740

2017-09-07 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-2842) python test failure in local maven build

2017-09-07 Thread Xu Mingmin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157745#comment-16157745
 ] 

Xu Mingmin commented on BEAM-2842:
--

Duplicate of https://issues.apache.org/jira/browse/BEAM-2861

> python test failure in local maven build
> 
>
> Key: BEAM-2842
> URL: https://issues.apache.org/jira/browse/BEAM-2842
> Project: Beam
>  Issue Type: Test
>  Components: sdk-ideas, website
>Reporter: Xu Mingmin
>Assignee: Reuven Lax
>Priority: Minor
> Fix For: Not applicable
>
>
> See below error when running {{mvn clean install}} locally (Mac OS), with a 
> fresh environment. 
> I resolved it by following the guideline mentioned in the error log, and wonder what's 
> the proper way to improve it:
> 1). add a task in prepare step of 'Contribution Guide';
> 2). option for shared credential?
> {code}
> File 
> “/Users//Desktop/beam/sdks/python/target/.tox/py27gcp/lib/python2.7/site-packages/google/auth/_default.py”,
>  line 282, in default
> raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
> DefaultCredentialsError: Could not automatically determine credentials. 
> Please set GOOGLE_APPLICATION_CREDENTIALS or
> explicitly create credential and re-run the application. For more
> information, please see
> https://developers.google.com/accounts/docs/application-default-credentials.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam-site] 01/02: [BEAM-1534] Add docker images documentation to the website

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 8e16d3954c96c59476501cdfa1daf9285765cb2c
Author: Ismaël Mejía 
AuthorDate: Tue Aug 22 10:59:27 2017 +0200

[BEAM-1534] Add docker images documentation to the website
---
 src/_includes/header.html   |   1 +
 src/contribute/docker-images.md | 183 
 2 files changed, 184 insertions(+)

diff --git a/src/_includes/header.html b/src/_includes/header.html
index 79e72b6..2de94b2 100644
--- a/src/_includes/header.html
+++ b/src/_includes/header.html
@@ -84,6 +84,7 @@
 Design Principles
 Ongoing Projects
 Source Repository
+Docker Images
 
 Promotion
 Presentation Materials
diff --git a/src/contribute/docker-images.md b/src/contribute/docker-images.md
new file mode 100644
index 000..0cb19da
--- /dev/null
+++ b/src/contribute/docker-images.md
@@ -0,0 +1,183 @@
+---
+layout: default
+title: 'Beam Docker Images'
+permalink: /contribute/docker-images/
+---
+
+# Docker Images
+
+Docker images allow you to create a reproducible environment to build and test
+Beam. You can use the docker images by using the provided [Docker 
scripts](https://github.com/apache/beam/tree/master/sdks/java/build-tools/src/main/resources/docker).
+
+In this directory you will find scripts to build and run docker images for
+different purposes:
+
+- [file](#file): Create a Docker container from a Beam source code .zip file
+  in a given environment. It is useful to test a specific version of Beam,
+  for example to validate a release vote.
+
+- [git](#git): Same as file but the Beam source code comes from the git 
repository,
+  you can choose a given branch/tag/pull-request. Useful to test in a specific
+  environment.
+
+- [release](#release): It builds an end-user distribution of the latest 
version of Beam
+  and its dependencies. Useful for end-users who want to have a ready to use
+  container with Beam (Python only for the moment).
+
+## File based image
+
+If you want to build a container with a ready JDK 8 environment to test Beam:
+
+```
+cd file/openjdk8
+docker build -t beam:openjdk8 .
+```
+
+If you want to build a container with JDK 7:
+
+```
+cd file/openjdk7
+docker build -t beam:openjdk7 .
+```
+
+When you run the image it downloads the specific version of Beam given the
+environment variables. By default it downloads the source code of the latest
+master. If you want to download the latest master and execute the tests:
+
+```
+docker run -it beam:openjdk8 mvn clean verify -Prelease
+```
+
+If you want to have an interactive session you can run a bash prompt:
+
+```
+docker run -it beam:openjdk8 /bin/bash
+```
+
+Inside the container you can test a specific module by running Maven. You
+have to change MODULE_PATH for the given module path. For example to test
+HBaseIO you should write its path `sdks/java/io/hbase` in the place of
+MODULE_PATH:
+
+```
+mvn --projects MODULE_PATH clean verify -Prelease
+```
+
+### Configuring the runtime via the environment variables
+
+You can run different versions of Beam by passing the specific environment
+variables:
+
+`URL`: The URL with the file containing the specific source code.  
+`SRC_FILE`: The downloaded file name.  
+`SRC_DIR`: The name of the directory inside of the zip file.
+
+For example to run a Docker container with the exact code of the release 2.0.0:
+
+```
+docker run \
+  -e URL="https://www.apache.org/dyn/closer.cgi?filename=beam/2.0.0/apache-beam-2.0.0-source-release.zip&action=download" \
+  -e SRC_FILE="apache-beam-2.0.0-source-release.zip" \
+  -e SRC_DIR="apache-beam-2.0.0" \
+  -it beam:openjdk8 /bin/bash
+```
+
+If you want to run a container with a specific version e.g. 2.1.0-RC1:
+
+```
+docker run \
+  -e URL="https://github.com/apache/beam/archive/v2.1.0-RC1.zip" \
+  -e SRC_FILE="v2.1.0-RC1.zip" \
+  -e SRC_DIR="beam-2.1.0-RC1" \
+  -it beam:openjdk8 /bin/bash
+```
+
+*Notice that SRC_FILE is different from SRC_DIR because GitHub drops the 'v'
+character from the internal directory name of the zip*.
+
+To run a container with the source code during a vote:
+
+```
+docker run \
+  -e URL="https://dist.apache.org/repos/dist/dev/beam/2.1.0/apache-beam-2.1.0-source-release.zip" \
+  -e SRC_FILE="apache-beam-2.1.0-source-release.zip" \
+  -e SRC_DIR="apache-beam-2.1.0" \
+  -it beam:openjdk8 /bin/bash
+```
+
+### Testing in a specific environment with your own source
+
+You can also overwrite the volume containing the source code of Beam from a
+directory in your host machine. This is useful to test your code with different
+versions of Java.
+
+If you have the code in the ~/workspace/beam directory and you want to quickly
+test it with Beam on Java 7 you can do:
+
+```
+docker run -v ~/workspace/beam:/home/user/beam -it beam:openjdk7 

[beam-site] 02/02: This closes #297

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit bf1f9819af40b9d84ec052f4d28e9419a4ed8ee6
Merge: b74cc36 8e16d39
Author: Mergebot 
AuthorDate: Thu Sep 7 21:36:18 2017 +

This closes #297

 src/_includes/header.html   |   1 +
 src/contribute/docker-images.md | 183 
 2 files changed, 184 insertions(+)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" .


[beam-site] branch mergebot updated (c1e88cc -> bf1f981)

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


 discard c1e88cc  This closes #309
 discard 63d9fe1  fixing reviewer comment
 discard 959d838  Document stale PR policy
 new 8e16d39  [BEAM-1534] Add docker images documentation to the website
 new bf1f981  This closes #297

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (c1e88cc)
\
 N -- N -- N   refs/heads/mergebot (bf1f981)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/_includes/header.html|   1 +
 src/contribute/contribution-guide.md |   4 -
 src/contribute/docker-images.md  | 183 +++
 3 files changed, 184 insertions(+), 4 deletions(-)
 create mode 100644 src/contribute/docker-images.md

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" '].


[beam-site] 02/03: fixing reviewer comment

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 63d9fe1ec730bdf14c1ac6aa06260e655a980c84
Author: Ahmet Altay 
AuthorDate: Thu Sep 7 13:40:16 2017 -0700

fixing reviewer comment
---
 src/contribute/contribution-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index 5de8299..4989596 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -303,7 +303,7 @@ From another local branch, run:
 
 ### Stale pull requests
 
-The community will close stale pull requests in order to keep the project 
healthy. A pull request becomes stale after its author fails to respond to 
actionable comments for 60 days.  Author of a closed pull request is welcome to 
reopen the same pull request again in the future. The associated JIRAs will be 
unassigned from the authors but will stay open.
+The community will close stale pull requests in order to keep the project 
healthy. A pull request becomes stale after its author fails to respond to 
actionable comments for 60 days.  Author of a closed pull request is welcome to 
reopen the same pull request again in the future. The associated JIRAs will be 
unassigned from the author but will stay open.
 
 ## Commit (committers only)
 Once the code has been peer reviewed by a committer, the next step is for the 
committer to merge it into the [authoritative Apache 
repository](https://git-wip-us.apache.org/repos/asf/beam.git), not the 
read-only GitHub mirror. (In the case that the author is also a committer, it 
is acceptable for either the author of the change or committer who reviewed the 
change to do the merge. Just be explicit about whose job it is!)



[beam-site] 01/03: Document stale PR policy

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 959d8380d156ef140b4c9db2ddc9b046a34fb8bd
Author: Ahmet Altay 
AuthorDate: Wed Aug 30 15:19:08 2017 -0700

Document stale PR policy
---
 src/contribute/contribution-guide.md | 4 
 1 file changed, 4 insertions(+)

diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index b37ece6..5de8299 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -301,6 +301,10 @@ From another local branch, run:
$ git branch -d 
$ git push  --delete 
 
+### Stale pull requests
+
+The community will close stale pull requests in order to keep the project 
healthy. A pull request becomes stale after its author fails to respond to 
actionable comments for 60 days.  Author of a closed pull request is welcome to 
reopen the same pull request again in the future. The associated JIRAs will be 
unassigned from the authors but will stay open.
+
 ## Commit (committers only)
 Once the code has been peer reviewed by a committer, the next step is for the 
committer to merge it into the [authoritative Apache 
repository](https://git-wip-us.apache.org/repos/asf/beam.git), not the 
read-only GitHub mirror. (In the case that the author is also a committer, it 
is acceptable for either the author of the change or committer who reviewed the 
change to do the merge. Just be explicit about whose job it is!)
 



[beam-site] 03/03: This closes #309

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit c1e88cc897cb81172369615bea5c6e4bb460381c
Merge: b74cc36 63d9fe1
Author: Mergebot 
AuthorDate: Thu Sep 7 21:07:26 2017 +

This closes #309

 src/contribute/contribution-guide.md | 4 
 1 file changed, 4 insertions(+)



[beam-site] branch mergebot updated (2e14567 -> c1e88cc)

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from 2e14567  This closes #314
 add b74cc36  Prepare repository for deployment.
 new 959d838  Document stale PR policy
 new 63d9fe1  fixing reviewer comment
 new c1e88cc  This closes #309

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/blog/2017/08/16/splittable-do-fn.html | 15 ---
 content/feed.xml  | 15 ---
 src/contribute/contribution-guide.md  |  4 
 3 files changed, 20 insertions(+), 14 deletions(-)



[jira] [Updated] (BEAM-2859) ProcessingTime based timers are not properly fired in case the watermark stays put

2017-09-07 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin updated BEAM-2859:
-
Summary: ProcessingTime based timers are not properly fired in case the 
watermark stays put  (was: Processing time based timers are not properly fired 
in case the watermark stays put)

> ProcessingTime based timers are not properly fired in case the watermark 
> stays put
> --
>
> Key: BEAM-2859
> URL: https://issues.apache.org/jira/browse/BEAM-2859
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Stas Levin
>Assignee: Stas Levin
>
> {{AfterProcessingTime}} based timers are not fired when the input watermark 
> does not advance, preventing buffered elements from being emitted.
> The reason seems to be that {{SparkTimerInternals#getTimersReadyToProcess()}} 
> determines what triggers are ready to be processed based on the following 
> condition: 
> {code:java}
> timer.getTimestamp().isBefore(inputWatermark)
> {code}
> However, if the timer domain is {{TimeDomain.PROCESSING_TIME}} the position 
> of the input watermark should *NOT* have effect.
> In addition, {{SparkTimerInternals#getTimersReadyToProcess()}} deletes timers 
> once they are deemed eligible for processing (but will not necessarily fire). 
> This may not be the correct behavior for timers in general and for timers in 
> the {{TimeDomain.PROCESSING_TIME}} in particular, since they should remain 
> scheduled until the corresponding window expires and all state is cleared.
> For instance, consider a timer that is found eligible for processing and is 
> thus deleted, then it just so happens to be that its {{shouldFire()}} returns 
> {{false}} and it is not fired and needs to be re-run next time around, but 
> won't, since it's been deleted. The implied moral being that _"eligible for 
> processing"_ does not imply _"should be deleted"_.
> It may be better to avoid removing timers in 
> {{SparkTimerInternals#getTimersReadyToProcess()}} and leave timer management 
> up to {{ReduceFnRunner#clearAllState()}} which has more context to determine 
> whether it's time for a given timer to be deleted.
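The fix the description points at can be sketched in Python; the names (`TimerData`, the helper, the domain constants) are illustrative stand-ins for the Spark runner internals discussed above, not the actual Beam API:

```python
from collections import namedtuple

# Hypothetical stand-ins for the Spark runner's timer bookkeeping.
TimerData = namedtuple("TimerData", ["timestamp", "domain"])

EVENT_TIME = "EVENT_TIME"
PROCESSING_TIME = "PROCESSING_TIME"


def timers_ready_to_process(timers, input_watermark, processing_time):
    """Select timers eligible to fire, without deleting them.

    Event-time timers are compared against the input watermark, while
    processing-time timers are compared against the current processing
    time, so a stalled watermark no longer blocks them. Deletion is left
    to the caller (e.g. window-expiry cleanup), addressing the second
    problem described in the issue.
    """
    ready = []
    for timer in timers:
        if timer.domain == PROCESSING_TIME:
            # Position of the input watermark has no effect here.
            if timer.timestamp <= processing_time:
                ready.append(timer)
        elif timer.timestamp < input_watermark:
            ready.append(timer)
    return ready
```

A timer that turns out not to fire (`shouldFire()` returning false) simply stays in the collection and is re-evaluated on the next pass, instead of being lost.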



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[beam-site] branch asf-site updated (75dee9f -> b74cc36)

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


from 75dee9f  This closes #288
 add b50a4a9  Fixes SDF blog post after rename of Match to FileIO.match
 add 2e14567  This closes #314
 new b74cc36  Prepare repository for deployment.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/blog/2017/08/16/splittable-do-fn.html | 15 ---
 content/feed.xml  | 15 ---
 src/_posts/2017-08-04-splittable-do-fn.md | 17 +
 3 files changed, 25 insertions(+), 22 deletions(-)



[beam-site] 01/01: Prepare repository for deployment.

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit b74cc36629ec656165a9872ba6b5f35280ad916c
Author: Mergebot 
AuthorDate: Thu Sep 7 20:54:55 2017 +

Prepare repository for deployment.
---
 content/blog/2017/08/16/splittable-do-fn.html | 15 ---
 content/feed.xml  | 15 ---
 2 files changed, 16 insertions(+), 14 deletions(-)

diff --git a/content/blog/2017/08/16/splittable-do-fn.html 
b/content/blog/2017/08/16/splittable-do-fn.html
index 7a4a2ed..e044ee0 100644
--- a/content/blog/2017/08/16/splittable-do-fn.html
+++ b/content/blog/2017/08/16/splittable-do-fn.html
@@ -602,16 +602,16 @@ to illustrate the approach.
 This hypothetical DoFn reads records 
from a single Avro file. Notably missing
 is the code for expanding a filepattern: it no longer needs to be part of this
 DoFn! Instead, the SDK includes a
-https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java;>Match.filepatterns()
+https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java;>FileIO.matchAll()
 transform for expanding a filepattern into a PCollection of filenames, and
 different file format IOs can reuse the same transform, reading the files with
 different DoFns.
 
 This example demonstrates the benefits of increased modularity allowed by 
SDF:
-Match supports continuous ingestion of 
new files in streaming pipelines using
-.continuously(), and this functionality 
becomes automatically available to
-various file format IOs. For example, TextIO.read().watchForNewFiles() https://github.com/apache/beam/blob/f7e8f886c91ea9d0b51e00331eeb4484e2f6e000/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L480;>uses
-Match under the
+FileIO.matchAll() supports continuous 
ingestion of new files in streaming
+pipelines using .continuously(), and 
this functionality becomes automatically
+available to various file format IOs. For example,
+TextIO.read().watchForNewFiles() https://github.com/apache/beam/blob/3bd68ecfd7d576d78e02deb0476e549f11e1b5ef/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L486;>uses
 FileIO.matchAll() under the
 hood).
 
 Current status
@@ -631,9 +631,10 @@ development.
 HEAD and will be included in Beam 2.2.0. TextIO and AvroIO finally provide
 continuous ingestion of files (one of the most frequently requested features)
 via .watchForNewFiles() which is backed 
by the utility transforms
-Match.filepatterns().continuously() and 
the more general
+FileIO.matchAll().continuously() and 
the more general
 https://github.com/apache/beam/blob/f7e8f886c91ea9d0b51e00331eeb4484e2f6e000/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java;>Watch.growthOf().
-These utility transforms are also independently useful for “power user” use 
cases.
+These utility transforms are also independently useful for “power user” use
+cases.
 
 To enable more flexible use cases for IOs currently based on the Source 
API, we
 will change them to use SDF. This transition is http://s.apache.org/textio-sdf;>pioneered by
diff --git a/content/feed.xml b/content/feed.xml
index eb789b8..7cb78ab 100644
--- a/content/feed.xml
+++ b/content/feed.xml
@@ -975,16 +975,16 @@ to illustrate the approach./p
 pThis hypothetical code 
class=highlighter-rougeDoFn/code reads records from a 
single Avro file. Notably missing
 is the code for expanding a filepattern: it no longer needs to be part of this
 code class=highlighter-rougeDoFn/code! Instead, 
the SDK includes a
-a 
href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.javaMatch.filepatterns()/a
+a 
href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.javaFileIO.matchAll()/a
 transform for expanding a filepattern into a code 
class=highlighter-rougePCollection/code of filenames, 
and
 different file format IOs can reuse the same transform, reading the files with
 different code 
class=highlighter-rougeDoFn/codes./p
 
 pThis example demonstrates the benefits of increased modularity 
allowed by SDF:
-code class=highlighter-rougeMatch/code supports 
continuous ingestion of new files in streaming pipelines using
-code class=highlighter-rouge.continuously()/code, 
and this functionality becomes automatically available to
-various file format IOs. For example, code 
class=highlighter-rougeTextIO.read().watchForNewFiles()/code
 a 
href=https://github.com/apache/beam/blob/f7e8f886c91ea9d0b51e00331eeb4484e2f6e000/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L480uses
-code class=highlighter-rougeMatch/code under the
+code class=highlighter-rougeFileIO.matchAll()/code 
supports continuous ingestion of new files in streaming
+pipelines using code 

[jira] [Resolved] (BEAM-1532) Improve splitting (support sub-splits) for HBaseIO

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-1532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía resolved BEAM-1532.

   Resolution: Duplicate
Fix Version/s: Not applicable

> Improve splitting (support sub-splits) for HBaseIO
> --
>
> Key: BEAM-1532
> URL: https://issues.apache.org/jira/browse/BEAM-1532
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Ismaël Mejía
>Priority: Minor
> Fix For: Not applicable
>
>






[beam-site] 02/02: This closes #314

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 2e1456721b50ae940d76d90ba40fe6074886dc5f
Merge: 75dee9f b50a4a9
Author: Mergebot 
AuthorDate: Thu Sep 7 20:52:23 2017 +

This closes #314

 src/_posts/2017-08-04-splittable-do-fn.md | 17 +
 1 file changed, 9 insertions(+), 8 deletions(-)



[beam-site] branch mergebot updated (11ea306 -> 2e14567)

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a change to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git.


 discard 11ea306  This closes #298
 discard f1d1070  Update Gearpump runner related pages
 new b50a4a9  Fixes SDF blog post after rename of Match to FileIO.match
 new 2e14567  This closes #314

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (11ea306)
\
 N -- N -- N   refs/heads/mergebot (2e14567)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/_includes/header.html |   1 -
 src/_posts/2017-08-04-splittable-do-fn.md |  17 +
 src/contribute/work-in-progress.md|   1 +
 src/documentation/index.md|   2 +-
 src/documentation/runners/gearpump.md |   8 
 src/get-started/beam-overview.md  |   6 +-
 src/images/logo_gearpump.png  | Bin 4691 -> 0 bytes
 src/index.md  |   3 ---
 8 files changed, 24 insertions(+), 14 deletions(-)
 delete mode 100644 src/images/logo_gearpump.png



[beam-site] 01/02: Fixes SDF blog post after rename of Match to FileIO.match

2017-09-07 Thread mergebot-role
This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit b50a4a99fa88c8c14c7f555be984c134b7bf
Author: Eugene Kirpichov 
AuthorDate: Thu Sep 7 13:35:57 2017 -0700

Fixes SDF blog post after rename of Match to FileIO.match
---
 src/_posts/2017-08-04-splittable-do-fn.md | 17 +
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/_posts/2017-08-04-splittable-do-fn.md 
b/src/_posts/2017-08-04-splittable-do-fn.md
index d5379fd..156a6ad 100644
--- a/src/_posts/2017-08-04-splittable-do-fn.md
+++ b/src/_posts/2017-08-04-splittable-do-fn.md
@@ -441,17 +441,17 @@ class AvroReader(DoFn):
 This hypothetical `DoFn` reads records from a single Avro file. Notably missing
 is the code for expanding a filepattern: it no longer needs to be part of this
 `DoFn`! Instead, the SDK includes a
-[Match.filepatterns()](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java)
+[FileIO.matchAll()](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java)
 transform for expanding a filepattern into a `PCollection` of filenames, and
 different file format IOs can reuse the same transform, reading the files with
 different `DoFn`s.
 
 This example demonstrates the benefits of increased modularity allowed by SDF:
-`Match` supports continuous ingestion of new files in streaming pipelines using
-`.continuously()`, and this functionality becomes automatically available to
-various file format IOs. For example, `TextIO.read().watchForNewFiles()` [uses
-`Match` under the
-hood)](https://github.com/apache/beam/blob/f7e8f886c91ea9d0b51e00331eeb4484e2f6e000/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L480).
+`FileIO.matchAll()` supports continuous ingestion of new files in streaming
+pipelines using `.continuously()`, and this functionality becomes automatically
+available to various file format IOs. For example,
+`TextIO.read().watchForNewFiles()` [uses `FileIO.matchAll()` under the
+hood)](https://github.com/apache/beam/blob/3bd68ecfd7d576d78e02deb0476e549f11e1b5ef/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L486).
 
 ## Current status
 
@@ -471,9 +471,10 @@ Several SDF-based transforms and IO connectors are 
available for Beam users at
 HEAD and will be included in Beam 2.2.0. `TextIO` and `AvroIO` finally provide
 continuous ingestion of files (one of the most frequently requested features)
 via `.watchForNewFiles()` which is backed by the utility transforms
-`Match.filepatterns().continuously()` and the more general
+`FileIO.matchAll().continuously()` and the more general
 
[`Watch.growthOf()`](https://github.com/apache/beam/blob/f7e8f886c91ea9d0b51e00331eeb4484e2f6e000/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java).
-These utility transforms are also independently useful for "power user" use 
cases.
+These utility transforms are also independently useful for "power user" use
+cases.
 
 To enable more flexible use cases for IOs currently based on the Source API, we
 will change them to use SDF. This transition is [pioneered by



[jira] [Commented] (BEAM-2822) Add support for progress reporting in fn API

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157627#comment-16157627
 ] 

ASF GitHub Bot commented on BEAM-2822:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3802


> Add support for progress reporting in fn API
> 
>
> Key: BEAM-2822
> URL: https://issues.apache.org/jira/browse/BEAM-2822
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Vikas Kedigehalli
>Assignee: Vikas Kedigehalli
>Priority: Minor
>
> https://s.apache.org/beam-fn-api-progress-reporting





[GitHub] beam pull request #3802: [BEAM-2822] Add fn API progress reporting protos

2017-09-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3802


---


[1/2] beam git commit: Add fn API progress reporting protos

2017-09-07 Thread lcwik
Repository: beam
Updated Branches:
  refs/heads/master 3bd68ecfd -> 80c86f81b


Add fn API progress reporting protos


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/53d8b27c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/53d8b27c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/53d8b27c

Branch: refs/heads/master
Commit: 53d8b27ca1bc186d01c36ae663c8e3ab82011d00
Parents: 3bd68ec
Author: Vikas Kedigehalli 
Authored: Fri Sep 1 09:45:22 2017 -0700
Committer: Luke Cwik 
Committed: Thu Sep 7 13:48:07 2017 -0700

--
 .../fn-api/src/main/proto/beam_fn_api.proto | 98 ++--
 1 file changed, 89 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/53d8b27c/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
--
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto 
b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 53d67bc..9bf1b5f 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -97,7 +97,9 @@ service BeamFnControl {
   ) {}
 }
 
-// A request sent by a runner which it the SDK is asked to fulfill.
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
 // Stable
 message InstructionRequest {
   // (Required) An unique identifier provided by the runner which represents
@@ -189,23 +191,101 @@ message ProcessBundleRequest {
 
 // Stable
 message ProcessBundleResponse {
+  // (Optional) If metrics reporting is supported by the SDK, this represents
+  // the final metrics to record for this bundle.
+  Metrics metrics = 1;
 }
 
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
 message ProcessBundleProgressRequest {
   // (Required) A reference to an active process bundle request with the given
   // instruction id.
   string instruction_reference = 1;
 }
 
+message Metrics {
+  // PTransform level metrics.
+  // These metrics are split into processed and active element groups for
+  // progress reporting purposes. This allows a Runner to see what is measured,
+  // what is estimated and what can be extrapolated to be able to accurately
+  // estimate the backlog of remaining work.
+  message PTransform {
+// Metrics that are measured for processed and active element groups.
+message Measured {
+  // (Required) Map from local input name to number of elements processed
+  // from this input.
+  map<string, int64> input_element_counts = 1;
+
+  // (Required) Map from local output name to number of elements produced
+  // for this output.
+  map<string, int64> output_element_counts = 2;
+
+  // (Optional) The total time spent so far in processing the elements in
+  // this group.
+  int64 total_time_spent = 3;
+
+  // TODO: Add other element group level metrics.
+}
+
+// Metrics for fully processed elements.
+message ProcessedElements {
+  // (Required)
+  Measured measured = 1;
+}
+
+// Metrics for active elements.
+// An element is considered active if the SDK has started but not finished
+// processing it yet.
+message ActiveElements {
+  // (Required)
+  Measured measured = 1;
+
+  // Estimated metrics.
+
+  // (Optional) Sum of estimated fraction of known work remaining for all
+  // active elements, as reported by this transform.
+  // If not reported, a Runner could extrapolate this from the processed
+  // elements.
+  // TODO: Handle the case when known work is infinite.
+  double fraction_remaining = 2;
+
+  // (Optional) Map from local output name to sum of estimated number
+  // of elements remaining for this output from all active elements,
+  // as reported by this transform.
+  // If not reported, a Runner could extrapolate this from the processed
+  // elements.
+  map<string, int64> output_elements_remaining = 3;
+}
+
+// (Required): Metrics for processed elements.
+ProcessedElements processed_elements = 1;
+// (Required): Metrics for active elements.
+ActiveElements active_elements = 2;
+
+// (Optional): Map from local output name to its watermark.
+// The watermarks reported are tentative, to get a better sense of progress
+// while processing a bundle but before it is committed. At bundle commit
+// time, a Runner needs to also take into account the timers set to compute
+// the actual 
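The extrapolation these proto comments describe (a Runner estimating the backlog of active elements when the SDK does not report it) might look like the following helper; the function and its arithmetic are an assumption drawn from the comments above, not actual runner code:

```python
def estimate_backlog(processed_count, processed_time, active_count,
                     fraction_remaining=None):
    """Estimate remaining work (in time units) for a transform.

    If the SDK reported ``fraction_remaining`` (the sum over active
    elements of the fraction of known work left), use it directly;
    otherwise extrapolate by assuming each active element costs about
    as much as an average processed one.
    """
    # Average cost per element, guarding against division by zero
    # before anything has been processed.
    per_element = processed_time / max(processed_count, 1)
    if fraction_remaining is not None:
        return fraction_remaining * per_element
    return active_count * per_element
```

For example, with 100 elements processed in 50 seconds and 4 active elements, the extrapolated backlog is 2 seconds of remaining work.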

[2/2] beam git commit: [BEAM-2822] Add fn API progress reporting protos

2017-09-07 Thread lcwik
[BEAM-2822] Add fn API progress reporting protos

This closes #3802


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/80c86f81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/80c86f81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/80c86f81

Branch: refs/heads/master
Commit: 80c86f81b40e2342bff5bec4a5a1162b5cf67b14
Parents: 3bd68ec 53d8b27
Author: Luke Cwik 
Authored: Thu Sep 7 13:48:34 2017 -0700
Committer: Luke Cwik 
Committed: Thu Sep 7 13:48:34 2017 -0700

--
 .../fn-api/src/main/proto/beam_fn_api.proto | 98 ++--
 1 file changed, 89 insertions(+), 9 deletions(-)
--




[jira] [Updated] (BEAM-2852) Add support for Kafka as source/sink on Nexmark

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2852:
---
Labels: newbie starter  (was: )

> Add support for Kafka as source/sink on Nexmark
> ---
>
> Key: BEAM-2852
> URL: https://issues.apache.org/jira/browse/BEAM-2852
> Project: Beam
>  Issue Type: Improvement
>  Components: testing
>Reporter: Ismaël Mejía
>Priority: Minor
>  Labels: newbie, starter
>






[jira] [Updated] (BEAM-2854) Add SQL versions of the Nexmark queries (when possible)

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2854:
---
Labels: newbie starter  (was: )

> Add SQL versions of the Nexmark queries (when possible)
> ---
>
> Key: BEAM-2854
> URL: https://issues.apache.org/jira/browse/BEAM-2854
> Project: Beam
>  Issue Type: Improvement
>  Components: testing
>Reporter: Ismaël Mejía
>Priority: Minor
>  Labels: newbie, starter
>
> With the new support for SQL we can translate some of the Nexmark queries and 
> eventually also add new ones to expand the Nexmark suite.





[jira] [Updated] (BEAM-2855) Implement a python version of the nexmark queries

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2855:
---
Labels: newbie starter  (was: )

> Implement a python version of the nexmark queries
> -
>
> Key: BEAM-2855
> URL: https://issues.apache.org/jira/browse/BEAM-2855
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py, testing
>Reporter: Ismaël Mejía
>Priority: Minor
>  Labels: newbie, starter
>
> Currently we have a Java only implementation of Nexmark, a python based 
> implementation would be nice to have to validate the direct and dataflow 
> runners, but also to validate the new support of multiple SDKs in multiple 
> runners via the runner/fn API.





[jira] [Created] (BEAM-2864) Support backfill deduplication in BigQueryIO.write()

2017-09-07 Thread Eugene Kirpichov (JIRA)
Eugene Kirpichov created BEAM-2864:
--

 Summary: Support backfill deduplication in BigQueryIO.write()
 Key: BEAM-2864
 URL: https://issues.apache.org/jira/browse/BEAM-2864
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-gcp
Reporter: Eugene Kirpichov
Assignee: Reuven Lax


See https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/603 motivated 
by SO question 
https://stackoverflow.com/questions/46076914/apache-beam-update-bigquery-table-row-with-bigqueryio

Perhaps one way we can do this is make BigQueryIO return a PValue that can be 
sequenced with other things, and implement a BigQuery.update() transform that 
executes a single DML statement (or a small collection thereof - since DML in 
BigQuery is very scarce), and let the user sandwich them together if they would 
like to.





[jira] [Commented] (BEAM-2815) Python DirectRunner is unusable with input files in the 100-250MB range

2017-09-07 Thread Ahmet Altay (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157573#comment-16157573
 ] 

Ahmet Altay commented on BEAM-2815:
---

Thank you [~pk11] for the offer. [~charleschen] is coordinating the effort 
here, he can reach out to you if there are pieces that can be done in parallel.

> Python DirectRunner is unusable with input files in the 100-250MB range
> ---
>
> Key: BEAM-2815
> URL: https://issues.apache.org/jira/browse/BEAM-2815
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-py
>Affects Versions: 2.1.0
> Environment: python 2.7.10, beam 2.1, os x 
>Reporter: Peter Hausel
>Assignee: Charles Chen
> Attachments: Screen Shot 2017-08-27 at 9.00.29 AM.png, Screen Shot 
> 2017-08-27 at 9.06.00 AM.png
>
>
> The current python DirectRunner implementation seems to be unusable with 
> training data sets that are bigger than tiny samples - making serious local 
> development impossible or very cumbersome. I am aware of some of the 
> limitations of the current DirectRunner implementation[1][2][3], however I 
> was not sure if this odd behavior is expected.
> [1][2][3]
> https://stackoverflow.com/a/44765621
> https://issues.apache.org/jira/browse/BEAM-1442
> https://beam.apache.org/documentation/runners/direct/
> Repro:
> The simple script below blew up my laptop (MBP 2015) and had to terminate the 
> process after 10 minutes or so (screenshots about high memory and CPU 
> utilization are also attached).
> {code}
> from apache_beam.io import textio
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> import argparse
> def run(argv=None):
>  """Main entry point; defines and runs the pipeline."""
>  parser = argparse.ArgumentParser()
>  parser.add_argument('--input',
>   dest='input',
>   
> default='/Users/pk11/Desktop/Motor_Vehicle_Crashes_-_Vehicle_Information__Three_Year_Window.csv',
>   help='Input file to process.')
>  known_args, pipeline_args = parser.parse_known_args(argv)
>  pipeline_options = PipelineOptions(pipeline_args)
>  pipeline_options.view_as(SetupOptions).save_main_session = True
>  pipeline = beam.Pipeline(options=pipeline_options)
>  raw_data = (
>pipeline
>| 'ReadTrainData' >> textio.ReadFromText(known_args.input, 
> skip_header_lines=1)
>| 'Map' >> beam.Map(lambda line: line.lower())
>  )
>  result = pipeline.run()
>  result.wait_until_finish()
>  print(raw_data)
> if __name__ == '__main__':
>   run()
> {code}
> Example dataset:  
> https://catalog.data.gov/dataset/motor-vehicle-crashes-vehicle-information-beginning-2009
> for comparison: 
> {code}
> lines = [line.lower() for line in 
> open('/Users/pk11/Desktop/Motor_Vehicle_Crashes_-_Vehicle_Information__Three_Year_Window.csv')]
> print(len(lines))
> {code}
> this vanilla python script runs on the same hardware and dataset in 0m4.909s. 





[jira] [Assigned] (BEAM-2815) Python DirectRunner is unusable with input files in the 100-250MB range

2017-09-07 Thread Ahmet Altay (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay reassigned BEAM-2815:
-

Assignee: Charles Chen  (was: Ahmet Altay)

> Python DirectRunner is unusable with input files in the 100-250MB range
> ---
>
> Key: BEAM-2815
> URL: https://issues.apache.org/jira/browse/BEAM-2815
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct, sdk-py
>Affects Versions: 2.1.0
> Environment: python 2.7.10, beam 2.1, os x 
>Reporter: Peter Hausel
>Assignee: Charles Chen
> Attachments: Screen Shot 2017-08-27 at 9.00.29 AM.png, Screen Shot 
> 2017-08-27 at 9.06.00 AM.png
>
>
> The current Python DirectRunner implementation seems to be unusable with 
> training data sets bigger than tiny samples, making serious local 
> development impossible or very cumbersome. I am aware of some of the 
> limitations of the current DirectRunner implementation [1][2][3], but I 
> was not sure whether this odd behavior is expected.
> [1][2][3]
> https://stackoverflow.com/a/44765621
> https://issues.apache.org/jira/browse/BEAM-1442
> https://beam.apache.org/documentation/runners/direct/
> Repro:
> The simple script below blew up my laptop (MBP 2015), and I had to terminate the 
> process after about 10 minutes (screenshots showing the high memory and CPU 
> utilization are also attached).
> {code}
> from apache_beam.io import textio
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> import argparse
> def run(argv=None):
>  """Main entry point; defines and runs the pipeline."""
>  parser = argparse.ArgumentParser()
>  parser.add_argument('--input',
>   dest='input',
>   
> default='/Users/pk11/Desktop/Motor_Vehicle_Crashes_-_Vehicle_Information__Three_Year_Window.csv',
>   help='Input file to process.')
>  known_args, pipeline_args = parser.parse_known_args(argv)
>  pipeline_options = PipelineOptions(pipeline_args)
>  pipeline_options.view_as(SetupOptions).save_main_session = True
>  pipeline = beam.Pipeline(options=pipeline_options)
>  raw_data = (
>pipeline
>| 'ReadTrainData' >> textio.ReadFromText(known_args.input, 
> skip_header_lines=1)
>| 'Map' >> beam.Map(lambda line: line.lower())
>  )
>  result = pipeline.run()
>  result.wait_until_finish()
>  print(raw_data)
> if __name__ == '__main__':
>   run()
> {code}
> Example dataset:  
> https://catalog.data.gov/dataset/motor-vehicle-crashes-vehicle-information-beginning-2009
> For comparison: 
> {code}
> lines = [line.lower() for line in 
> open('/Users/pk11/Desktop/Motor_Vehicle_Crashes_-_Vehicle_Information__Three_Year_Window.csv')]
> print(len(lines))
> {code}
> This vanilla Python script runs on the same hardware and dataset in 0m4.909s. 
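> As a point of reference (not part of the original report), the vanilla baseline
> can be wrapped in a small timing harness; the helper name below is illustrative.
> It shows the per-line work itself is trivial, which points the blame at runner
> overhead rather than at the lowercase step:
> {code}
import time

def baseline_lower(path):
    """Time the same per-line work the pipeline does (read + lower),
    without any runner overhead."""
    start = time.perf_counter()
    lines = [line.lower() for line in open(path)]
    elapsed = time.perf_counter() - start
    return len(lines), elapsed

# count, seconds = baseline_lower('/path/to/dataset.csv')
> {code}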





[jira] [Updated] (BEAM-2764) SolrIOTest fails in master branch

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2764:
---
Fix Version/s: Not applicable

> SolrIOTest fails in master branch
> -
>
> Key: BEAM-2764
> URL: https://issues.apache.org/jira/browse/BEAM-2764
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Cao Manh Dat
>Priority: Minor
> Fix For: 2.2.0
>
>
> From 
> https://builds.apache.org/job/beam_Release_NightlySnapshot/org.apache.beam$beam-sdks-java-io-solr/503/testReport/junit/org.apache.beam.sdk.io.solr/SolrIOTest/testSizes/
>  :
> {code}
> java.lang.AssertionError: 
> Wrong estimated size beyond maximum
> Expected: a value less than <36000L>
>  but: <38762L> was greater than <36000L>
>   at 
> __randomizedtesting.SeedInfo.seed([B412D95ADCA1707D:1747543CE2D1CF75]:0)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.apache.beam.sdk.io.solr.SolrIOTest.testSizes(SolrIOTest.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1764)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:871)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:907)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:921)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:320)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:49)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> org.apache.lucene.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:48)
>   at 
> org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
>   at 
> org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:809)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:460)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:880)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:781)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:816)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:827)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:41)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> 

[jira] [Updated] (BEAM-2764) SolrIOTest fails in master branch

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía updated BEAM-2764:
---
Fix Version/s: (was: Not applicable)
   2.2.0

> SolrIOTest fails in master branch
> -
>
> Key: BEAM-2764
> URL: https://issues.apache.org/jira/browse/BEAM-2764
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Cao Manh Dat
>Priority: Minor
> Fix For: 2.2.0
>
>
> From 
> https://builds.apache.org/job/beam_Release_NightlySnapshot/org.apache.beam$beam-sdks-java-io-solr/503/testReport/junit/org.apache.beam.sdk.io.solr/SolrIOTest/testSizes/
>  :
> {code}
> java.lang.AssertionError: 
> Wrong estimated size beyond maximum
> Expected: a value less than <36000L>
>  but: <38762L> was greater than <36000L>
>   at 
> __randomizedtesting.SeedInfo.seed([B412D95ADCA1707D:1747543CE2D1CF75]:0)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.apache.beam.sdk.io.solr.SolrIOTest.testSizes(SolrIOTest.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1764)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:871)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:907)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:921)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:320)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:49)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> org.apache.lucene.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:48)
>   at 
> org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
>   at 
> org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:809)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:460)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:880)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:781)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:816)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:827)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:41)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> 

[jira] [Resolved] (BEAM-2764) SolrIOTest fails in master branch

2017-09-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/BEAM-2764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismaël Mejía resolved BEAM-2764.

Resolution: Fixed

> SolrIOTest fails in master branch
> -
>
> Key: BEAM-2764
> URL: https://issues.apache.org/jira/browse/BEAM-2764
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Cao Manh Dat
>Priority: Minor
> Fix For: 2.2.0
>
>
> From 
> https://builds.apache.org/job/beam_Release_NightlySnapshot/org.apache.beam$beam-sdks-java-io-solr/503/testReport/junit/org.apache.beam.sdk.io.solr/SolrIOTest/testSizes/
>  :
> {code}
> java.lang.AssertionError: 
> Wrong estimated size beyond maximum
> Expected: a value less than <36000L>
>  but: <38762L> was greater than <36000L>
>   at 
> __randomizedtesting.SeedInfo.seed([B412D95ADCA1707D:1747543CE2D1CF75]:0)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.apache.beam.sdk.io.solr.SolrIOTest.testSizes(SolrIOTest.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1764)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:871)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:907)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:921)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:320)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:49)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> org.apache.lucene.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:48)
>   at 
> org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
>   at 
> org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:809)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:460)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:880)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:781)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:816)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:827)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:41)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> 

[2/2] beam git commit: This closes #3727

2017-09-07 Thread iemejia
This closes #3727


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bd68ecf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bd68ecf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bd68ecf

Branch: refs/heads/master
Commit: 3bd68ecfd7d576d78e02deb0476e549f11e1b5ef
Parents: bd4eab0 3cba56a
Author: Ismaël Mejía 
Authored: Thu Sep 7 22:02:54 2017 +0200
Committer: Ismaël Mejía 
Committed: Thu Sep 7 22:02:54 2017 +0200

--
 .../test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--




[1/2] beam git commit: [BEAM-2764] Change document size range to fix flakiness on SolrIO tests

2017-09-07 Thread iemejia
Repository: beam
Updated Branches:
  refs/heads/master bd4eab032 -> 3bd68ecfd


[BEAM-2764] Change document size range to fix flakiness on SolrIO tests


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3cba56a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3cba56a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3cba56a6

Branch: refs/heads/master
Commit: 3cba56a621117659c0b20bb8247f0864b5b218dd
Parents: bd4eab0
Author: Cao Manh Dat 
Authored: Thu Aug 17 14:05:46 2017 +0700
Committer: Ismaël Mejía 
Committed: Thu Sep 7 22:01:36 2017 +0200

--
 .../test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3cba56a6/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
--
diff --git 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
index 808cd0f..fb99d55 100644
--- 
a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
+++ 
b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTestUtils.java
@@ -28,8 +28,8 @@ import org.apache.solr.common.SolrInputDocument;
 
 /** Test utilities to use with {@link SolrIO}. */
 public class SolrIOTestUtils {
-  public static final long MIN_DOC_SIZE = 40L;
-  public static final long MAX_DOC_SIZE = 90L;
+  public static final long MIN_DOC_SIZE = 30L;
+  public static final long MAX_DOC_SIZE = 150L;
 
   static void createCollection(
   String collection, int numShards, int replicationFactor, 
AuthorizedSolrClient client)
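
A quick sanity check on the new bounds (assuming the test's <36000L> limit was
derived as 400 documents times the old 90-byte MAX_DOC_SIZE; that document count
is inferred, not stated in the thread):

{code}
OLD_MAX = 90                 # previous MAX_DOC_SIZE
NEW_MIN, NEW_MAX = 30, 150   # bounds after this commit
NUM_DOCS = 36000 // OLD_MAX  # assumed document count behind the <36000L> limit

observed = 38762  # estimated size that failed the old assertion

# The old upper bound (NUM_DOCS * OLD_MAX = 36000) was too tight...
assert observed > NUM_DOCS * OLD_MAX
# ...while the widened range comfortably contains the observed estimate.
assert NUM_DOCS * NEW_MIN <= observed <= NUM_DOCS * NEW_MAX
{code}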



[jira] [Commented] (BEAM-2764) SolrIOTest fails in master branch

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157550#comment-16157550
 ] 

ASF GitHub Bot commented on BEAM-2764:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3727


> SolrIOTest fails in master branch
> -
>
> Key: BEAM-2764
> URL: https://issues.apache.org/jira/browse/BEAM-2764
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Cao Manh Dat
>Priority: Minor
>
> From 
> https://builds.apache.org/job/beam_Release_NightlySnapshot/org.apache.beam$beam-sdks-java-io-solr/503/testReport/junit/org.apache.beam.sdk.io.solr/SolrIOTest/testSizes/
>  :
> {code}
> java.lang.AssertionError: 
> Wrong estimated size beyond maximum
> Expected: a value less than <36000L>
>  but: <38762L> was greater than <36000L>
>   at 
> __randomizedtesting.SeedInfo.seed([B412D95ADCA1707D:1747543CE2D1CF75]:0)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.apache.beam.sdk.io.solr.SolrIOTest.testSizes(SolrIOTest.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1764)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:871)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:907)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:921)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:320)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:49)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> org.apache.lucene.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:48)
>   at 
> org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
>   at 
> org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:809)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:460)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:880)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:781)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:816)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:827)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:41)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   

[GitHub] beam pull request #3727: BEAM-2764: Set right range for min_doc_size and max...

2017-09-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3727


---


[jira] [Created] (BEAM-2863) Add support for Side Inputs over the Fn API

2017-09-07 Thread Luke Cwik (JIRA)
Luke Cwik created BEAM-2863:
---

 Summary: Add support for Side Inputs over the Fn API
 Key: BEAM-2863
 URL: https://issues.apache.org/jira/browse/BEAM-2863
 Project: Beam
  Issue Type: Improvement
  Components: beam-model
Reporter: Luke Cwik


See:
* https://s.apache.org/beam-side-inputs-1-pager
* http://s.apache.org/beam-fn-api-state-api





[jira] [Updated] (BEAM-1962) Connection should be closed in case start() throws exception

2017-09-07 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated BEAM-1962:
-
Description: 
In JmsIO#start() :

{code}
  try {
Connection connection;
if (spec.getUsername() != null) {
  connection =
  connectionFactory.createConnection(spec.getUsername(), 
spec.getPassword());
} else {
  connection = connectionFactory.createConnection();
}
connection.start();
this.connection = connection;
  } catch (Exception e) {
throw new IOException("Error connecting to JMS", e);
  }
{code}
If start() throws exception, connection should be closed.

  was:
In JmsIO#start() :
{code}
  try {
Connection connection;
if (spec.getUsername() != null) {
  connection =
  connectionFactory.createConnection(spec.getUsername(), 
spec.getPassword());
} else {
  connection = connectionFactory.createConnection();
}
connection.start();
this.connection = connection;
  } catch (Exception e) {
throw new IOException("Error connecting to JMS", e);
  }
{code}
If start() throws exception, connection should be closed.


> Connection should be closed in case start() throws exception
> 
>
> Key: BEAM-1962
> URL: https://issues.apache.org/jira/browse/BEAM-1962
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Jean-Baptiste Onofré
>Priority: Minor
>
> In JmsIO#start() :
> {code}
>   try {
> Connection connection;
> if (spec.getUsername() != null) {
>   connection =
>   connectionFactory.createConnection(spec.getUsername(), 
> spec.getPassword());
> } else {
>   connection = connectionFactory.createConnection();
> }
> connection.start();
> this.connection = connection;
>   } catch (Exception e) {
> throw new IOException("Error connecting to JMS", e);
>   }
> {code}
> If start() throws exception, connection should be closed.
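> The fix the report asks for is a language-agnostic pattern: if acquiring a
> resource succeeds but starting it fails, close the resource before propagating
> the error. A minimal sketch of that pattern (in Python, purely illustrative;
> FakeConnection is a stand-in, not the JMS API):
> {code}
class FakeConnection:
    """Stand-in for a JMS-like connection, used only to illustrate the pattern."""
    def __init__(self, fail_on_start=False):
        self.fail_on_start = fail_on_start
        self.closed = False

    def start(self):
        if self.fail_on_start:
            raise RuntimeError("broker unreachable")

    def close(self):
        self.closed = True

def open_and_start(fail_on_start=False):
    conn = FakeConnection(fail_on_start)
    try:
        conn.start()
    except Exception as e:
        conn.close()  # the point of BEAM-1962: release the connection on failure
        raise IOError("Error connecting to JMS") from e
    return conn
> {code}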





[jira] [Commented] (BEAM-2764) SolrIOTest fails in master branch

2017-09-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157533#comment-16157533
 ] 

Ted Yu commented on BEAM-2764:
--

lgtm

> SolrIOTest fails in master branch
> -
>
> Key: BEAM-2764
> URL: https://issues.apache.org/jira/browse/BEAM-2764
> Project: Beam
>  Issue Type: Test
>  Components: sdk-java-extensions
>Reporter: Ted Yu
>Assignee: Cao Manh Dat
>Priority: Minor
>
> From 
> https://builds.apache.org/job/beam_Release_NightlySnapshot/org.apache.beam$beam-sdks-java-io-solr/503/testReport/junit/org.apache.beam.sdk.io.solr/SolrIOTest/testSizes/
>  :
> {code}
> java.lang.AssertionError: 
> Wrong estimated size beyond maximum
> Expected: a value less than <36000L>
>  but: <38762L> was greater than <36000L>
>   at 
> __randomizedtesting.SeedInfo.seed([B412D95ADCA1707D:1747543CE2D1CF75]:0)
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at org.junit.Assert.assertThat(Assert.java:956)
>   at org.apache.beam.sdk.io.solr.SolrIOTest.testSizes(SolrIOTest.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1764)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$8.evaluate(RandomizedRunner.java:871)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$9.evaluate(RandomizedRunner.java:907)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:921)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at 
> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:320)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:49)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> org.apache.lucene.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:48)
>   at 
> org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
>   at 
> org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:367)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:809)
>   at 
> com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:460)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:880)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:781)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:816)
>   at 
> com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:827)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.SystemPropertiesRestoreRule$1.evaluate(SystemPropertiesRestoreRule.java:57)
>   at 
> org.apache.lucene.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:45)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:41)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
>   at 
> org.apache.lucene.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
>  

[jira] [Commented] (BEAM-2545) bigtable e2e tests failing - UNKNOWN: Stale requests/Error mutating row

2017-09-07 Thread Solomon Duskis (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16157507#comment-16157507
 ] 

Solomon Duskis commented on BEAM-2545:
--

FYI, the Cloud Bigtable team released a version of CloudBigtableIO that works 
for Beam.  I need to open a new issue here to discuss the possibility of 
creating a Cloud Bigtable beam connector that uses HBase objects.  There are a 
few ways we can go with that, and a few potential pitfalls that ought to be 
discussed.

> bigtable e2e tests failing -  UNKNOWN: Stale requests/Error mutating row
> 
>
> Key: BEAM-2545
> URL: https://issues.apache.org/jira/browse/BEAM-2545
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Stephen Sisk
>Assignee: Chamikara Jayalath
>
> The BigtableWriteIT is taking a long time (~10min) and throwing errors. 
> Example test run: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/4264/org.apache.beam$beam-runners-google-cloud-dataflow-java/testReport/junit/org.apache.beam.sdk.io.gcp.bigtable/BigtableWriteIT/testE2EBigtableWrite/
> (96dc5c8efaf8fa26): java.io.IOException: At least 25 errors occurred writing 
> to Bigtable. First 10 errors: 
> Error mutating row key00175 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00175"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00176 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00176"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00177 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00177"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00178 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00178"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00179 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00179"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00180 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00180"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00181 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00181"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00182 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00182"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00183 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00183"
> }
> ]: UNKNOWN: Stale requests.
> Error mutating row key00184 with mutations [set_cell {
>   family_name: "cf"
>   value: "value00184"
> }
> ]: UNKNOWN: Stale requests.
>  at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.checkForFailures(BigtableIO.java:655)
>  at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.finishBundle(BigtableIO.java:607)
> Stacktrace
> java.lang.RuntimeException: 
> (96dc5c8efaf8fa26): java.io.IOException: At least 25 errors occurred writing 
> to Bigtable. First 10 errors: 
>   at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.checkForFailures(BigtableIO.java:655)
>   at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.finishBundle(BigtableIO.java:607)
>   at 
> 

[jira] [Comment Edited] (BEAM-2545) bigtable e2e tests failing - UNKNOWN: Stale requests/Error mutating row

2017-09-07 Thread Solomon Duskis (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157488#comment-16157488
 ] 

Solomon Duskis edited comment on BEAM-2545 at 9/7/17 7:32 PM:
--

Yes.  1.0.0-pre3 is the proper version to choose.  It should fix that problem.

Users should be able to explicitly add 
com.google.cloud.bigtable:bigtable-client-core:1.0.0-pre3 to their maven/gradle 
configurations to fix this problem with Beam 2.1.0.
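As a sketch of that workaround in a Maven POM (coordinates taken from the comment above; confirm the exact version against your Beam release):

```xml
<!-- Pin the fixed Bigtable client alongside Beam 2.1.0 -->
<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-client-core</artifactId>
  <version>1.0.0-pre3</version>
</dependency>
```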


was (Author: sduskis):
Yes.  1.0.0-pre3 is the proper version to choose.  It should fix that problem.

Users should be able to explicitly add 
com.google.cloud.bigtabl:bigtable-client-core:1.0.0-pre3 to their maven/gradle 
configurations to fix this problem with Beam 2.1.0.

> bigtable e2e tests failing -  UNKNOWN: Stale requests/Error mutating row
> 
>
> Key: BEAM-2545
> URL: https://issues.apache.org/jira/browse/BEAM-2545
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Stephen Sisk
>Assignee: Chamikara Jayalath
>
> The BigtableWriteIT is taking a long time (~10min) and throwing errors. 
> Example test run: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/4264/org.apache.beam$beam-runners-google-cloud-dataflow-java/testReport/junit/org.apache.beam.sdk.io.gcp.bigtable/BigtableWriteIT/testE2EBigtableWrite/

[jira] [Commented] (BEAM-2545) bigtable e2e tests failing - UNKNOWN: Stale requests/Error mutating row

2017-09-07 Thread Solomon Duskis (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157488#comment-16157488
 ] 

Solomon Duskis commented on BEAM-2545:
--

Yes.  1.0.0-pre3 is the proper version to choose.  It should fix that problem.

Users should be able to explicitly add 
com.google.cloud.bigtable:bigtable-client-core:1.0.0-pre3 to their maven/gradle 
configurations to fix this problem with Beam 2.1.0.

> bigtable e2e tests failing -  UNKNOWN: Stale requests/Error mutating row
> 
>
> Key: BEAM-2545
> URL: https://issues.apache.org/jira/browse/BEAM-2545
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Stephen Sisk
>Assignee: Chamikara Jayalath
>
> The BigtableWriteIT is taking a long time (~10min) and throwing errors. 
> Example test run: 
> https://builds.apache.org/view/Beam/job/beam_PostCommit_Java_MavenInstall/4264/org.apache.beam$beam-runners-google-cloud-dataflow-java/testReport/junit/org.apache.beam.sdk.io.gcp.bigtable/BigtableWriteIT/testE2EBigtableWrite/
> (96dc5c8efaf8fa26): java.io.IOException: At least 25 errors occurred writing 
> to Bigtable. First 10 errors: 
>   at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.checkForFailures(BigtableIO.java:655)
>   at 
> org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.finishBundle(BigtableIO.java:607)
>   at 
> org.apache.beam.runners.dataflow.TestDataflowRunner.run(TestDataflowRunner.java:133)
>   at 
> 

[jira] [Commented] (BEAM-2862) Add support for User State over the Fn API

2017-09-07 Thread Luke Cwik (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157426#comment-16157426
 ] 

Luke Cwik commented on BEAM-2862:
-

A lot of the work for the Java SDK harness was captured underneath BEAM-1347

> Add support for User State over the Fn API
> --
>
> Key: BEAM-2862
> URL: https://issues.apache.org/jira/browse/BEAM-2862
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>
> https://s.apache.org/beam-fn-api-state-api



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2862) Add support for User State over the Fn API

2017-09-07 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-2862:

Description: https://s.apache.org/beam-fn-api-state-api  (was: Design: 
https://s.apache.org/beam-fn-api-state-api)

> Add support for User State over the Fn API
> --
>
> Key: BEAM-2862
> URL: https://issues.apache.org/jira/browse/BEAM-2862
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>
> https://s.apache.org/beam-fn-api-state-api



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2862) Add support for User State over the Fn API

2017-09-07 Thread Luke Cwik (JIRA)
Luke Cwik created BEAM-2862:
---

 Summary: Add support for User State over the Fn API
 Key: BEAM-2862
 URL: https://issues.apache.org/jira/browse/BEAM-2862
 Project: Beam
  Issue Type: Improvement
  Components: beam-model
Reporter: Luke Cwik


Design: https://s.apache.org/beam-fn-api-state-api



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2421) Migrate Apache Beam to use impulse primitive as the only root primitive

2017-09-07 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-2421:

Summary: Migrate Apache Beam to use impulse primitive as the only root 
primitive  (was: Migrate Dataflow to use impulse primitive as the only root 
primitive)

> Migrate Apache Beam to use impulse primitive as the only root primitive
> ---
>
> Key: BEAM-2421
> URL: https://issues.apache.org/jira/browse/BEAM-2421
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Luke Cwik
>
> The impulse source emits a single byte array element within the global window.
> This is in preparation for using SDF as the replacement for different bounded 
> and unbounded source implementations.
> Before:
> Read(Source) -> ParDo -> ...
> Impulse -> SDF(Source) -> ParDo -> ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-165) Add Hadoop MapReduce runner

2017-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157414#comment-16157414
 ] 

ASF GitHub Bot commented on BEAM-165:
-

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3705


> Add Hadoop MapReduce runner
> ---
>
> Key: BEAM-165
> URL: https://issues.apache.org/jira/browse/BEAM-165
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-ideas, runner-mapreduce
>Reporter: Jean-Baptiste Onofré
>Assignee: Pei He
>
> I think a MapReduce runner could be a good addition to Beam. It would allow 
> users to smoothly "migrate" from MapReduce to Spark or Flink.
> Of course, the MapReduce runner will run in batch mode (not stream).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[09/36] beam git commit: mr-runner: hack to get around that ViewAsXXX.expand() return wrong output PValue.

2017-09-07 Thread kenn
mr-runner: hack to get around that ViewAsXXX.expand() return wrong output 
PValue.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5905efd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5905efd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5905efd3

Branch: refs/heads/mr-runner
Commit: 5905efd3364f2cd27567126508576aac887a1f63
Parents: 98da2a2
Author: Pei He 
Authored: Wed Aug 2 21:59:21 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:48 2017 +0800

--
 .../translation/TranslationContext.java | 54 +++-
 1 file changed, 42 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5905efd3/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 2b51df5..365bdc0 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -22,13 +22,17 @@ import static 
com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -82,6 +86,11 @@ public class TranslationContext {
   this.currentNode = node;
   for (Map.Entry entry : 
currentNode.getOutputs().entrySet()) {
 pValueToTupleTag.put(entry.getValue(), entry.getKey());
+// TODO: this is a hack to get around that ViewAsXXX.expand() return 
wrong output PValue.
+if (node.getTransform() instanceof View.CreatePCollectionView) {
+  View.CreatePCollectionView view = (View.CreatePCollectionView) 
node.getTransform();
+  pValueToTupleTag.put(view.getView(), 
view.getView().getTagInternal());
+}
   }
 }
 
@@ -98,29 +107,50 @@ public class TranslationContext {
 }
 
 public List getInputTags() {
-  return FluentIterable.from(currentNode.getInputs().values())
+  Iterable inputs;
+  if (currentNode.getTransform() instanceof ParDo.MultiOutput) {
+ParDo.MultiOutput parDo = (ParDo.MultiOutput) 
currentNode.getTransform();
+inputs = ImmutableList.builder()
+.add(getInput()).addAll(parDo.getSideInputs()).build();
+  } else {
+inputs = currentNode.getInputs().values();
+  }
+  return FluentIterable.from(inputs)
   .transform(new Function() {
 @Override
 public Graphs.Tag apply(PValue pValue) {
   checkState(
   pValueToTupleTag.containsKey(pValue),
   String.format("Failed to find TupleTag for pValue: %s.", 
pValue));
-  PCollection pc = (PCollection) pValue;
-  return Graphs.Tag.of(
-  pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+  if (pValue instanceof PCollection) {
+PCollection pc = (PCollection) pValue;
+return Graphs.Tag.of(
+pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+  } else {
+return Graphs.Tag.of(
+pValue.getName(),
+pValueToTupleTag.get(pValue),
+((PCollectionView) pValue).getCoderInternal());
+  }
 }})
   .toList();
 }
 
 public List getOutputTags() {
-  return FluentIterable.from(currentNode.getOutputs().entrySet())
-  .transform(new Function<Map.Entry<PValue, TupleTag<?>>, Graphs.Tag>() {
-@Override
-public Graphs.Tag apply(Map.Entry entry) {
-  PCollection pc = (PCollection) entry.getValue();
-  return Graphs.Tag.of(pc.getName(), entry.getKey(), 
pc.getCoder());
-}})
-  .toList();
+  if 

[20/36] beam git commit: mr-runner: fix the bug that steps are attached multiple times in diamond shaped DAG.

2017-09-07 Thread kenn
mr-runner: fix the bug that steps are attached multiple times in diamond shaped 
DAG.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e7062cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e7062cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e7062cd

Branch: refs/heads/mr-runner
Commit: 4e7062cd9de6ae3f0616033823fd995eb10a3744
Parents: 6c2390a
Author: Pei He 
Authored: Wed Aug 30 19:16:06 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:50 2017 +0800

--
 .../runners/mapreduce/translation/JobPrototype.java   | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4e7062cd/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 39487fd..a0c6626 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -24,10 +24,12 @@ import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -139,7 +141,7 @@ public class JobPrototype {
 
   // Setup BeamReducer
   Graphs.Step reducerStartStep = gabwStep;
-  chainOperations(reducerStartStep, fusedStep);
+  chainOperations(reducerStartStep, fusedStep, 
Sets.newHashSet());
   conf.set(
   BeamReducer.BEAM_REDUCER_KV_CODER,
   Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
@@ -152,7 +154,7 @@ public class JobPrototype {
 }
 
 // Setup DoFns in BeamMapper.
-chainOperations(startStep, fusedStep);
+chainOperations(startStep, fusedStep, Sets.newHashSet());
 
 job.setMapOutputKeyClass(BytesWritable.class);
 job.setMapOutputValueClass(byte[].class);
@@ -177,7 +179,8 @@ public class JobPrototype {
 return job;
   }
 
-  private void chainOperations(Graphs.Step current, Graphs.FusedStep 
fusedStep) {
+  private void chainOperations(
+  Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> visited) {
 Operation operation = current.getOperation();
 List outputTags = fusedStep.getOutputTags(current);
 for (Graphs.Tag outTag : outputTags) {
@@ -185,9 +188,12 @@ public class JobPrototype {
 operation.attachConsumer(outTag.getTupleTag(), 
consumer.getOperation());
   }
 }
+visited.add(current);
 for (Graphs.Tag outTag : outputTags) {
   for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
-chainOperations(consumer, fusedStep);
+if (!visited.contains(consumer)) {
+  chainOperations(consumer, fusedStep, visited);
+}
   }
 }
   }
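As a standalone illustration of the fix, here is a minimal sketch (hypothetical `DiamondDagDemo` class with string-labeled steps, not the runner's actual `Graphs.Step` types): with a visited set, the tail of a diamond-shaped DAG is chained exactly once instead of once per incoming path.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DiamondDagDemo {
    // Adjacency list: step -> downstream consumers, forming a diamond
    // A -> {B, C}, B -> D, C -> D.
    static final Map<String, List<String>> CONSUMERS = Map.of(
            "A", List.of("B", "C"),
            "B", List.of("D"),
            "C", List.of("D"),
            "D", List.of());

    // Mirrors the patched chainOperations: record each visit, then
    // recurse only into consumers that have not been visited yet.
    static void chain(String current, Set<String> visited, List<String> order) {
        order.add(current);
        visited.add(current);
        for (String consumer : CONSUMERS.get(current)) {
            if (!visited.contains(consumer)) {
                chain(consumer, visited, order);
            }
        }
    }

    public static List<String> visitOrder() {
        List<String> order = new ArrayList<>();
        chain("A", new HashSet<>(), order);
        return order;
    }

    public static void main(String[] args) {
        List<String> order = visitOrder();
        // "D" is reachable via both B and C but is chained only once.
        System.out.println(order); // prints [A, B, D, C]
    }
}
```

Without the visited set, the recursion would reach "D" twice and attach its consumers twice, which is exactly the duplicate-attachment bug this commit fixes.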



[25/36] beam git commit: mr-runner: handle no files case in FileSideInputReader for empty views.

2017-09-07 Thread kenn
mr-runner: handle no files case in FileSideInputReader for empty views.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca0b15ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca0b15ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca0b15ad

Branch: refs/heads/mr-runner
Commit: ca0b15ada6cdbdfba9ac8adb0b8c874477587fae
Parents: 807f903
Author: Pei He 
Authored: Thu Aug 31 17:29:04 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 .../mapreduce/translation/FileSideInputReader.java | 13 +++--
 1 file changed, 7 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/ca0b15ad/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
index cb3a8c4..403de4e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -76,14 +76,15 @@ public class FileSideInputReader implements SideInputReader 
{
 try {
   FileSystem fs = pattern.getFileSystem(conf);
   FileStatus[] files = fs.globStatus(pattern);
-  // TODO: handle empty views which may result in no files case.
-  SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
files[0].getPath(), conf);
 
   List availableSideInputs = new ArrayList<>();
-  BytesWritable value = new BytesWritable();
-  while (reader.next(NullWritable.get(), value)) {
-ByteArrayInputStream inStream = new 
ByteArrayInputStream(value.getBytes());
-availableSideInputs.add(elemCoder.decode(inStream));
+  if (files.length > 0) {
+SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
files[0].getPath(), conf);
+BytesWritable value = new BytesWritable();
+while (reader.next(NullWritable.get(), value)) {
+  ByteArrayInputStream inStream = new 
ByteArrayInputStream(value.getBytes());
+  availableSideInputs.add(elemCoder.decode(inStream));
+}
   }
   Iterable sideInputForWindow =
   Iterables.filter(availableSideInputs, new 
Predicate() {
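The guard in the patch above can be shown in isolation; a minimal sketch under assumed names (`EmptySideInputDemo`, `readSideInput`, with plain strings standing in for decoded SequenceFile records):

```java
import java.util.ArrayList;
import java.util.List;

public class EmptySideInputDemo {
    // Mirrors the patched FileSideInputReader: materialize elements only
    // when the glob actually matched files. An empty view now yields an
    // empty (but non-null) collection instead of failing on files[0].
    static List<String> readSideInput(String[] files) {
        List<String> available = new ArrayList<>();
        if (files.length > 0) {
            // Stand-in for decoding records from the first matched shard.
            for (String record : files) {
                available.add("decoded:" + record);
            }
        }
        return available;
    }

    public static void main(String[] args) {
        System.out.println(readSideInput(new String[0]));            // prints []
        System.out.println(readSideInput(new String[]{"part-0"}));   // prints [decoded:part-0]
    }
}
```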



[33/36] beam git commit: mr-runner: support SourceMetrics, this fixes MetricsTest.testBoundedSourceMetrics().

2017-09-07 Thread kenn
mr-runner: support SourceMetrics, this fixes 
MetricsTest.testBoundedSourceMetrics().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c62b3ad4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c62b3ad4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c62b3ad4

Branch: refs/heads/mr-runner
Commit: c62b3ad462c2c07ce36cce025dc52204e7eb87d2
Parents: 5248ce4
Author: Pei He 
Authored: Fri Sep 1 16:55:19 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:15:08 2017 +0800

--
 .../mapreduce/translation/BeamInputFormat.java  | 39 +++-
 .../translation/FileReadOperation.java  |  4 ++
 .../translation/FlattenTranslator.java  | 12 +++---
 .../mapreduce/translation/GraphPlanner.java |  3 +-
 .../mapreduce/translation/JobPrototype.java |  4 +-
 .../translation/ReadBoundedTranslator.java  |  7 ++--
 .../mapreduce/translation/ReadOperation.java|  5 ++-
 .../translation/SourceReadOperation.java|  6 ++-
 8 files changed, 56 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c62b3ad4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 10d9ada..9dc3396 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -26,12 +26,15 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -87,7 +90,8 @@ public class BeamInputFormat extends InputFormat {
 .transform(new Function() {
   @Override
   public ReadOperation.TaggedSource 
apply(BoundedSource input) {
-return ReadOperation.TaggedSource.of(input, 
taggedSource.getTag());
+return ReadOperation.TaggedSource.of(
+taggedSource.getStepName(), input, 
taggedSource.getTag());
   }});
   } catch (Exception e) {
 Throwables.throwIfUnchecked(e);
@@ -98,7 +102,8 @@ public class BeamInputFormat extends InputFormat {
   .transform(new Function() {
 @Override
 public InputSplit apply(ReadOperation.TaggedSource taggedSource) {
-  return new BeamInputSplit(taggedSource.getSource(), options, 
taggedSource.getTag());
+  return new BeamInputSplit(taggedSource.getStepName(), 
taggedSource.getSource(),
+  options, taggedSource.getTag());
 }})
   .toList();
 } catch (Exception e) {
@@ -113,6 +118,7 @@ public class BeamInputFormat extends InputFormat {
   }
 
   public static class BeamInputSplit extends InputSplit implements Writable 
{
+private String stepName;
 private BoundedSource boundedSource;
 private SerializedPipelineOptions options;
 private TupleTag tupleTag;
@@ -121,9 +127,11 @@ public class BeamInputFormat extends InputFormat {
 }
 
 public BeamInputSplit(
+String stepName,
 BoundedSource boundedSource,
 SerializedPipelineOptions options,
 TupleTag tupleTag) {
+  this.stepName = checkNotNull(stepName, "stepName");
   this.boundedSource = checkNotNull(boundedSource, "boundedSources");
   this.options = checkNotNull(options, "options");
   this.tupleTag = checkNotNull(tupleTag, "tupleTag");
@@ -131,7 +139,7 @@ public class BeamInputFormat extends InputFormat {
 
 public BeamRecordReader createReader() throws IOException {
   return new BeamRecordReader<>(
-  boundedSource.createReader(options.getPipelineOptions()), tupleTag);
+  stepName, 

[29/36] beam git commit: mr-runner: ensure Operation only starts/finishes once for a diamond-shaped DAG; this fixes ParDoLifecycleTest.

2017-09-07 Thread kenn
mr-runner: ensure Operation only starts/finishes once for a diamond-shaped DAG; this fixes ParDoLifecycleTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8627913e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8627913e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8627913e

Branch: refs/heads/mr-runner
Commit: 8627913eeb0a51a251a953930fc52025dbf8a723
Parents: e330d36
Author: Pei He 
Authored: Fri Sep 1 14:10:22 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:53 2017 +0800

--
 .../beam/runners/mapreduce/translation/Operation.java   | 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8627913e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
index bd24f05..a96806d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Operation.java
@@ -31,12 +31,16 @@ import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 public abstract class Operation implements Serializable {
   private final OutputReceiver[] receivers;
   private SerializableConfiguration conf;
+  private boolean started;
+  private boolean finished;
 
   public Operation(int numOutputs) {
 this.receivers = new OutputReceiver[numOutputs];
 for (int i = 0; i < numOutputs; ++i) {
   receivers[i] = new OutputReceiver();
 }
+this.started = false;
+this.finished = false;
   }
 
   /**
@@ -45,6 +49,10 @@ public abstract class Operation implements Serializable {
* Called after all successors consuming operations have been started.
*/
  public void start(TaskInputOutputContext taskContext) {
+if (started) {
+  return;
+}
+started = true;
 conf = new SerializableConfiguration(taskContext.getConfiguration());
 for (OutputReceiver receiver : receivers) {
   if (receiver == null) {
@@ -67,6 +75,10 @@ public abstract class Operation implements Serializable {
* Called after all predecessors producing operations have been finished.
*/
   public void finish() {
+if (finished) {
+  return;
+}
+finished = true;
 for (OutputReceiver receiver : receivers) {
   if (receiver == null) {
 continue;

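The guard added above matters because, in a diamond-shaped DAG, a shared downstream Operation receives the start/finish signal once per upstream branch. A minimal stdlib-only sketch of the idempotency pattern (hypothetical names, not the runner's actual classes):

```java
import java.util.ArrayList;
import java.util.List;

// A node in a diamond-shaped DAG can be signalled by each of its
// predecessors; boolean guards make start()/finish() idempotent so the
// underlying lifecycle hooks run exactly once.
class GuardedOperation {
    private boolean started;
    private boolean finished;
    final List<String> log = new ArrayList<>();

    public void start() {
        if (started) {
            return; // second predecessor's signal is a no-op
        }
        started = true;
        log.add("start");
    }

    public void finish() {
        if (finished) {
            return; // already finished by the other branch
        }
        finished = true;
        log.add("finish");
    }
}
```

Both branches can safely signal the shared operation; only the first call on each side has any effect.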


[04/36] beam git commit: mr-runner: add BeamReducer and support GroupByKey.

2017-09-07 Thread kenn
mr-runner: add BeamReducer and support GroupByKey.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/923190dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/923190dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/923190dc

Branch: refs/heads/mr-runner
Commit: 923190dca2426711e30e5c5fe7093e14fcbefe07
Parents: 389b02b
Author: Pei He 
Authored: Wed Jul 26 21:19:30 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:47 2017 +0800

--
 .../mapreduce/translation/BeamMapper.java   |  13 +-
 .../mapreduce/translation/BeamReducer.java  |  68 +++
 .../runners/mapreduce/translation/Graph.java|  36 +++---
 .../mapreduce/translation/GraphConverter.java   |  26 +++-
 .../mapreduce/translation/GraphPlanner.java |  28 +++--
 .../GroupAlsoByWindowsParDoOperation.java   |  38 ++
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 120 +++
 .../mapreduce/translation/JobPrototype.java |  46 +--
 .../translation/NormalParDoOperation.java   |  49 
 .../mapreduce/translation/Operation.java|  69 +++
 .../mapreduce/translation/OutputReceiver.java   |  12 +-
 .../mapreduce/translation/ParDoOperation.java   |  73 ---
 .../mapreduce/translation/WriteOperation.java   |  52 
 .../beam/runners/mapreduce/WordCountTest.java   |   7 --
 14 files changed, 534 insertions(+), 103 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index b74797d..11ecc8d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -6,8 +6,7 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
  * Created by peihe on 21/07/2017.
@@ -15,7 +14,7 @@ import org.slf4j.LoggerFactory;
 public class BeamMapper
 extends Mapper {
 
-  public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = "beam-serialized-par-do-op";
+  public static final String BEAM_PAR_DO_OPERATION_MAPPER = "beam-par-do-op-mapper";
 
   private ParDoOperation parDoOperation;
 
@@ -23,11 +22,11 @@ public class BeamMapper
   protected void setup(
   Mapper.Context context) {
 String serializedParDo = checkNotNull(
-context.getConfiguration().get(BEAM_SERIALIZED_PAR_DO_OPERATION),
-BEAM_SERIALIZED_PAR_DO_OPERATION);
+context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
+BEAM_PAR_DO_OPERATION_MAPPER);
parDoOperation = (ParDoOperation) SerializableUtils.deserializeFromByteArray(
-Base64.decodeBase64(serializedParDo), "DoFn");
-parDoOperation.start();
+Base64.decodeBase64(serializedParDo), "ParDoOperation");
+parDoOperation.start((TaskInputOutputContext) context);
   }
 
   @Override

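The setup() above reverses what the translator does at job-submission time: the ParDoOperation is Java-serialized, Base64-encoded into the Hadoop Configuration under a known key, then decoded in each task. A stdlib-only sketch of that round-trip (a plain String stands in for the configuration entry; names are hypothetical):

```java
import java.io.*;
import java.util.Base64;

// Round-trips any Serializable through the same byte[] -> Base64 -> byte[]
// path the runner uses to ship an operation inside a job configuration.
class SerializedOpDemo {
    static String encode(Serializable op) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(op); // Java serialization, as SerializableUtils does
            }
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static Object decode(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Base64 keeps the payload safe to store as a plain configuration string, at the cost of ~33% size overhead.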
http://git-wip-us.apache.org/repos/asf/beam/blob/923190dc/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
new file mode 100644
index 000..8eb7938
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -0,0 +1,68 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import 

[21/36] beam git commit: mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.

2017-09-07 Thread kenn
mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8d3386d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8d3386d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8d3386d4

Branch: refs/heads/mr-runner
Commit: 8d3386d479b5704fa9448c7a9b1eab9c66e75549
Parents: 4e7062c
Author: Pei He 
Authored: Wed Aug 30 19:40:24 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:39 2017 +0800

--
 runners/map-reduce/pom.xml  | 14 
 .../beam/runners/mapreduce/MapReduceRunner.java |  5 +
 .../translation/ConfigurationUtils.java | 23 +++-
 .../mapreduce/translation/GraphPlanner.java | 11 +-
 .../mapreduce/translation/JobPrototype.java |  4 +++-
 5 files changed, 44 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index e858031..d65bb34 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -56,6 +56,20 @@
   
 org.apache.beam.sdk.testing.ValidatesRunner
   
+  
+
org.apache.beam.sdk.testing.PAssertTest.java
+  
+  
+org.apache.beam.sdk.testing.UsesSetState,
+org.apache.beam.sdk.testing.UsesSplittableParDo,
+org.apache.beam.sdk.testing.UsesDistributionMetrics,
+org.apache.beam.sdk.testing.UsesGaugeMetrics,
+org.apache.beam.sdk.testing.UsesCommittedMetrics,
+org.apache.beam.sdk.testing.LargeKeys$Above10MB,
+org.apache.beam.sdk.testing.UsesTimersInParDo,
+org.apache.beam.sdk.testing.UsesStatefulParDo,
+org.apache.beam.sdk.testing.UsesTestStream
+  
   none
   true
   

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 88ed01e..71edf1a 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -31,9 +31,11 @@ import 
org.apache.beam.runners.mapreduce.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.log4j.BasicConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +64,9 @@ public class MapReduceRunner extends PipelineRunner {
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
+BasicConfigurator.configure();
+MetricsEnvironment.setMetricsSupported(true);
+
 TranslationContext context = new TranslationContext(options);
 GraphConverter graphConverter = new GraphConverter(context);
 pipeline.traverseTopologically(graphConverter);

http://git-wip-us.apache.org/repos/asf/beam/blob/8d3386d4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
index 6d7a81a..4ec50bd 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ConfigurationUtils.java
@@ -17,9 +17,13 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import 

[23/36] beam git commit: mr-runner: fix NPE in PipelineTest.testIdentityTransform().

2017-09-07 Thread kenn
mr-runner: fix NPE in PipelineTest.testIdentityTransform().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/807f9034
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/807f9034
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/807f9034

Branch: refs/heads/mr-runner
Commit: 807f903413bec1d8052406adeac2ddd793765511
Parents: d71975e
Author: Pei He 
Authored: Wed Aug 30 20:01:06 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 .../apache/beam/runners/mapreduce/translation/GraphConverter.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/807f9034/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
index de1c80b..458961f 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphConverter.java
@@ -70,7 +70,8 @@ public class GraphConverter extends Pipeline.PipelineVisitor.Defaults {
   @Override
   public void leaveCompositeTransform(TransformHierarchy.Node node) {
 if (node.getTransform() != null) {
-  if (enclosedTransformCounts.get(node) > 1) {
+  Integer enclosedTransformCount = enclosedTransformCounts.get(node);
+  if (enclosedTransformCount != null && enclosedTransformCount > 1) {
 dotfileNodesBuilders.peek().insert(0, new StringBuilder()
 .append(getIndent()).append(
 String.format("subgraph \"cluster_%s\" {", node.getFullName()))

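The added null check guards against Java's auto-unboxing: `Map.get` returns null for a missing key, and comparing a null `Integer` with `>` unboxes it and throws a NullPointerException. A small stdlib-only sketch of the failure mode and the guarded form (hypothetical names):

```java
import java.util.Map;

// Demonstrates why `map.get(k) > 1` is unsafe for absent keys, and the
// null-check-first pattern used in the fix above.
class UnboxingNpeDemo {
    // Guarded version: check for null before the comparison unboxes.
    static boolean moreThanOne(Map<String, Integer> counts, String key) {
        Integer count = counts.get(key); // null when key is absent
        return count != null && count > 1;
    }

    // Unguarded version: returns true iff the NPE actually occurs.
    static boolean throwsWithoutCheck(Map<String, Integer> counts, String key) {
        try {
            return counts.get(key) > 1; // auto-unboxes null -> NPE
        } catch (NullPointerException e) {
            return true; // demonstrates the failure mode
        }
    }
}
```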


[35/36] beam git commit: mr-runner-hack: disable unrelated modules to shorten build time during development.

2017-09-07 Thread kenn
mr-runner-hack: disable unrelated modules to shorten build time during development.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9d1db98a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9d1db98a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9d1db98a

Branch: refs/heads/mr-runner
Commit: 9d1db98af41eb5ab375c51a7057bd6b52dbabfc6
Parents: c62b3ad
Author: Pei He 
Authored: Mon Sep 4 11:06:11 2017 +0800
Committer: Pei He 
Committed: Mon Sep 4 11:06:41 2017 +0800

--
 pom.xml   | 2 +-
 sdks/java/pom.xml | 3 ++-
 sdks/pom.xml  | 2 +-
 3 files changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9d1db98a/pom.xml
--
diff --git a/pom.xml b/pom.xml
index e0ec136..25cd51b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -183,7 +183,7 @@
 sdks/java/build-tools
 sdks
 runners
-examples
+
 
 sdks/java/javadoc
   

http://git-wip-us.apache.org/repos/asf/beam/blob/9d1db98a/sdks/java/pom.xml
--
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 3144193..75b2043 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -37,7 +37,8 @@
  and other project configuration to be used in all modules.
 build-tools -->
 core
-io
+
+io/hadoop-file-system
 maven-archetypes
 extensions
 
   
 
   



[30/36] beam git commit: mr-runner: translate empty Flatten into EmptySource; this fixes a few empty FlattenTests.

2017-09-07 Thread kenn
mr-runner: translate empty Flatten into EmptySource; this fixes a few empty FlattenTests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99bffd2a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99bffd2a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99bffd2a

Branch: refs/heads/mr-runner
Commit: 99bffd2a75b7461d15723567a57db6d3b17367cd
Parents: 8627913
Author: Pei He 
Authored: Fri Sep 1 14:11:30 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:53 2017 +0800

--
 .../translation/FlattenTranslator.java  | 71 +++-
 1 file changed, 68 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/99bffd2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index b966f2a..b869936 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -17,6 +17,14 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Flatten;
 
 /**
@@ -26,11 +34,68 @@ public class FlattenTranslator extends 
TransformTranslator.Default transform, 
TranslationContext context) {
 TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
-
-Operation operation = new FlattenOperation();
+List inputTags = userGraphContext.getInputTags();
+Operation operation;
+if (inputTags.isEmpty()) {
+  // Create an empty source
+  operation = new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag());
+} else {
+  operation = new FlattenOperation();
+}
 context.addInitStep(
 Graphs.Step.of(userGraphContext.getStepName(), operation),
-userGraphContext.getInputTags(),
+inputTags,
 userGraphContext.getOutputTags());
   }
+
+  private static class EmptySource extends BoundedSource<Void> {
+@Override
+public List<? extends BoundedSource<Void>> split(
+long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+  return Collections.EMPTY_LIST;
+}
+
+@Override
+public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+  return 0;
+}
+
+@Override
+public BoundedReader<Void> createReader(PipelineOptions options) throws IOException {
+  return new BoundedReader<Void>() {
+@Override
+public BoundedSource getCurrentSource() {
+  return EmptySource.this;
+}
+
+@Override
+public boolean start() throws IOException {
+  return false;
+}
+
+@Override
+public boolean advance() throws IOException {
+  return false;
+}
+
+@Override
+public Void getCurrent() throws NoSuchElementException {
+  throw new NoSuchElementException();
+}
+
+@Override
+public void close() throws IOException {
+}
+  };
+}
+
+@Override
+public void validate() {
+}
+
+@Override
+public Coder getDefaultOutputCoder() {
+  return VoidCoder.of();
+}
+  }
 }

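EmptySource works because of the reader contract it implements: start() returning false means "no first element", so a standard consume loop terminates without ever calling getCurrent(). A stdlib-only sketch of that contract (simplified stand-ins for Beam's BoundedSource.BoundedReader, names hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Simplified reader contract: start() positions on the first element (or
// returns false if there is none), advance() moves to the next.
interface SimpleReader<T> {
    boolean start();
    boolean advance();
    T getCurrent();
}

class EmptyReaderDemo {
    // The canonical consume loop: runs zero times when start() is false.
    static <T> List<T> readAll(SimpleReader<T> reader) {
        List<T> out = new ArrayList<>();
        for (boolean more = reader.start(); more; more = reader.advance()) {
            out.add(reader.getCurrent());
        }
        return out;
    }

    // An "empty source" reader: no first element, getCurrent() must throw.
    static SimpleReader<Void> emptyReader() {
        return new SimpleReader<Void>() {
            public boolean start() { return false; }
            public boolean advance() { return false; }
            public Void getCurrent() { throw new NoSuchElementException(); }
        };
    }
}
```

An empty Flatten translated to such a source therefore produces an empty PCollection without any special-casing downstream.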


[32/36] beam git commit: mr-runner: Graph.getSteps() to return steps in topological order; this fixes a few CombineTests.

2017-09-07 Thread kenn
mr-runner: Graph.getSteps() to return steps in topological order; this fixes a few CombineTests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e330d360
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e330d360
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e330d360

Branch: refs/heads/mr-runner
Commit: e330d360b4c90899b0ea94060955d519fd190a95
Parents: 989d7d8
Author: Pei He 
Authored: Fri Sep 1 13:06:49 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:53 2017 +0800

--
 .../runners/mapreduce/translation/Graph.java| 34 
 1 file changed, 21 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e330d360/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 66e573f..144f9a4 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
@@ -25,6 +27,7 @@ import com.google.common.collect.Sets;
 import com.google.common.graph.ElementOrder;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -91,24 +94,29 @@ public class Graph
   public List<StepT> getSteps() {
-return castToStepList(FluentIterable.from(graph.nodes())
+List<Vertex> ret = new ArrayList<>();
+
+Set<Vertex> pendingNodes = Sets.newHashSet(graph.nodes());
+while (!pendingNodes.isEmpty()) {
+  List<Vertex> readyNodes = new ArrayList<>();
+  for (Vertex v : pendingNodes) {
+if (Sets.intersection(pendingNodes, graph.predecessors(v)).isEmpty()) {
+  readyNodes.add(v);
+}
+  }
+  checkState(
+  !readyNodes.isEmpty(),
+  "No ready nodes found, there are cycles in graph: " + graph);
+  ret.addAll(readyNodes);
+  pendingNodes.removeAll(readyNodes);
+}
+return castToStepList(FluentIterable.from(ret)
 .filter(new Predicate() {
   @Override
   public boolean apply(Vertex input) {
 return input instanceof AbstractStep;
   }}))
-.toSortedList(new Comparator() {
-  @Override
-  public int compare(StepT left, StepT right) {
-if (left.equals(right)) {
-  return 0;
-} else if (com.google.common.graph.Graphs.reachableNodes(graph, left).contains(right)) {
-  return -1;
-} else {
-  return 1;
-}
-  }
-});
+.toList();
   }
 
   public List getStartSteps() {

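The new getSteps() replaces a pairwise reachability comparator (not a valid total order for a partially ordered DAG, which is why sorting could misorder steps) with a layered topological sort: repeatedly emit every pending node whose predecessors have all been emitted, and fail if a pass emits nothing (a cycle). A stdlib-only sketch of the same idea, with plain adjacency maps standing in for Guava's graph types (names hypothetical):

```java
import java.util.*;

// Layered (Kahn-style) topological sort over a predecessor map.
class TopoSortDemo {
    static List<String> topoSort(Map<String, List<String>> predecessors) {
        List<String> ordered = new ArrayList<>();
        Set<String> pending = new TreeSet<>(predecessors.keySet());
        while (!pending.isEmpty()) {
            List<String> ready = new ArrayList<>();
            for (String node : pending) {
                // Ready when no predecessor is still pending.
                boolean hasPendingPred = false;
                for (String pred : predecessors.get(node)) {
                    if (pending.contains(pred)) {
                        hasPendingPred = true;
                        break;
                    }
                }
                if (!hasPendingPred) {
                    ready.add(node);
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("cycle in graph");
            }
            ordered.addAll(ready);
            pending.removeAll(ready);
        }
        return ordered;
    }
}
```

Every step then appears after all of its producers, which is what a chained Combine (read, GroupByKey, then combine) needs.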


[GitHub] beam pull request #3705: [BEAM-165] Initial implementation of the MapReduce ...

2017-09-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3705


---


[22/36] beam git commit: mr-runner: use the correct step name in ParDoTranslator; this fixes MetricsTest.testAttemptedCounterMetrics().

2017-09-07 Thread kenn
mr-runner: use the correct step name in ParDoTranslator; this fixes MetricsTest.testAttemptedCounterMetrics().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b371875
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b371875
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b371875

Branch: refs/heads/mr-runner
Commit: 0b3718756b6c22a9cb58dbdf4a3bc8b41cca3d1c
Parents: 2d8b12a
Author: Pei He 
Authored: Thu Aug 31 19:25:10 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 .../apache/beam/runners/mapreduce/translation/ParDoTranslator.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0b371875/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
index e866fe2..f8f1a02 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoTranslator.java
@@ -31,7 +31,7 @@ class ParDoTranslator
  ParDo.MultiOutput transform, TranslationContext context) {
 TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
 NormalParDoOperation operation = new NormalParDoOperation(
-transform.getName(),
+userGraphContext.getStepName(),
 transform.getFn(),
 userGraphContext.getOptions(),
 transform.getMainOutputTag(),



[27/36] beam git commit: mr-runner: filter out unsupported features in ValidatesRunner tests.

2017-09-07 Thread kenn
mr-runner: filter out unsupported features in ValidatesRunner tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d71975ed
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d71975ed
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d71975ed

Branch: refs/heads/mr-runner
Commit: d71975ed2be21657ada3e66950d4e1b2a4d8b148
Parents: 8d3386d
Author: Pei He 
Authored: Thu Aug 31 19:24:19 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 runners/map-reduce/pom.xml | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d71975ed/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index d65bb34..3b253a7 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -68,6 +68,7 @@
 org.apache.beam.sdk.testing.LargeKeys$Above10MB,
 org.apache.beam.sdk.testing.UsesTimersInParDo,
 org.apache.beam.sdk.testing.UsesStatefulParDo,
+org.apache.beam.sdk.testing.UsesTimersInParDo,
 org.apache.beam.sdk.testing.UsesTestStream
   
   none



[08/36] beam git commit: mr-runner: support BoundedSource with BeamInputFormat.

2017-09-07 Thread kenn
mr-runner: support BoundedSource with BeamInputFormat.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a884a2f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a884a2f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a884a2f0

Branch: refs/heads/mr-runner
Commit: a884a2f0b33b6621ef3a2fff6f5467109707df54
Parents: a8b366d
Author: Pei He 
Authored: Fri Jul 21 13:46:36 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:47 2017 +0800

--
 runners/map-reduce/pom.xml  |  14 +-
 .../runners/mapreduce/MapReduceWordCount.java   | 218 +++
 .../mapreduce/translation/BeamInputFormat.java  | 154 +
 .../mapreduce/translation/BeamMapper.java   |  30 +++
 4 files changed, 415 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 2e8a8c9..d18eee8 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -84,7 +84,19 @@
   hadoop-mapreduce-client-core
   ${mapreduce.version}
 
-
+
+<dependency>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-common</artifactId>
+  <version>${mapreduce.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-common</artifactId>
+  <version>${mapreduce.version}</version>
+</dependency>
+
 
 
   org.apache.beam

http://git-wip-us.apache.org/repos/asf/beam/blob/a884a2f0/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
new file mode 100644
index 000..4ba3a29
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceWordCount.java
@@ -0,0 +1,218 @@
+package org.apache.beam.runners.mapreduce;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.StringTokenizer;
+
+import javax.annotation.Nullable;
+import org.apache.beam.runners.mapreduce.translation.BeamInputFormat;
+import org.apache.beam.runners.mapreduce.translation.BeamMapper;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.log4j.BasicConfigurator;
+
+public class MapReduceWordCount {
+
+  public static class CreateSource extends OffsetBasedSource {
+private final List allElementsBytes;
+private final long totalSize;
+private final Coder coder;
+
+public static <T> CreateSource<T> fromIterable(Iterable<T> elements, Coder<T> elemCoder)
+throws CoderException, IOException {
+  ImmutableList.Builder<byte[]> allElementsBytes = ImmutableList.builder();
+  long totalSize = 0L;
+  for (T element : elements) {
+byte[] bytes = CoderUtils.encodeToByteArray(elemCoder, element);
+allElementsBytes.add(bytes);
+totalSize += bytes.length;
+  }
+  return new CreateSource<>(allElementsBytes.build(), totalSize, elemCoder);
+}
+
+/**
+ * Create a new source with the specified bytes. The new source owns the input element bytes,
+ * which must not be modified after this constructor is called.
+ */
+private CreateSource(List<byte[]> elementBytes, long totalSize, Coder<T> coder) {
+  super(0, elementBytes.size(), 1);
+  this.allElementsBytes = ImmutableList.copyOf(elementBytes);
+  this.totalSize = totalSize;
+  this.coder = coder;
+}
+
+@Override
+public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+  return totalSize;
+}
+
+@Override
+public 

[16/36] beam git commit: mr-runner: support side inputs by reading in all views contents.

2017-09-07 Thread kenn
mr-runner: support side inputs by reading in all views contents.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ebd14c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ebd14c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ebd14c4

Branch: refs/heads/mr-runner
Commit: 0ebd14c446421bdb29a95ae231975875b4532031
Parents: e562a44
Author: Pei He 
Authored: Tue Aug 8 17:38:58 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:49 2017 +0800

--
 .../translation/FileReadOperation.java  |   2 +-
 .../translation/FileSideInputReader.java| 128 +++
 .../runners/mapreduce/translation/Graphs.java   |   3 +-
 .../GroupAlsoByWindowsParDoOperation.java   |   3 +-
 .../translation/NormalParDoOperation.java   |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |   6 +-
 .../mapreduce/translation/ParDoTranslator.java  |   1 +
 .../ReifyTimestampAndWindowsParDoOperation.java |   4 +-
 .../translation/TranslationContext.java |  25 
 9 files changed, 168 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index 6bd893a..70263c3 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -95,7 +95,7 @@ public class FileReadOperation extends SourceOperation {
   FileStatus[] files = fs.globStatus(pattern);
   Queue readers = new LinkedList<>();
   for (FileStatus f : files) {
-readers.add(new SequenceFile.Reader(fs, files[0].getPath(), conf));
+readers.add(new SequenceFile.Reader(fs, f.getPath(), conf));
   }
   return new Reader<>(this, readers, coder);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0ebd14c4/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
new file mode 100644
index 000..18bff2a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce.translation;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * File-based {@link 

[10/36] beam git commit: mr-runner: support graph visualization with dotfiles.

2017-09-07 Thread kenn
mr-runner: support graph visualization with dotfiles.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98da2a2a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98da2a2a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98da2a2a

Branch: refs/heads/mr-runner
Commit: 98da2a2ac88c544dc3623b4f6bbe1cbbfaf569b2
Parents: 16e6320
Author: Pei He 
Authored: Wed Aug 2 19:19:14 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:48 2017 +0800

--
 .../mapreduce/MapReducePipelineOptions.java |  1 -
 .../beam/runners/mapreduce/MapReduceRunner.java | 12 ++-
 .../mapreduce/translation/DotfileWriter.java| 54 
 .../mapreduce/translation/GraphConverter.java   | 88 +++-
 .../runners/mapreduce/translation/Graphs.java   | 19 +++--
 .../mapreduce/translation/JobPrototype.java |  6 +-
 .../translation/TranslationContext.java |  6 +-
 7 files changed, 171 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 73c7d47..c37da58 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -38,7 +38,6 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   PipelineOptionsFactory.Builder.class.getName(),
   "org.apache.beam.sdk.options.ProxyInvocationHandler");
 
-
   @Description("The jar class of the user Beam program.")
   @Default.InstanceFactory(JarClassInstanceFactory.class)
   Class getJarClass();

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index b6a82d1..c5626a4 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -20,9 +20,10 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
-import org.apache.beam.runners.mapreduce.translation.Graphs;
+import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.Graphs;
 import org.apache.beam.runners.mapreduce.translation.JobPrototype;
 import org.apache.beam.runners.mapreduce.translation.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
@@ -31,12 +32,16 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@link PipelineRunner} for MapReduce.
  */
 public class MapReduceRunner extends PipelineRunner {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MapReduceRunner.class);
+
   /**
* Construct a runner from the provided options.
*
@@ -59,8 +64,13 @@ public class MapReduceRunner extends 
PipelineRunner {
 GraphConverter graphConverter = new GraphConverter(context);
 pipeline.traverseTopologically(graphConverter);
 
+LOG.info(graphConverter.getDotfile());
+
 GraphPlanner planner = new GraphPlanner();
 Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+
+LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
 int stageId = 0;
 for (Graphs.FusedStep fusedStep : fusedGraph.getFusedSteps()) {
   JobPrototype jobPrototype = JobPrototype.create(stageId++, fusedStep, 
options);
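The dotfiles logged above can be rendered with Graphviz to inspect the step graph before and after fusion. A minimal sketch of emitting DOT text for a step graph (hypothetical edge-list input; the real DotfileWriter walks the runner's Graphs.FusedGraph):

```java
public class DotfileSketch {
  // Emits a Graphviz digraph from (from, to) step-name pairs.
  static String toDotfile(String[][] edges) {
    StringBuilder sb = new StringBuilder("digraph G {\n");
    for (String[] e : edges) {
      sb.append("  \"").append(e[0]).append("\" -> \"")
        .append(e[1]).append("\";\n");
    }
    return sb.append("}\n").toString();
  }

  public static void main(String[] args) {
    System.out.print(toDotfile(
        new String[][] {{"Read", "ParDo"}, {"ParDo", "Write"}}));
  }
}
```

The output can be piped to `dot -Tpng` for visualization.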

http://git-wip-us.apache.org/repos/asf/beam/blob/98da2a2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/DotfileWriter.java
 

[26/36] beam git commit: mr-runner: use InMemoryStateInternals in ParDoOperation; this fixes the ParDoTest cases that use state.

2017-09-07 Thread kenn
mr-runner: use InMemoryStateInternals in ParDoOperation; this fixes the ParDoTest 
cases that use state.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f312c56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f312c56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f312c56

Branch: refs/heads/mr-runner
Commit: 9f312c561a7a21c92072e91eebdca7fb6f72c9eb
Parents: 0b37187
Author: Pei He 
Authored: Thu Aug 31 21:01:59 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 .../mapreduce/translation/ParDoOperation.java   | 29 +++-
 1 file changed, 28 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9f312c56/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
index 2c2fbde..ef83e72 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/ParDoOperation.java
@@ -28,7 +28,12 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,6 +96,18 @@ public abstract class ParDoOperation 
extends Operation
 for (Graphs.Tag tag : sideInputTags) {
   tupleTagToCoder.put(tag.getTupleTag(), tag.getCoder());
 }
+
+final StateInternals stateInternals;
+try {
+  stateInternals = 
InMemoryStateInternals.forKey(taskContext.getCurrentKey());
+} catch (IOException | InterruptedException e) {
+  if (e instanceof InterruptedException) {
+Thread.currentThread().interrupt();
+  }
+  throw new RuntimeException(e);
+}
+final TimerInternals timerInternals = new InMemoryTimerInternals();
+
 fnRunner = DoFnRunners.simpleRunner(
 options.getPipelineOptions(),
 getDoFn(),
@@ -100,7 +117,17 @@ public abstract class ParDoOperation 
extends Operation
 createOutputManager(),
 mainOutputTag,
 sideOutputTags,
-null,
+new StepContext() {
+  @Override
+  public StateInternals stateInternals() {
+return stateInternals;
+  }
+
+  @Override
+  public TimerInternals timerInternals() {
+return timerInternals;
+  }
+},
 windowingStrategy);
 
 try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
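The StepContext wired in above hands the DoFnRunner per-key state and timers, which is what the stateful ParDoTest cases exercise. A plain-Java sketch of the per-key in-memory state idea (a hypothetical map-backed type, not the runners-core InMemoryStateInternals API):

```java
import java.util.HashMap;
import java.util.Map;

public class InMemoryStateSketch {
  // State is scoped to the current shuffle key, mirroring
  // InMemoryStateInternals.forKey(taskContext.getCurrentKey()).
  private final Map<Object, Map<String, Object>> stateByKey = new HashMap<>();

  Map<String, Object> forKey(Object key) {
    return stateByKey.computeIfAbsent(key, k -> new HashMap<>());
  }

  public static void main(String[] args) {
    InMemoryStateSketch s = new InMemoryStateSketch();
    s.forKey("k1").put("count", 1);
    s.forKey("k1").put("count", (Integer) s.forKey("k1").get("count") + 1);
    System.out.println(s.forKey("k1").get("count")); // prints 2
  }
}
```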



[28/36] beam git commit: mr-runner: fail early in the runner when MapReduce job fails.

2017-09-07 Thread kenn
mr-runner: fail early in the runner when MapReduce job fails.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/989d7d8e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/989d7d8e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/989d7d8e

Branch: refs/heads/mr-runner
Commit: 989d7d8e4eabe6aafd2008eed16f533ec75a43d1
Parents: 9f312c5
Author: Pei He 
Authored: Fri Sep 1 13:04:54 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:52 2017 +0800

--
 .../java/org/apache/beam/runners/mapreduce/MapReduceRunner.java  | 4 
 1 file changed, 4 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/989d7d8e/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 71edf1a..8198848 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.log4j.BasicConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,6 +96,9 @@ public class MapReduceRunner extends 
PipelineRunner {
   try {
 Job job = jobPrototype.build(options.getJarClass(), config);
 job.waitForCompletion(true);
+if (!job.getStatus().getState().equals(JobStatus.State.SUCCEEDED)) {
+  throw new RuntimeException("MapReduce job failed: " + 
job.getJobID());
+}
 jobs.add(job);
   } catch (Exception e) {
 Throwables.throwIfUnchecked(e);
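The added check surfaces a failed stage immediately rather than letting later stages run against missing output. A dependency-free sketch of the same guard (the JobState enum here is a hypothetical stand-in for Hadoop's JobStatus.State):

```java
public class FailEarlySketch {
  enum JobState { SUCCEEDED, FAILED, KILLED }

  // Throw as soon as a stage finishes in any non-success state.
  static void checkJob(String jobId, JobState state) {
    if (state != JobState.SUCCEEDED) {
      throw new RuntimeException("MapReduce job failed: " + jobId);
    }
  }

  public static void main(String[] args) {
    checkJob("job_1", JobState.SUCCEEDED); // passes silently
    try {
      checkJob("job_2", JobState.FAILED);
    } catch (RuntimeException e) {
      System.out.println(e.getMessage()); // prints MapReduce job failed: job_2
    }
  }
}
```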



[31/36] beam git commit: mr-runner: introduces duplicateFactor in FlattenOperation, this fixes testFlattenInputMultipleCopies().

2017-09-07 Thread kenn
mr-runner: introduces duplicateFactor in FlattenOperation; this fixes 
testFlattenInputMultipleCopies().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5248ce42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5248ce42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5248ce42

Branch: refs/heads/mr-runner
Commit: 5248ce42f3ab31e8952f6604ef804b342c57d962
Parents: 99bffd2
Author: Pei He 
Authored: Fri Sep 1 15:10:48 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:53 2017 +0800

--
 .../mapreduce/translation/FlattenOperation.java |  9 ++-
 .../translation/FlattenTranslator.java  | 69 +---
 2 files changed, 67 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
index 191b346..3c5ac95 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
@@ -24,14 +24,19 @@ import org.apache.beam.sdk.util.WindowedValue;
  */
 public class FlattenOperation extends Operation {
 
-  public FlattenOperation() {
+  private final int duplicateFactor;
+
+  public FlattenOperation(int duplicateFactor) {
 super(1);
+this.duplicateFactor = duplicateFactor;
   }
 
   @Override
   public void process(WindowedValue elem) {
 for (OutputReceiver receiver : getOutputReceivers()) {
-  receiver.process(elem);
+  for (int i = 0; i < duplicateFactor; ++i) {
+receiver.process(elem);
+  }
 }
   }
 }
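A small sketch of what duplicateFactor buys: when the same PCollection feeds a Flatten N times, each element must reach the output N times, which the fused operation achieves by repeating the emit (the `emit` helper below is a hypothetical stand-in for the receiver loop):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlattenDuplicateSketch {
  // Repeat each element duplicateFactor times, as FlattenOperation.process
  // does per output receiver.
  static <T> List<T> emit(T elem, int duplicateFactor) {
    List<T> out = new ArrayList<>();
    for (int i = 0; i < duplicateFactor; ++i) {
      out.add(elem);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(emit("x", 3)); // prints [x, x, x]
  }
}
```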

http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index b869936..817f2bf 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -17,15 +17,22 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Translates a {@link Flatten} to a {@link FlattenOperation}.
@@ -34,18 +41,62 @@ public class FlattenTranslator extends 
TransformTranslator.Default transform, 
TranslationContext context) {
 TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
-List inputTags = userGraphContext.getInputTags();
-Operation operation;
-if (inputTags.isEmpty()) {
+
+Map inputTagToCount = Maps.newHashMap();
+boolean containsDuplicates = false;
+for (Graphs.Tag inputTag : userGraphContext.getInputTags()) {
+  Integer count = inputTagToCount.get(inputTag);
+  if (count == null) {
+count = Integer.valueOf(0);
+  }
+  inputTagToCount.put(inputTag, ++count);
+  if (count > 1) {
+containsDuplicates = true;
+  }
+}
+
+if (inputTagToCount.isEmpty()) {
   // Create an empty source
-  operation = new SourceReadOperation(new EmptySource(), 
userGraphContext.getOnlyOutputTag());
+  Operation operation =
+  new SourceReadOperation(new EmptySource(), 
userGraphContext.getOnlyOutputTag());
+  context.addInitStep(
+  Graphs.Step.of(userGraphContext.getStepName(), operation),
+  userGraphContext.getInputTags(),
+  userGraphContext.getOutputTags());
+} else if (!containsDuplicates) {
+  Operation operation = new 

[15/36] beam git commit: mr-runner: setup file paths for read and write sides of materialization.

2017-09-07 Thread kenn
mr-runner: set up file paths for the read and write sides of materialization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87ae78b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87ae78b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87ae78b

Branch: refs/heads/mr-runner
Commit: b87ae78b5e9c204e03c01b986abf8dc185b6a9ef
Parents: 0ebd14c
Author: Pei He 
Authored: Tue Aug 8 22:07:12 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:49 2017 +0800

--
 runners/map-reduce/pom.xml  |  5 ++
 .../mapreduce/MapReducePipelineOptions.java |  7 ++-
 .../beam/runners/mapreduce/MapReduceRunner.java |  8 +--
 .../mapreduce/translation/BeamInputFormat.java  | 20 +++
 .../translation/ConfigurationUtils.java | 52 +
 .../translation/FileReadOperation.java  | 41 -
 .../translation/FileSideInputReader.java| 41 ++---
 .../mapreduce/translation/GraphPlanner.java | 55 +-
 .../GroupAlsoByWindowsParDoOperation.java   |  1 -
 .../mapreduce/translation/JobPrototype.java | 29 +++---
 .../mapreduce/translation/Operation.java|  6 ++
 .../mapreduce/translation/ParDoOperation.java   | 26 -
 .../translation/PartitionOperation.java | 20 +++
 .../translation/ReadBoundedTranslator.java  |  6 +-
 .../mapreduce/translation/ReadOperation.java| 57 ++
 .../ReifyTimestampAndWindowsParDoOperation.java |  2 -
 .../translation/SerializableConfiguration.java  | 52 +
 .../mapreduce/translation/SourceOperation.java  | 61 
 .../translation/SourceReadOperation.java| 42 ++
 .../mapreduce/translation/ViewTranslator.java   |  2 +-
 .../mapreduce/translation/GraphPlannerTest.java |  2 +-
 21 files changed, 370 insertions(+), 165 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 06e5227..e858031 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -113,6 +113,11 @@
   beam-runners-core-construction-java
 
 
+
+  org.apache.beam
+  beam-sdks-java-io-hadoop-file-system
+
+
 
 
 com.google.auto.service

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 9224eb6..cfbc006 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -43,9 +43,10 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   Class getJarClass();
   void setJarClass(Class jarClass);
 
-  @Description("The jar class of the user Beam program.")
-  String getTmpDir();
-  void setTmpDir(String tmpDir);
+  @Description("The directory for files output.")
+  @Default.String("/tmp/mapreduce/")
+  String getFileOutputDir();
+  void setFileOutputDir(String fileOutputDir);
 
   class JarClassInstanceFactory implements DefaultValueFactory {
 @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/b87ae78b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index a7e75bb..3f76808 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -69,18 +69,18 @@ public class MapReduceRunner extends 
PipelineRunner {
 Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
 LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
-GraphPlanner planner = new GraphPlanner();
+GraphPlanner planner = new GraphPlanner(options);
 fusedGraph = planner.plan(fusedGraph);
 
 LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
-Configuration config = new Configuration();
-config.set("keep.failed.task.files", "true");
-
 

[19/36] beam git commit: [BEAM-2783] support metrics in MapReduceRunner.

2017-09-07 Thread kenn
[BEAM-2783] support metrics in MapReduceRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c2390a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c2390a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c2390a1

Branch: refs/heads/mr-runner
Commit: 6c2390a1f7d7d912d186e84eed18f94e36d2a65f
Parents: b87ae78
Author: Pei He 
Authored: Wed Aug 30 17:11:07 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:49 2017 +0800

--
 .../mapreduce/MapReducePipelineResult.java  |  61 +++
 .../beam/runners/mapreduce/MapReduceRunner.java |   6 +-
 .../GroupAlsoByWindowsParDoOperation.java   |   3 +-
 .../mapreduce/translation/JobPrototype.java |   8 +-
 .../translation/MapReduceMetricResults.java | 106 +++
 .../mapreduce/translation/MetricsReporter.java  |  97 +
 .../translation/NormalParDoOperation.java   |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |  33 +-
 .../mapreduce/translation/ParDoTranslator.java  |   1 +
 .../ReifyTimestampAndWindowsParDoOperation.java |   3 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  16 ++-
 11 files changed, 325 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
new file mode 100644
index 000..90c521a
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.mapreduce;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.runners.mapreduce.translation.MapReduceMetricResults;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.Duration;
+
+public class MapReducePipelineResult implements PipelineResult {
+
+  private final List jobs;
+  public MapReducePipelineResult(List jobs) {
+this.jobs = checkNotNull(jobs, "jobs");
+  }
+
+  @Override
+  public State getState() {
+return State.DONE;
+  }
+
+  @Override
+  public State cancel() throws IOException {
+return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+return State.DONE;
+  }
+
+  @Override
+  public State waitUntilFinish() {
+return State.DONE;
+  }
+
+  @Override
+  public MetricResults metrics() {
+return new MapReduceMetricResults(jobs);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6c2390a1/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 3f76808..88ed01e 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.runners.mapreduce.translation.DotfileWriter;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
@@ -76,6 

[34/36] beam git commit: mr-runner: Removes WordCountTest, fixes checkstyle, findbugs, and addressed comments.

2017-09-07 Thread kenn
mr-runner: removes WordCountTest, fixes checkstyle and findbugs issues, and 
addresses review comments.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32aeb7ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32aeb7ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32aeb7ac

Branch: refs/heads/mr-runner
Commit: 32aeb7ac3d49ade0dc3ad79e711e7b624091d485
Parents: 9d1db98
Author: Pei He 
Authored: Thu Aug 31 16:21:17 2017 +0800
Committer: Pei He 
Committed: Mon Sep 4 11:06:41 2017 +0800

--
 runners/map-reduce/pom.xml  |   2 +-
 .../mapreduce/MapReducePipelineOptions.java |   3 +
 .../mapreduce/MapReducePipelineResult.java  |   6 +
 .../runners/mapreduce/MapReduceRegistrar.java   |   6 +
 .../beam/runners/mapreduce/MapReduceRunner.java |   5 +-
 .../beam/runners/mapreduce/package-info.java|   2 +-
 .../mapreduce/translation/BeamInputFormat.java  |   2 +-
 .../mapreduce/translation/BeamMapper.java   |   4 +-
 .../mapreduce/translation/BeamReducer.java  |   6 +-
 .../translation/ConfigurationUtils.java |   1 -
 .../mapreduce/translation/DotfileWriter.java|  22 ++--
 .../translation/FileReadOperation.java  |   1 -
 .../runners/mapreduce/translation/Graph.java|   8 +-
 .../mapreduce/translation/GraphConverter.java   |   2 +-
 .../mapreduce/translation/GraphPlanner.java |   1 -
 .../runners/mapreduce/translation/Graphs.java   |  13 +++
 .../translation/PartitionOperation.java |   2 -
 .../ReifyTimestampAndWindowsParDoOperation.java |   2 +-
 .../translation/ShuffleWriteOperation.java  |   1 -
 .../translation/TranslationContext.java |   7 +-
 .../beam/runners/mapreduce/WordCountTest.java   | 117 ---
 21 files changed, 65 insertions(+), 148 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 3b253a7..90d876b 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -24,7 +24,7 @@
 ../pom.xml
   
   
-  beam-runners-map-reduce
+  beam-runners-mapreduce
 
   Apache Beam :: Runners :: MapReduce
 

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index cfbc006..7cff40d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -48,6 +48,9 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   String getFileOutputDir();
   void setFileOutputDir(String fileOutputDir);
 
+  /**
+   * Returns the {@link Class} that constructs the MapReduce job through Beam.
+   */
   class JarClassInstanceFactory implements DefaultValueFactory {
 @Override
 public Class create(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
index 90c521a..933d8f6 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineResult.java
@@ -27,6 +27,12 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.Duration;
 
+/**
+ * A {@link PipelineResult} of executing {@link org.apache.beam.sdk.Pipeline 
Pipelines} using
+ * {@link MapReduceRunner}.
+ *
+ * It is synchronous (it returns only after the pipeline has finished) and is used 
for querying metrics.
+ */
 public class MapReducePipelineResult implements PipelineResult {
 
   private final List jobs;

http://git-wip-us.apache.org/repos/asf/beam/blob/32aeb7ac/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
--
diff --git 

[18/36] beam git commit: mr-runner: support multiple SourceOperations by composing and partitioning.

2017-09-07 Thread kenn
mr-runner: support multiple SourceOperations by composing and partitioning.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e562a443
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e562a443
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e562a443

Branch: refs/heads/mr-runner
Commit: e562a4432d407759876e147fdeb132518a1c9637
Parents: 40396d7
Author: Pei He 
Authored: Tue Aug 8 15:49:04 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:49 2017 +0800

--
 .../mapreduce/translation/BeamInputFormat.java  | 72 +---
 .../mapreduce/translation/BeamMapper.java   |  3 +-
 .../translation/FileReadOperation.java  |  9 ++-
 .../translation/FileWriteOperation.java |  8 +--
 .../mapreduce/translation/GraphPlanner.java | 51 --
 .../runners/mapreduce/translation/Graphs.java   |  2 +-
 .../mapreduce/translation/JobPrototype.java | 28 ++--
 .../mapreduce/translation/Operation.java|  1 +
 .../translation/PartitionOperation.java | 72 
 .../translation/ReadBoundedTranslator.java  |  3 +-
 .../mapreduce/translation/SourceOperation.java  | 24 +--
 .../mapreduce/translation/ViewOperation.java| 59 
 .../mapreduce/translation/ViewTranslator.java   | 19 --
 13 files changed, 224 insertions(+), 127 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/e562a443/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
index 03a88aa..23534de 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamInputFormat.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -52,7 +53,7 @@ public class BeamInputFormat extends InputFormat {
 
   private static final long DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES = 5 * 1000 
* 1000;
 
-  private List<BoundedSource> sources;
+  private List<SourceOperation.TaggedSource> sources;
   private SerializedPipelineOptions options;
 
   public BeamInputFormat() {
@@ -67,30 +68,37 @@ public class BeamInputFormat extends InputFormat {
 || Strings.isNullOrEmpty(serializedPipelineOptions)) {
   return ImmutableList.of();
 }
-sources = (List) 
SerializableUtils.deserializeFromByteArray(
-Base64.decodeBase64(serializedBoundedSource), "BoundedSource");
+sources = (List) 
SerializableUtils.deserializeFromByteArray(
+Base64.decodeBase64(serializedBoundedSource), "TaggedSources");
 options = ((SerializedPipelineOptions) 
SerializableUtils.deserializeFromByteArray(
 Base64.decodeBase64(serializedPipelineOptions), 
"SerializedPipelineOptions"));
 
 try {
 
   return FluentIterable.from(sources)
-  .transformAndConcat(new Function>() {
+  .transformAndConcat(
+  new Function() {
+@Override
+public Iterable apply(
+final SourceOperation.TaggedSource taggedSource) {
+  try {
+return FluentIterable.from(taggedSource.getSource().split(
+DEFAULT_DESIRED_BUNDLE_SIZE_SIZE_BYTES, 
options.getPipelineOptions()))
+.transform(new Function() {
+  @Override
+  public SourceOperation.TaggedSource 
apply(BoundedSource input) {
+return SourceOperation.TaggedSource.of(input, 
taggedSource.getTag());
+  }});
+  } catch (Exception e) {
+Throwables.throwIfUnchecked(e);
+throw new RuntimeException(e);
+  }
+}
+  })
+  .transform(new Function() {
 @Override
-public Iterable apply(BoundedSource input) {
-  try {

[36/36] beam git commit: This closes #3705: [BEAM-165] Initial implementation of the MapReduce runner

2017-09-07 Thread kenn
This closes #3705: [BEAM-165] Initial implementation of the MapReduce runner

  mr-runner: Removes WordCountTest, fixes checkstyle, findbugs, and addressed 
comments.
  mr-runner-hack: disable unrelated modules to shorten build time during 
development.
  mr-runner: support SourceMetrics, this fixes 
MetricsTest.testBoundedSourceMetrics().
  mr-runner: introduces duplicateFactor in FlattenOperation, this fixes 
testFlattenInputMultipleCopies().
  mr-runner: translate empty flatten into EmptySource, this fixes few empty 
FlattenTests.
  mr-runner: ensure Operation only start/finish once for diamond shaped DAG, 
this fixes ParDoLifecycleTest.
  mr-runner: Graph.getSteps() to return with topological order, this fixes few 
CombineTests.
  mr-runner: fail early in the runner when MapReduce job fails.
  mr-runner: use InMemoryStateInternals in ParDoOperation, this fixed ParDoTest 
that uses state.
  mr-runner: use the correct step name in ParDoTranslator, this fixes 
MetricsTest.testAttemptedCounterMetrics().
  mr-runner: remove the hard-coded GlobalWindow coder, and fixes WindowingTest.
  mr-runner: handle no files case in FileSideInputReader for empty views.
  mr-runner: fix NPE in PipelineTest.testIdentityTransform().
  mr-runner: filter out unsupported features in ValidatesRunner tests.
  mr-runner: setMetricsSupported to run ValidatesRunner tests with TestPipeline.
  mr-runner: fix the bug that steps are attached multiple times in diamond 
shaped DAG.
  [BEAM-2783] support metrics in MapReduceRunner.
  mr-runner: setup file paths for read and write sides of materialization.
  mr-runner: support side inputs by reading in all views contents.
  mr-runner: support multiple SourceOperations by composing and partitioning.
  mr-runner: support PCollections materialization with multiple MR jobs.
  mr-runner: hack to get around that ViewAsXXX.expand() return wrong output 
PValue.
  mr-runner: support graph visualization with dotfiles.
  mr-runner: refactors and creates Graph data structures to handle general Beam 
pipelines.
  mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.
  mr-runner: support reduce side ParDos and WordCount.
  core-java: InMemoryTimerInternals expose getTimers() for timer firings in 
mr-runner.
  mr-runner: add BeamReducer and support GroupByKey.
  mr-runner: add ParDoOperation and support ParDos chaining.
  mr-runner: add JobPrototype and translate it to a MR job.
  mr-runner: support BoundedSource with BeamInputFormat.
  MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.
  MapReduceRunner: add Graph and its visitors.
  Initial commit for MapReduceRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5fa0b14d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5fa0b14d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5fa0b14d

Branch: refs/heads/mr-runner
Commit: 5fa0b14d20fab007d9e2d954eb4a34155a6f199f
Parents: 2fa4fde 32aeb7a
Author: Kenneth Knowles 
Authored: Thu Sep 7 11:38:22 2017 -0700
Committer: Kenneth Knowles 
Committed: Thu Sep 7 11:38:22 2017 -0700

--
 pom.xml |   2 +-
 .../runners/core/InMemoryTimerInternals.java|   5 +
 runners/map-reduce/pom.xml  | 197 ++
 .../mapreduce/MapReducePipelineOptions.java |  89 +++
 .../mapreduce/MapReducePipelineResult.java  |  67 +
 .../runners/mapreduce/MapReduceRegistrar.java   |  55 
 .../beam/runners/mapreduce/MapReduceRunner.java | 113 
 .../beam/runners/mapreduce/package-info.java|  21 ++
 .../mapreduce/translation/BeamInputFormat.java  | 257 ++
 .../mapreduce/translation/BeamMapper.java   |  71 +
 .../mapreduce/translation/BeamReducer.java  | 104 
 .../translation/ConfigurationUtils.java |  62 +
 .../mapreduce/translation/DotfileWriter.java|  93 +++
 .../translation/FileReadOperation.java  | 181 +
 .../translation/FileSideInputReader.java| 122 +
 .../translation/FileWriteOperation.java |  73 +
 .../mapreduce/translation/FlattenOperation.java |  42 +++
 .../translation/FlattenTranslator.java  | 154 +++
 .../runners/mapreduce/translation/Graph.java| 218 +++
 .../mapreduce/translation/GraphConverter.java   | 178 +
 .../mapreduce/translation/GraphPlanner.java | 144 ++
 .../runners/mapreduce/translation/Graphs.java   | 267 +++
 .../GroupAlsoByWindowsParDoOperation.java   |  56 
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  | 142 ++
 .../translation/GroupByKeyOperation.java|  54 
 .../translation/GroupByKeyTranslator.java   |  45 
 .../mapreduce/translation/JobPrototype.java | 211 +++
 

[12/36] beam git commit: mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.

2017-09-07 Thread kenn
mr-runner: refactors and creates Graph data structures to handle general Beam 
pipelines.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/16e63205
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/16e63205
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/16e63205

Branch: refs/heads/mr-runner
Commit: 16e63205bee3ade711ebffc3c74e18aec6d50c01
Parents: ee1cce9
Author: Pei He 
Authored: Fri Jul 28 16:31:41 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:48 2017 +0800

--
 .../mapreduce/MapReducePipelineOptions.java |  17 +
 .../runners/mapreduce/MapReduceRegistrar.java   |  17 +
 .../beam/runners/mapreduce/MapReduceRunner.java |  48 ++-
 .../runners/mapreduce/MapReduceWordCount.java   | 218 --
 .../beam/runners/mapreduce/package-info.java|  21 +
 .../mapreduce/translation/BeamInputFormat.java  |  21 +-
 .../mapreduce/translation/BeamMapper.java   |  19 +-
 .../mapreduce/translation/BeamReducer.java  |  20 +-
 .../mapreduce/translation/FlattenOperation.java |  37 ++
 .../translation/FlattenTranslator.java  |  37 ++
 .../runners/mapreduce/translation/Graph.java| 400 +++
 .../mapreduce/translation/GraphConverter.java   | 108 ++---
 .../mapreduce/translation/GraphPlanner.java | 142 ++-
 .../mapreduce/translation/GraphVisitor.java |  11 -
 .../runners/mapreduce/translation/Graphs.java   | 188 +
 .../GroupAlsoByWindowsParDoOperation.java   |  24 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |  17 +
 .../translation/GroupByKeyOperation.java|  54 +++
 .../translation/GroupByKeyTranslator.java   |  46 +++
 .../mapreduce/translation/JobPrototype.java | 257 +---
 .../translation/NormalParDoOperation.java   |  12 +-
 .../mapreduce/translation/Operation.java|  30 +-
 .../mapreduce/translation/OutputReceiver.java   |   3 +-
 .../mapreduce/translation/ParDoOperation.java   |  14 +-
 .../mapreduce/translation/ParDoTranslator.java  |  46 +++
 .../translation/ReadBoundedTranslator.java  |  37 ++
 .../mapreduce/translation/ReadOperation.java|  45 +++
 .../ReifyTimestampAndWindowsParDoOperation.java |  24 +-
 .../translation/TransformTranslator.java|  48 +++
 .../translation/TranslationContext.java | 128 ++
 .../translation/TranslatorRegistry.java |  58 +++
 .../mapreduce/translation/ViewOperation.java|  59 +++
 .../mapreduce/translation/ViewTranslator.java   |  42 ++
 .../translation/WindowAssignOperation.java  |  35 +-
 .../translation/WindowAssignTranslator.java |  38 ++
 .../mapreduce/translation/WriteOperation.java   |  33 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  25 +-
 .../translation/GraphConverterTest.java |  33 +-
 .../mapreduce/translation/GraphPlannerTest.java |  37 +-
 39 files changed, 1568 insertions(+), 881 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index 7fe66ba..73c7d47 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce;
 
 import com.google.common.collect.ImmutableSet;

http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
--
diff --git 

[05/36] beam git commit: mr-runner: add ParDoOperation and support ParDos chaining.

2017-09-07 Thread kenn
mr-runner: add ParDoOperation and support ParDos chaining.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/389b02b5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/389b02b5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/389b02b5

Branch: refs/heads/mr-runner
Commit: 389b02b576e1d9ea5123905048de3004e462a89a
Parents: 0cbdc5b
Author: Pei He 
Authored: Tue Jul 25 21:44:34 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:47 2017 +0800

--
 .../beam/runners/mapreduce/MapReduceRunner.java |   6 +-
 .../mapreduce/translation/BeamMapper.java   |  57 ++-
 .../runners/mapreduce/translation/Graph.java|  71 +
 .../mapreduce/translation/GraphConverter.java   |  29 ++--
 .../mapreduce/translation/GraphPlanner.java |  25 ++--
 .../mapreduce/translation/JobPrototype.java |  71 +
 .../mapreduce/translation/OutputReceiver.java   |  53 +++
 .../mapreduce/translation/ParDoOperation.java   | 149 +++
 .../translation/SerializedPipelineOptions.java  |  76 ++
 .../mapreduce/translation/package-info.java |  22 +++
 .../beam/runners/mapreduce/WordCountTest.java   |   4 +-
 11 files changed, 438 insertions(+), 125 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 0e3142c..11ac9a7 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -3,7 +3,6 @@ package org.apache.beam.runners.mapreduce;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.base.Throwables;
-import java.io.IOException;
 import org.apache.beam.runners.mapreduce.translation.Graph;
 import org.apache.beam.runners.mapreduce.translation.GraphConverter;
 import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
@@ -48,10 +47,7 @@ public class MapReduceRunner extends 
PipelineRunner {
 GraphPlanner planner = new GraphPlanner();
 Graph fusedGraph = planner.plan(graph);
 for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
-  if (vertex.getTransform() instanceof GroupByKey
-  || vertex.getTransform() instanceof Read.Bounded) {
-continue;
-  } else {
+  if (vertex.getStep().getTransform() instanceof GroupByKey) {
 JobPrototype jobPrototype = JobPrototype.create(1, vertex);
 try {
   Job job = jobPrototype.build(options.getJarClass(), new 
Configuration());

http://git-wip-us.apache.org/repos/asf/beam/blob/389b02b5/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 9d2f80d..b74797d 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -2,17 +2,8 @@ package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.NullSideInputReader;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
@@ -24,29 +15,19 @@ import org.slf4j.LoggerFactory;
 public class BeamMapper
 extends Mapper {
 
-  public static final String BEAM_SERIALIZED_DO_FN = "beam-serialized-do-fn";
-  private static final Logger LOG = LoggerFactory.getLogger(BeamMapper.class);
+  public static final String BEAM_SERIALIZED_PAR_DO_OPERATION = 

[17/36] beam git commit: mr-runner: support PCollections materialization with multiple MR jobs.

2017-09-07 Thread kenn
mr-runner: support PCollections materialization with multiple MR jobs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40396d75
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40396d75
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40396d75

Branch: refs/heads/mr-runner
Commit: 40396d758ad21e4938d395007583bf7c61ebdd97
Parents: 5905efd
Author: Pei He 
Authored: Tue Aug 8 11:30:29 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:49 2017 +0800

--
 .../mapreduce/MapReducePipelineOptions.java |   4 +
 .../beam/runners/mapreduce/MapReduceRunner.java |  15 +-
 .../mapreduce/translation/BeamInputFormat.java  |  85 +++---
 .../mapreduce/translation/BeamMapper.java   |   8 +-
 .../mapreduce/translation/BeamReducer.java  |  20 +--
 .../mapreduce/translation/DotfileWriter.java|  53 +-
 .../translation/FileReadOperation.java  | 165 +++
 .../translation/FileWriteOperation.java |  77 +
 .../translation/FlattenTranslator.java  |   7 +-
 .../runners/mapreduce/translation/Graph.java|  79 +
 .../mapreduce/translation/GraphPlanner.java |  67 +---
 .../runners/mapreduce/translation/Graphs.java   | 106 +---
 .../GroupAlsoByWindowsParDoOperation.java   |   5 +-
 .../translation/GroupByKeyTranslator.java   |   7 +-
 .../mapreduce/translation/JobPrototype.java |  93 +++
 .../mapreduce/translation/Operation.java|  11 +-
 .../mapreduce/translation/OutputReceiver.java   |   9 +-
 .../mapreduce/translation/ParDoOperation.java   |  43 +++--
 .../mapreduce/translation/ParDoTranslator.java  |   8 +-
 .../translation/ReadBoundedTranslator.java  |  11 +-
 .../mapreduce/translation/ReadOperation.java|  45 -
 .../ReifyTimestampAndWindowsParDoOperation.java |   5 +-
 .../translation/ShuffleWriteOperation.java  |  62 +++
 .../mapreduce/translation/SourceOperation.java  |  45 +
 .../translation/TranslationContext.java |   4 +-
 .../translation/TranslatorRegistry.java |  11 +-
 .../mapreduce/translation/ViewTranslator.java   |   8 +-
 .../translation/WindowAssignTranslator.java |   7 +-
 .../mapreduce/translation/WriteOperation.java   |  66 
 .../mapreduce/translation/GraphPlannerTest.java |   3 +-
 30 files changed, 801 insertions(+), 328 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index c37da58..9224eb6 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -43,6 +43,10 @@ public interface MapReducePipelineOptions extends 
PipelineOptions {
   Class getJarClass();
   void setJarClass(Class jarClass);
 
+  @Description("The directory for temporary files used by the MapReduce jobs.")
+  String getTmpDir();
+  void setTmpDir(String tmpDir);
+
   class JarClassInstanceFactory implements DefaultValueFactory {
 @Override
 public Class create(PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/40396d75/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index c5626a4..a7e75bb 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -66,19 +66,30 @@ public class MapReduceRunner extends 
PipelineRunner {
 
 LOG.info(graphConverter.getDotfile());
 
+Graphs.FusedGraph fusedGraph = new 
Graphs.FusedGraph(context.getInitGraph());
+LOG.info(DotfileWriter.toDotfile(fusedGraph));
+
 GraphPlanner planner = new GraphPlanner();
-Graphs.FusedGraph fusedGraph = planner.plan(context.getInitGraph());
+fusedGraph = planner.plan(fusedGraph);
 
 LOG.info(DotfileWriter.toDotfile(fusedGraph));
 
+Configuration config = new Configuration();
+config.set("keep.failed.task.files", "true");
+
+fusedGraph.getFusedSteps();
+
 int stageId = 0;
 for (Graphs.FusedStep fusedStep 

[14/36] beam git commit: mr-runner: support reduce side ParDos and WordCount.

2017-09-07 Thread kenn
mr-runner: support reduce side ParDos and WordCount.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6a3a18d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6a3a18d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6a3a18d

Branch: refs/heads/mr-runner
Commit: c6a3a18d2c71c8f523deb54b323f26408c7de207
Parents: d09fb42
Author: Pei He 
Authored: Thu Jul 27 10:52:32 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:48 2017 +0800

--
 runners/map-reduce/pom.xml  |   2 +-
 .../mapreduce/translation/BeamMapper.java   |  12 +-
 .../mapreduce/translation/BeamReducer.java  |  56 ---
 .../runners/mapreduce/translation/Graph.java|   4 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   5 +
 .../mapreduce/translation/JobPrototype.java | 164 ++-
 .../mapreduce/translation/Operation.java|   8 +-
 .../mapreduce/translation/OutputReceiver.java   |   3 +-
 .../ReifyTimestampAndWindowsParDoOperation.java |  46 ++
 .../translation/WindowAssignOperation.java  |  75 +
 .../mapreduce/translation/WriteOperation.java   |  13 +-
 .../beam/runners/mapreduce/WordCountTest.java   |  13 +-
 12 files changed, 318 insertions(+), 83 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index d18eee8..226c5c0 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -20,7 +20,7 @@
   
 org.apache.beam
 beam-runners-parent
-2.1.0-SNAPSHOT
+2.2.0-SNAPSHOT
 ../pom.xml
   
   

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
index 11ecc8d..b5e4edc 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamMapper.java
@@ -16,7 +16,7 @@ public class BeamMapper
 
   public static final String BEAM_PAR_DO_OPERATION_MAPPER = 
"beam-par-do-op-mapper";
 
-  private ParDoOperation parDoOperation;
+  private Operation operation;
 
   @Override
   protected void setup(
@@ -24,9 +24,9 @@ public class BeamMapper
 String serializedParDo = checkNotNull(
 context.getConfiguration().get(BEAM_PAR_DO_OPERATION_MAPPER),
 BEAM_PAR_DO_OPERATION_MAPPER);
-parDoOperation = (ParDoOperation) 
SerializableUtils.deserializeFromByteArray(
-Base64.decodeBase64(serializedParDo), "ParDoOperation");
-parDoOperation.start((TaskInputOutputContext) context);
+operation = (Operation) SerializableUtils.deserializeFromByteArray(
+Base64.decodeBase64(serializedParDo), "Operation");
+operation.start((TaskInputOutputContext) context);
   }
 
   @Override
@@ -34,12 +34,12 @@ public class BeamMapper
   Object key,
   WindowedValue value,
   Mapper.Context context) {
-parDoOperation.process(value);
+operation.process(value);
   }
 
   @Override
   protected void cleanup(
   Mapper.Context context) {
-parDoOperation.finish();
+operation.finish();
   }
 }
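The BeamMapper change above ships a serialized `Operation` to workers through the Hadoop `Configuration`: Java-serialize, Base64-encode, store under a config key, then decode in `setup()`. A minimal round-trip sketch using only JDK classes follows; `serializeToBase64`/`deserializeFromBase64` are hypothetical stand-ins for Beam's `SerializableUtils` plus `Base64.decodeBase64`, and exceptions are rethrown unchecked in the same spirit as the runner's `Throwables` usage.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Base64;

public class OperationShippingSketch {

  // Java-serialize an object and Base64-encode it, so it can be stored as a
  // plain string value in the job Configuration.
  static String serializeToBase64(Serializable obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      return Base64.getEncoder().encodeToString(bytes.toByteArray());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // The inverse, as done in Mapper.setup(): decode the config value and
  // deserialize it back into the operation object.
  static Object deserializeFromBase64(String encoded) {
    try {
      byte[] raw = Base64.getDecoder().decode(encoded);
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
        return in.readObject();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

The same mechanism works for any `Serializable` payload, which is why generalizing the field from `ParDoOperation` to `Operation` required no change to the transport.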

http://git-wip-us.apache.org/repos/asf/beam/blob/c6a3a18d/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
index 8eb7938..9b8bd82 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/BeamReducer.java
@@ -5,14 +5,17 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import 

[03/36] beam git commit: MapReduceRunner: add Graph and its visitors.

2017-09-07 Thread kenn
MapReduceRunner: add Graph and its visitors.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/092380cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/092380cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/092380cf

Branch: refs/heads/mr-runner
Commit: 092380cf87ada9d4a2b5faa7a42dff7005c44f17
Parents: 9fffd55
Author: Pei He 
Authored: Tue Jul 11 10:45:11 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:46 2017 +0800

--
 .../runners/mapreduce/translation/Graph.java| 190 +++
 .../mapreduce/translation/GraphConverter.java   |  40 
 .../mapreduce/translation/GraphPlanner.java |  99 ++
 .../mapreduce/translation/GraphVisitor.java |  11 ++
 .../MapReducePipelineTranslator.java|  11 --
 5 files changed, 340 insertions(+), 11 deletions(-)
--
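The Graph class introduced below tracks leaf vertices incrementally: every new vertex starts as a leaf, and adding an edge removes its head from the leaf set (a leaf here is a vertex with no outgoing edges). A simplified sketch of that bookkeeping, with `String` vertices standing in for the real `PTransform`-keyed vertices, is:

```java
import java.util.HashSet;
import java.util.Set;

public class LeafTrackingSketch {

  final Set<String> vertices = new HashSet<>();
  final Set<String> leaves = new HashSet<>();

  // Mirrors Graph.addVertex(): a new vertex has no outgoing edges yet,
  // so it is initially a leaf.
  void addVertex(String v) {
    vertices.add(v);
    leaves.add(v);
  }

  // Mirrors Graph.addEdge(head, tail): head gains an outgoing edge,
  // so it can no longer be a leaf.
  void addEdge(String head, String tail) {
    leaves.remove(head);
  }
}
```

Keeping the leaf set up to date on every mutation is what lets `Graph.accept(GraphVisitor)` start traversal directly from the leaves without a full scan.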


http://git-wip-us.apache.org/repos/asf/beam/blob/092380cf/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
new file mode 100644
index 000..a9831bd
--- /dev/null
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -0,0 +1,190 @@
+package org.apache.beam.runners.mapreduce.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * A directed graph of {@link PTransform} vertices, used to translate a Beam
+ * pipeline into MapReduce jobs.
+ */
+public class Graph {
+
+  private final Map vertices;
+  private final Map edges;
+  private final Set leafVertices;
+
+  public Graph() {
+this.vertices = Maps.newHashMap();
+this.edges = Maps.newHashMap();
+this.leafVertices = Sets.newHashSet();
+  }
+
+  public Vertex addVertex(PTransform transform) {
+checkState(!vertices.containsKey(transform));
+Vertex v = new Vertex(transform);
+vertices.put(transform, v);
+leafVertices.add(v);
+return v;
+  }
+
+  public Edge addEdge(Vertex head, Vertex tail) {
+HeadTail headTail = HeadTail.of(head, tail);
+checkState(!edges.containsKey(headTail));
+Edge e = new Edge(headTail);
+edges.put(headTail, e);
+head.addOutgoing(e);
+tail.addIncoming(e);
+leafVertices.remove(head);
+return e;
+  }
+
+  public Vertex getVertex(PTransform transform) {
+return vertices.get(transform);
+  }
+
+  public Edge getEdge(Vertex head, Vertex tail) {
+return edges.get(HeadTail.of(head, tail));
+  }
+
+  public Set getLeafVertices() {
+return leafVertices;
+  }
+
+  public void accept(GraphVisitor visitor) {
+for (Vertex v : leafVertices) {
+  v.accept(visitor);
+}
+  }
+
+  //TODO: add equals, hashCode, toString for the following classes.
+
+  public static class Vertex {
+private final PTransform transform;
+private final Set incoming;
+private final Set outgoing;
+
+public Vertex(PTransform transform) {
+  this.transform = checkNotNull(transform, "transform");
+  this.incoming = Sets.newHashSet();
+  this.outgoing = Sets.newHashSet();
+}
+
+public PTransform getTransform() {
+  return transform;
+}
+
+public Set getIncoming() {
+  return incoming;
+}
+
+public Set getOutgoing() {
+  return outgoing;
+}
+
+public boolean isSource() {
+  return transform instanceof Read.Bounded || transform instanceof 
Read.Unbounded;
+}
+
+public boolean isGroupByKey() {
+  return transform instanceof GroupByKey;
+}
+
+public void addIncoming(Edge edge) {
+  incoming.add(edge);
+}
+
+public void addOutgoing(Edge edge) {
+  outgoing.add(edge);
+}
+
+public void accept(GraphVisitor visitor) {
+  if (transform instanceof ParDo.SingleOutput || transform instanceof 
ParDo.MultiOutput) {
+visitor.visitParDo(this);
+  } else if (transform instanceof GroupByKey) {
+visitor.visitGroupByKey(this);
+  } else if (transform instanceof Read.Bounded) {
+visitor.visitRead(this);
+  } else if (transform instanceof Flatten.PCollections
+  || transform 

[11/36] beam git commit: mr-runner: refactors and creates Graph data structures to handle general Beam pipelines.

2017-09-07 Thread kenn
http://git-wip-us.apache.org/repos/asf/beam/blob/16e63205/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 576c6bf..c336a70 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -1,27 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.mapreduce.translation;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -33,22 +42,25 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 
 /**
- * Created by peihe on 24/07/2017.
+ * Class that translates a {@link Graphs.FusedStep} to a MapReduce job.
  */
 public class JobPrototype {
 
-  public static JobPrototype create(int stageId, Graph.Vertex vertex) {
-return new JobPrototype(stageId, vertex);
+  public static JobPrototype create(
+  int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) {
+return new JobPrototype(stageId, fusedStep, options);
   }
 
   private final int stageId;
-  private final Graph.Vertex vertex;
+  private final Graphs.FusedStep fusedStep;
   private final Set dependencies;
+  private final PipelineOptions options;
 
-  private JobPrototype(int stageId, Graph.Vertex vertex) {
+  private JobPrototype(int stageId, Graphs.FusedStep fusedStep, PipelineOptions options) {
 this.stageId = stageId;
-this.vertex = checkNotNull(vertex, "vertex");
+this.fusedStep = checkNotNull(fusedStep, "fusedStep");
 this.dependencies = Sets.newHashSet();
+this.options = checkNotNull(options, "options");
   }
 
   public Job build(Class jarClass, Configuration conf) throws IOException {
@@ -57,168 +69,101 @@ public class JobPrototype {
 job.setJarByClass(jarClass);
 conf.set(
 "io.serializations",
-"org.apache.hadoop.io.serializer.WritableSerialization," +
-"org.apache.hadoop.io.serializer.JavaSerialization");
+"org.apache.hadoop.io.serializer.WritableSerialization,"
++ "org.apache.hadoop.io.serializer.JavaSerialization");
 
 // Setup BoundedSources in BeamInputFormat.
-// TODO: support more than one in-edge
-Graph.Edge inEdge = Iterables.getOnlyElement(vertex.getIncoming());
-Graph.Vertex head = inEdge.getHead();
-Graph.Step headStep = head.getStep();
-checkState(headStep.getTransform() instanceof Read.Bounded);
-Read.Bounded read = (Read.Bounded) headStep.getTransform();
+// TODO: support more than one read steps by introducing a composed BeamInputFormat
+// and a partition operation.
+Graphs.Step readStep = 

[07/36] beam git commit: core-java: InMemoryTimerInternals expose getTimers() for timer firings in mr-runner.

2017-09-07 Thread kenn
core-java: InMemoryTimerInternals expose getTimers() for timer firings in 
mr-runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d09fb427
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d09fb427
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d09fb427

Branch: refs/heads/mr-runner
Commit: d09fb42773daccac01e9e680c52e1666fd03cfe1
Parents: 923190d
Author: Pei He 
Authored: Thu Jul 27 15:01:22 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:47 2017 +0800

--
 .../org/apache/beam/runners/core/InMemoryTimerInternals.java| 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d09fb427/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index c29ea19..c1d42d6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Table;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -65,6 +66,10 @@ public class InMemoryTimerInternals implements TimerInternals {
 return outputWatermarkTime;
   }
 
+  public Iterable<TimerData> getTimers(TimeDomain domain) {
+return ImmutableList.copyOf(timersForDomain(domain));
+  }
+
   /**
* Returns when the next timer in the given time domain will fire, or {@code null}
* if there are no timers scheduled in that time domain.
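The new accessor returns `ImmutableList.copyOf(timersForDomain(domain))`, i.e. an immutable snapshot, so a runner can iterate fired timers while the live set keeps changing. The snapshot pattern can be sketched with stdlib types only (standalone illustration under assumed simplifications, not Beam code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class TimerSnapshotDemo {
  // Stands in for InMemoryTimerInternals' per-domain sorted timer set.
  private final TreeSet<Long> timers = new TreeSet<>();

  void setTimer(long firingTime) {
    timers.add(firingTime);
  }

  // Like the new getTimers(): hand back an immutable copy so callers can
  // iterate (and even delete timers from the live set) without risking a
  // ConcurrentModificationException.
  List<Long> getTimers() {
    return Collections.unmodifiableList(new ArrayList<>(timers));
  }

  public static void main(String[] args) {
    TimerSnapshotDemo internals = new TimerSnapshotDemo();
    internals.setTimer(30L);
    internals.setTimer(10L);
    internals.setTimer(20L);

    List<Long> snapshot = internals.getTimers();
    internals.setTimer(5L); // mutating the live set does not affect the snapshot
    System.out.println(snapshot);              // [10, 20, 30]
    System.out.println(internals.getTimers()); // [5, 10, 20, 30]
  }
}
```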



[06/36] beam git commit: mr-runner: add JobPrototype and translate it to a MR job.

2017-09-07 Thread kenn
mr-runner: add JobPrototype and translate it to a MR job.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cbdc5b7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cbdc5b7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cbdc5b7

Branch: refs/heads/mr-runner
Commit: 0cbdc5b75ed5581ffef8d129b4e61e339d459697
Parents: a884a2f
Author: Pei He 
Authored: Mon Jul 24 20:15:37 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:47 2017 +0800

--
 .../mapreduce/MapReducePipelineOptions.java |  5 ++
 .../beam/runners/mapreduce/MapReduceRunner.java | 41 -
 .../runners/mapreduce/MapReduceWordCount.java   |  2 +-
 .../mapreduce/translation/BeamInputFormat.java  | 44 -
 .../mapreduce/translation/BeamMapper.java   | 75 +---
 .../runners/mapreduce/translation/Graph.java|  5 ++
 .../mapreduce/translation/JobPrototype.java | 95 
 .../beam/runners/mapreduce/WordCountTest.java   | 42 +++--
 8 files changed, 244 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index da29931..ce8f937 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,9 +1,14 @@
 package org.apache.beam.runners.mapreduce;
 
+import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link PipelineOptions} for {@link MapReduceRunner}.
  */
 public interface MapReducePipelineOptions extends PipelineOptions {
+
+  @Description("The jar class of the user Beam program.")
+  Class<?> getJarClass();
+  void setJarClass(Class<?> jarClass);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0cbdc5b7/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index 247a8e5..0e3142c 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -1,9 +1,21 @@
 package org.apache.beam.runners.mapreduce;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import java.io.IOException;
+import org.apache.beam.runners.mapreduce.translation.Graph;
+import org.apache.beam.runners.mapreduce.translation.GraphConverter;
+import org.apache.beam.runners.mapreduce.translation.GraphPlanner;
+import org.apache.beam.runners.mapreduce.translation.JobPrototype;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * {@link PipelineRunner} for crunch.
@@ -17,11 +29,38 @@ public class MapReduceRunner extends PipelineRunner<PipelineResult> {
* @return The newly created runner.
*/
   public static MapReduceRunner fromOptions(PipelineOptions options) {
-return new MapReduceRunner();
+return new MapReduceRunner(options.as(MapReducePipelineOptions.class));
+  }
+
+  private final MapReducePipelineOptions options;
+
+  MapReduceRunner(MapReducePipelineOptions options) {
+this.options = checkNotNull(options, "options");
   }
 
   @Override
   public PipelineResult run(Pipeline pipeline) {
+GraphConverter graphConverter = new GraphConverter();
+pipeline.traverseTopologically(graphConverter);
+
+Graph graph = graphConverter.getGraph();
+
+GraphPlanner planner = new GraphPlanner();
+Graph fusedGraph = planner.plan(graph);
+for (Graph.Vertex vertex : fusedGraph.getAllVertices()) {
+  if (vertex.getTransform() instanceof GroupByKey
+  || vertex.getTransform() instanceof Read.Bounded) {
+continue;
+  } else {
+JobPrototype jobPrototype = JobPrototype.create(1, vertex);
+try {
+  Job job = 

[13/36] beam git commit: mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.

2017-09-07 Thread kenn
mr-runner: add JarClassInstanceFactory to run ValidatesRunner tests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1cce92
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1cce92
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1cce92

Branch: refs/heads/mr-runner
Commit: ee1cce92d620c78b0243ee0b9c7c0f6ae232b0cb
Parents: c6a3a18
Author: Pei He 
Authored: Thu Jul 27 15:05:06 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:48 2017 +0800

--
 runners/map-reduce/pom.xml  |  2 +-
 .../mapreduce/MapReducePipelineOptions.java | 51 
 runners/pom.xml |  1 +
 3 files changed, 53 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
index 226c5c0..06e5227 100644
--- a/runners/map-reduce/pom.xml
+++ b/runners/map-reduce/pom.xml
@@ -39,7 +39,7 @@
   
   local-validates-runner-tests
-  false
+  true
   
 
   

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
index ce8f937..7fe66ba 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -1,14 +1,65 @@
 package org.apache.beam.runners.mapreduce;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
  * {@link PipelineOptions} for {@link MapReduceRunner}.
  */
 public interface MapReducePipelineOptions extends PipelineOptions {
 
+  /** Classes that are used as the boundary in the stack trace to find the callers class name. */
+  Set<String> PIPELINE_OPTIONS_FACTORY_CLASSES = ImmutableSet.of(
+  PipelineOptionsFactory.class.getName(),
+  PipelineOptionsFactory.Builder.class.getName(),
+  "org.apache.beam.sdk.options.ProxyInvocationHandler");
+
+
   @Description("The jar class of the user Beam program.")
+  @Default.InstanceFactory(JarClassInstanceFactory.class)
   Class<?> getJarClass();
   void setJarClass(Class<?> jarClass);
+
+  class JarClassInstanceFactory implements DefaultValueFactory<Class<?>> {
+@Override
+public Class<?> create(PipelineOptions options) {
+  return findCallersClassName(options);
+}
+
+/**
+ * Returns the simple name of the calling class using the current threads stack.
+ */
+private static Class<?> findCallersClassName(PipelineOptions options) {
+  Iterator<StackTraceElement> elements =
+  Iterators.forArray(Thread.currentThread().getStackTrace());
+  // First find the PipelineOptionsFactory/Builder class in the stack trace.
+  while (elements.hasNext()) {
+StackTraceElement next = elements.next();
+if (PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())) {
+  break;
+}
+  }
+  // Then find the first instance after that is not the PipelineOptionsFactory/Builder class.
+  while (elements.hasNext()) {
+StackTraceElement next = elements.next();
+if (!PIPELINE_OPTIONS_FACTORY_CLASSES.contains(next.getClassName())
+&& !next.getClassName().contains("com.sun.proxy.$Proxy")
+&& !next.getClassName().equals(options.getRunner().getName())) {
+  try {
+return Class.forName(next.getClassName());
+  } catch (ClassNotFoundException e) {
+break;
+  }
+}
+  }
+  return null;
+}
+  }
 }
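The `JarClassInstanceFactory` above discovers the user's main class by walking the current thread's stack past a set of boundary classes. A standalone, simplified sketch of that stack-walking technique (hypothetical class names; the real factory also skips `com.sun.proxy` classes and the runner class):

```java
public class CallerFinderDemo {
  // Walk the current stack: skip frames until we pass the boundary class,
  // then return the first class name after it -- the caller.
  static String findCallerClassName(String boundaryClassName) {
    boolean pastBoundary = false;
    for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
      String name = frame.getClassName();
      if (name.equals(boundaryClassName)) {
        pastBoundary = true;
      } else if (pastBoundary) {
        return name;
      }
    }
    return null; // boundary never seen, or nothing above it
  }

  // Plays the role of PipelineOptionsFactory: whoever calls it is what we want.
  static class Helper {
    static String whoCalledMe() {
      return findCallerClassName(Helper.class.getName());
    }
  }

  public static void main(String[] args) {
    System.out.println(Helper.whoCalledMe()); // prints "CallerFinderDemo"
  }
}
```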

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1cce92/runners/pom.xml
--
diff --git a/runners/pom.xml b/runners/pom.xml
index b00ba9c..4cba41a 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
   
 core-construction-java
 core-java
+map-reduce
 direct-java
 flink
 google-cloud-dataflow-java



[01/36] beam git commit: Initial commit for MapReduceRunner.

2017-09-07 Thread kenn
Repository: beam
Updated Branches:
  refs/heads/mr-runner 2fa4fdecd -> 5fa0b14d2


Initial commit for MapReduceRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9fffd554
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9fffd554
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9fffd554

Branch: refs/heads/mr-runner
Commit: 9fffd554f1e5fd6465989bb3568dfb6f2d854eeb
Parents: f54072a
Author: Pei He 
Authored: Thu Jul 6 10:22:27 2017 +0800
Committer: Pei He 
Committed: Mon Jul 24 20:15:42 2017 +0800

--
 runners/map-reduce/pom.xml  | 165 +++
 .../mapreduce/MapReducePipelineOptions.java |   9 +
 .../runners/mapreduce/MapReduceRegistrar.java   |  32 
 .../beam/runners/mapreduce/MapReduceRunner.java |  15 ++
 .../MapReducePipelineTranslator.java|  11 ++
 5 files changed, 232 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/9fffd554/runners/map-reduce/pom.xml
--
diff --git a/runners/map-reduce/pom.xml b/runners/map-reduce/pom.xml
new file mode 100644
index 000..2e8a8c9
--- /dev/null
+++ b/runners/map-reduce/pom.xml
@@ -0,0 +1,165 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  4.0.0
+
+  
+org.apache.beam
+beam-runners-parent
+2.1.0-SNAPSHOT
+../pom.xml
+  
+  
+  beam-runners-map-reduce
+
+  Apache Beam :: Runners :: MapReduce
+
+  jar
+
+  
+2.8.1
+  
+  
+  
+
+  
+  local-validates-runner-tests
+  false
+  
+
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
+  
+validates-runner-tests
+integration-test
+
+  test
+
+
+  
+org.apache.beam.sdk.testing.ValidatesRunner
+  
+  none
+  true
+  
+
org.apache.beam:beam-sdks-java-core
+  
+  
+
+  [
+
"--runner=org.apache.beam.runners.mapreduce.MapReduceRunner"
+  ]
+
+  
+
+  
+
+  
+
+  
+
+  
+
+  
+
+
+  org.apache.hadoop
+  hadoop-mapreduce-client-core
+  ${mapreduce.version}
+
+
+
+
+  org.apache.beam
+  beam-sdks-java-core
+
+
+
+  org.apache.beam
+  beam-runners-core-java
+
+
+
+  org.apache.beam
+  beam-runners-core-construction-java
+
+
+
+
+com.google.auto.service
+auto-service
+true
+
+
+com.google.auto.value
+auto-value
+
+
+
+
+  org.apache.beam
+  beam-sdks-java-core
+  tests
+  test
+
+
+
+
+  com.fasterxml.jackson.dataformat
+  jackson-dataformat-yaml
+  test
+
+
+
+
+junit
+junit
+test
+
+
+  org.hamcrest
+  hamcrest-all
+  test
+
+
+  org.mockito
+  mockito-all
+  test
+
+  
+
+  
+  
+
+  org.apache.maven.plugins
+  maven-dependency-plugin
+  
+
+  analyze-only
+  
+
+false
+  
+
+  
+
+  
+  
+

http://git-wip-us.apache.org/repos/asf/beam/blob/9fffd554/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
new file mode 100644
index 000..da29931
--- /dev/null
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReducePipelineOptions.java
@@ -0,0 +1,9 @@
+package org.apache.beam.runners.mapreduce;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link PipelineOptions} for {@link MapReduceRunner}.
+ */
+public interface MapReducePipelineOptions extends PipelineOptions {
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9fffd554/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRegistrar.java
--

[02/36] beam git commit: MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.

2017-09-07 Thread kenn
MapReduceRunner: add unit tests for GraphConverter and GraphPlanner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8b366de
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8b366de
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8b366de

Branch: refs/heads/mr-runner
Commit: a8b366de9e4e0c79a7800184afc79b377477b8ed
Parents: 092380c
Author: Pei He 
Authored: Thu Jul 13 14:09:10 2017 +0800
Committer: Pei He 
Committed: Thu Aug 31 14:13:46 2017 +0800

--
 .../beam/runners/mapreduce/MapReduceRunner.java |  12 +++
 .../runners/mapreduce/translation/Graph.java|  89 ++-
 .../mapreduce/translation/GraphConverter.java   |   6 +-
 .../mapreduce/translation/GraphPlanner.java |   1 +
 .../beam/runners/mapreduce/WordCountTest.java   | 108 +++
 .../translation/GraphConverterTest.java |  39 +++
 .../mapreduce/translation/GraphPlannerTest.java |  42 
 7 files changed, 294 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
index bb9555e..247a8e5 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/MapReduceRunner.java
@@ -3,11 +3,23 @@ package org.apache.beam.runners.mapreduce;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link PipelineRunner} for crunch.
  */
 public class MapReduceRunner extends PipelineRunner<PipelineResult> {
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static MapReduceRunner fromOptions(PipelineOptions options) {
+return new MapReduceRunner();
+  }
+
   @Override
   public PipelineResult run(Pipeline pipeline) {
 return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/a8b366de/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index a9831bd..1ca5a05 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -4,16 +4,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
 
 /**
  * Created by peihe on 06/07/2017.
@@ -57,8 +62,16 @@ public class Graph {
 return edges.get(HeadTail.of(head, tail));
   }
 
-  public Set<Vertex> getLeafVertices() {
-return leafVertices;
+  public Iterable<Vertex> getAllVertices() {
+return vertices.values();
+  }
+
+  public Iterable<Edge> getAllEdges() {
+return edges.values();
+  }
+
+  public Iterable<Vertex> getLeafVertices() {
+return ImmutableList.copyOf(leafVertices);
   }
 
   public void accept(GraphVisitor visitor) {
@@ -122,6 +135,29 @@ public class Graph {
throw new RuntimeException("Unexpected transform type: " + transform.getClass());
   }
 }
+
+@Override
+public boolean equals(Object obj) {
+  if (obj == this) {
+return true;
+  }
+  if (obj instanceof Vertex) {
+Vertex other = (Vertex) obj;
+return transform.equals(other.transform);
+  }
+  return false;
+}
+
+@Override
+public int hashCode() {
+  

[24/36] beam git commit: mr-runner: remove the hard-coded GlobalWindow coder, and fixes WindowingTest.

2017-09-07 Thread kenn
mr-runner: remove the hard-coded GlobalWindow coder, and fixes WindowingTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d8b12a6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d8b12a6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d8b12a6

Branch: refs/heads/mr-runner
Commit: 2d8b12a6ec6d58a01084c6c06e92b3f884c166ba
Parents: ca0b15a
Author: Pei He 
Authored: Thu Aug 31 18:58:16 2017 +0800
Committer: Pei He 
Committed: Fri Sep 1 17:13:40 2017 +0800

--
 .../translation/FileReadOperation.java  | 11 
 .../mapreduce/translation/GraphPlanner.java |  7 +++--
 .../runners/mapreduce/translation/Graphs.java   | 10 ++--
 .../mapreduce/translation/JobPrototype.java |  2 +-
 .../translation/TranslationContext.java | 27 ++--
 5 files changed, 36 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
index a95e79e..cbbfbd2 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileReadOperation.java
@@ -47,12 +47,12 @@ import org.apache.hadoop.io.SequenceFile;
 public class FileReadOperation extends ReadOperation {
 
   private final String fileName;
-  private final Coder<T> coder;
+  private final Coder<WindowedValue<T>> coder;
   private final TupleTag tupleTag;
 
   public FileReadOperation(
   String fileName,
-  Coder<T> coder,
+  Coder<WindowedValue<T>> coder,
   TupleTag tupleTag) {
 super();
 this.fileName = checkNotNull(fileName, "fileName");
@@ -73,11 +73,10 @@ public class FileReadOperation extends ReadOperation {
 private final Coder coder;
 private final SerializableConfiguration conf;
 
-FileBoundedSource(String fileName, Coder coder, SerializableConfiguration conf) {
+FileBoundedSource(
+String fileName, Coder coder, SerializableConfiguration conf) {
   this.fileName = checkNotNull(fileName, "fileName");
-  checkNotNull(coder, "coder");
-  this.coder = WindowedValue.getFullCoder(
-  coder, WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+  this.coder = checkNotNull(coder, "coder");
   this.conf = checkNotNull(conf, "conf");
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2d8b12a6/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
--
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
index 608b304..6c79277 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/GraphPlanner.java
@@ -58,9 +58,8 @@ public class GraphPlanner {
 String tagName = tag.getName();
 String fileName = ConfigurationUtils.toFileName(tagName);
 
-// TODO: should not hard-code windows coder.
 WindowedValue.WindowedValueCoder writeValueCoder = WindowedValue.getFullCoder(
-tag.getCoder(), WindowingStrategy.globalDefault().getWindowFn().windowCoder());
+tag.getCoder(), tag.getWindowingStrategy().getWindowFn().windowCoder());
 
 fusedStep.addStep(
 Graphs.Step.of(
@@ -71,7 +70,7 @@ public class GraphPlanner {
 
 String readStepName = tagName + "/Read";
 Graphs.Tag readOutput = Graphs.Tag.of(
-readStepName + ".out", tag.getTupleTag(), tag.getCoder());
+readStepName + ".out", tag.getTupleTag(), tag.getCoder(), tag.getWindowingStrategy());
 for (Graphs.FusedStep consumer : consumers) {
   // Re-direct tag to readOutput.
   List receivers = consumer.getConsumers(tag);
@@ -84,7 +83,7 @@ public class GraphPlanner {
   consumer.addStep(
   Graphs.Step.of(
   readStepName,
-  new FileReadOperation(filePath, tag.getCoder(), tag.getTupleTag())),
+  new FileReadOperation(filePath, writeValueCoder, 

[jira] [Updated] (BEAM-2822) Add support for progress reporting in fn API

2017-09-07 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-2822:

Description: https://s.apache.org/beam-fn-api-progress-reporting  (was: 
https)

> Add support for progress reporting in fn API
> 
>
> Key: BEAM-2822
> URL: https://issues.apache.org/jira/browse/BEAM-2822
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Vikas Kedigehalli
>Assignee: Vikas Kedigehalli
>Priority: Minor
>
> https://s.apache.org/beam-fn-api-progress-reporting



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2822) Add support for progress reporting in fn API

2017-09-07 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-2822:

Description: https

> Add support for progress reporting in fn API
> 
>
> Key: BEAM-2822
> URL: https://issues.apache.org/jira/browse/BEAM-2822
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model
>Reporter: Vikas Kedigehalli
>Assignee: Vikas Kedigehalli
>Priority: Minor
>
> https



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2858) temp file garbage collection in BigQuery sink should be in a separate DoFn

2017-09-07 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16157354#comment-16157354
 ] 

Chamikara Jayalath commented on BEAM-2858:
--

I tried intentionally deleting one of the files before running a load job.

Dataflow job: 2017-09-07_10_47_31-9640870329732038724
BigQuery load job: 
f0953eede88e454bb3b01d2cdba3c10d_1a2e1a938a117f8e3ff2af5ee4f58b45_1_0-0

This Dataflow job succeeded, so it looks like this issue will manifest as data loss.

Reuven, feel free to grab this issue if you are hoping to produce a fix; otherwise I'll look into this early next week.

> temp file garbage collection in BigQuery sink should be in a separate DoFn
> --
>
> Key: BEAM-2858
> URL: https://issues.apache.org/jira/browse/BEAM-2858
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Reuven Lax
>Assignee: Chamikara Jayalath
> Fix For: 2.2.0
>
>
> Currently the WriteTables transform deletes the set of input files as soon as 
> the load() job completes. However this is incorrect - if the task fails 
> partially through deleting files (e.g. if the worker crashes), the task will 
> be retried. If the write disposition is WRITE_TRUNCATE, bad things could 
> result.
> The resulting behavior will depend on what BQ does if one of input files is 
> missing (because we had previously deleted it). In the best case, BQ will 
> fail the load. In this case the step will keep failing until the runner 
> finally fails the entire job. If however BQ ignores the missing file, the 
> load will overwrite the previously-written table with the smaller set of 
> files and the job will succeed. This is the worst-case scenario, as it will 
> result in data loss.
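The worst-case scenario described above can be made concrete with a toy simulation (hypothetical names, not Beam or BigQuery code): a WRITE_TRUNCATE-style load that silently skips missing inputs, retried after a crash that happened mid-cleanup, quietly drops data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class WriteTablesRetryDemo {
  // Toy model: "loading" with WRITE_TRUNCATE replaces the table's contents
  // with whichever input files still exist; a lenient service silently
  // skips inputs that have been deleted.
  static List<String> table = new ArrayList<>();

  static void loadTruncate(Set<String> existingFiles, List<String> inputs) {
    table.clear();
    for (String f : inputs) {
      if (existingFiles.contains(f)) {
        table.add(f);
      }
    }
  }

  public static void main(String[] args) {
    Set<String> files = new LinkedHashSet<>(Arrays.asList("f1", "f2", "f3"));
    List<String> inputs = new ArrayList<>(files);

    // Attempt 1: the load succeeds, then the worker crashes mid-cleanup,
    // having deleted only f2.
    loadTruncate(files, inputs);
    files.remove("f2");
    // -- crash here; the runner retries the whole task --

    // Attempt 2 (retry): WRITE_TRUNCATE overwrites the table using only the
    // surviving files, silently dropping f2's rows: data loss.
    loadTruncate(files, inputs);
    System.out.println(table); // [f1, f3]
  }
}
```

Moving deletion into a separate step that runs only after the load step has durably succeeded avoids re-running the load with a partially deleted input set.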



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (BEAM-2842) python test failure in local maven build

2017-09-07 Thread Xu Mingmin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xu Mingmin closed BEAM-2842.

   Resolution: Duplicate
Fix Version/s: Not applicable

> python test failure in local maven build
> 
>
> Key: BEAM-2842
> URL: https://issues.apache.org/jira/browse/BEAM-2842
> Project: Beam
>  Issue Type: Test
>  Components: sdk-ideas, website
>Reporter: Xu Mingmin
>Assignee: Reuven Lax
>Priority: Minor
> Fix For: Not applicable
>
>
> See below error when running {{mvn clean install}} locally (Mac OS), with a 
> fresh environment. 
> I resolved it following the guideline mentioned in the error log, and wanna know 
> what's the proper way to improve it:
> 1). add a task in prepare step of 'Contribution Guide';
> 2). option for shared credential?
> {code}
> File 
> “/Users//Desktop/beam/sdks/python/target/.tox/py27gcp/lib/python2.7/site-packages/google/auth/_default.py”,
>  line 282, in default
> raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
> DefaultCredentialsError: Could not automatically determine credentials. 
> Please set GOOGLE_APPLICATION_CREDENTIALS or
> explicitly create credential and re-run the application. For more
> information, please see
> https://developers.google.com/accounts/docs/application-default-credentials.
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

