[GitHub] incubator-beam pull request #803: [BEAM-498] Port most overrides in Dataflow...

2016-08-08 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/803

[BEAM-498] Port most overrides in Dataflow runner to new DoFn

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

R: @bjchambers 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam DataflowRunner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/803.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #803


commit e0374daa98cf378485297619d5dcc2564d208a1b
Author: Kenneth Knowles 
Date:   2016-08-08T20:27:00Z

Deprecate more uses of OldDoFn

commit c559e5b847faa541c18e3c7608a58cee661492df
Author: Kenneth Knowles 
Date:   2016-08-09T03:35:59Z

Port easy parts of DataflowRunner to new DoFn




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-124) Testing -- End to End WordCount Batch and Streaming Tests

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15412581#comment-15412581
 ] 

ASF GitHub Bot commented on BEAM-124:
-

Github user jasonkuster closed the pull request at:

https://github.com/apache/incubator-beam/pull/345


> Testing -- End to End WordCount Batch and Streaming Tests
> -
>
> Key: BEAM-124
> URL: https://issues.apache.org/jira/browse/BEAM-124
> Project: Beam
>  Issue Type: New Feature
>  Components: testing
>Reporter: Steve Wheeler
>Assignee: Mark Liu
>
> Set up testing infrastructure so that an end to end test for WordCount (both 
> batch and streaming) will be run periodically. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-539) Error when writing to the root of a GCS location

2016-08-08 Thread Ahmet Altay (JIRA)
Ahmet Altay created BEAM-539:


 Summary: Error when writing to the root of a GCS location
 Key: BEAM-539
 URL: https://issues.apache.org/jira/browse/BEAM-539
 Project: Beam
  Issue Type: Bug
  Components: sdk-py
Reporter: Ahmet Altay
Assignee: Chamikara Jayalath
Priority: Minor


User issue: 
http://stackoverflow.com/questions/38811152/google-dataflow-python-pipeline-write-failure

Reproduction: use a TextFileSink with the output location set to gs://mybucket 
and the write fails. Change it to gs://mybucket/ and it works.

The final output path is generated here:
https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L495

And this seemingly works in the Java SDK.

Stack:

  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/iobase.py", line 1058, in finish_bundle
    yield window.TimestampedValue(self.writer.close(), window.MAX_TIMESTAMP)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/fileio.py", line 601, in close
    self.sink.close(self.temp_handle)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/fileio.py", line 687, in close
    file_handle.close()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcsio.py", line 617, in close
    self._flush_write_buffer()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/io/gcsio.py", line 647, in _flush_write_buffer
    raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
HttpError: HttpError accessing 
:
 response: <{'status': '404', 'alternate-protocol': '443:quic', 
'content-length': '165', 'vary': 'Origin, X-Origin', 'server': 'UploadServer', 
'x-guploader-uploadid': 
'AEnB2Uq6ZGb_CsrMVxozv6aL48k4OMMiRgYVeVGmJrM-sMQWRGeGMkesOQg5F0W7HZuaqTBog_d4ml-DlIars_ZvJTejdfcbAUr4gswZWVieq82ufc3WR2g',
 'date': 'Mon, 08 Aug 2016 21:29:46 GMT', 'alt-svc': 'quic=":443"; ma=2592000; 
v="36,35,34,33,32,31,30"', 'content-type': 'application/json; charset=UTF-8'}>, 
content <{
 "error": {
  "errors": [
   {
"domain": "global",
"reason": "notFound",
"message": "Not Found"
   }
  ],
  "code": 404,
  "message": "Not Found"
 }
}
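
The report does not identify the cause of the 404. As an illustration only, and assuming (this is a hypothesis, not something the issue confirms) that the sink appends a shard suffix directly to the configured prefix, the Java sketch below shows why a trailing slash would matter: without it, the suffix becomes part of the bucket name. The class name `GcsPrefixSketch` and the suffix value are hypothetical and do not come from either SDK.

```java
// Hypothetical illustration; not code from the Beam Python or Java SDK.
final class GcsPrefixSketch {
  private static final String SCHEME = "gs://";
  // Assumed shard suffix, made up for this example.
  static final String SHARD_SUFFIX = "-00000-of-00001";

  /** Returns the bucket component of a gs:// path. */
  static String bucketOf(String path) {
    String rest = stripScheme(path);
    int slash = rest.indexOf('/');
    return slash < 0 ? rest : rest.substring(0, slash);
  }

  /** Returns the object component of a gs:// path, or "" if there is none. */
  static String objectOf(String path) {
    String rest = stripScheme(path);
    int slash = rest.indexOf('/');
    return slash < 0 ? "" : rest.substring(slash + 1);
  }

  private static String stripScheme(String path) {
    if (!path.startsWith(SCHEME)) {
      throw new IllegalArgumentException("Not a gs:// path: " + path);
    }
    return path.substring(SCHEME.length());
  }

  public static void main(String[] args) {
    // Compare the two output locations from the reproduction steps.
    for (String prefix : new String[] {"gs://mybucket", "gs://mybucket/"}) {
      String full = prefix + SHARD_SUFFIX;
      System.out.println(full + " -> bucket=" + bucketOf(full) + ", object=" + objectOf(full));
    }
  }
}
```

Under that assumption, the path built from gs://mybucket names a different bucket, which would be consistent with a "Not Found" response. The actual cause should be confirmed against the linked fileio.py line.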






[2/2] incubator-beam git commit: This closes #520

2016-08-08 Thread kenn
This closes #520


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5049011a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5049011a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5049011a

Branch: refs/heads/master
Commit: 5049011a2602acc1ce0e1997b467ddee38c66c10
Parents: d60a0a0 c376b45
Author: Kenneth Knowles 
Authored: Mon Aug 8 13:51:21 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 13:51:21 2016 -0700

--
 .../beam/runners/direct/SideInputContainer.java |   2 +-
 .../runners/direct/ViewEvaluatorFactory.java|   5 +-
 .../functions/FlinkProcessContext.java  |   2 +-
 .../functions/SideInputInitializer.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   2 +-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |  45 
 .../beam/sdk/util/DirectSideInputReader.java|   4 +-
 .../apache/beam/sdk/util/PCollectionViews.java  | 228 +--
 .../apache/beam/sdk/values/PCollectionView.java |  29 ++-
 .../sdk/testing/PCollectionViewTesting.java |  35 +--
 11 files changed, 262 insertions(+), 94 deletions(-)
--




[GitHub] incubator-beam pull request #520: [BEAM-115] Add ViewFn to SDK and adjust PC...

2016-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/520




[1/2] incubator-beam git commit: Add ViewFn and port SDK to use it

2016-08-08 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master d60a0a0e4 -> 5049011a2


Add ViewFn and port SDK to use it

This is a preliminary step towards the architecture at
https://s.apache.org/beam-side-input-1-pager

This separates the ViewFn part of each PCollectionView
class/transform, toward eliminating extraneous public
subclasses of PCollectionView and PTransform.
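
As a rough illustration of the separation described here, the sketch below uses simplified stand-in types (`SketchViewFn`, `SketchView`, `ListViewFn`, all hypothetical and not the SDK classes). It shows a view that carries a function object, and a caller that materializes the view through `getViewFn().apply(...)` instead of through a method on the view subclass itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a view function: converts primitive contents to a view value.
abstract class SketchViewFn<PrimitiveT, ViewT> {
  abstract ViewT apply(PrimitiveT contents);
}

// Hypothetical stand-in for a view: it only holds the function, so no
// per-view-type subclass is needed to define the conversion.
final class SketchView<PrimitiveT, ViewT> {
  private final SketchViewFn<PrimitiveT, ViewT> viewFn;

  SketchView(SketchViewFn<PrimitiveT, ViewT> viewFn) {
    this.viewFn = viewFn;
  }

  SketchViewFn<PrimitiveT, ViewT> getViewFn() {
    return viewFn;
  }
}

// One concrete conversion: copy the iterable contents into a list.
final class ListViewFn<T> extends SketchViewFn<Iterable<T>, List<T>> {
  @Override
  List<T> apply(Iterable<T> contents) {
    List<T> out = new ArrayList<>();
    for (T value : contents) {
      out.add(value);
    }
    return out;
  }
}

final class ViewFnSketch {
  // What a runner-side reader might do: fetch the function and apply it.
  static <T> List<T> readAsList(Iterable<T> contents) {
    SketchView<Iterable<T>, List<T>> view = new SketchView<>(new ListViewFn<T>());
    return view.getViewFn().apply(contents);
  }

  public static void main(String[] args) {
    System.out.println(readAsList(Arrays.asList(1, 2, 3)));
  }
}
```

The design point is that the conversion logic lives in a small function object, so a new kind of view can be added by supplying a new function and not a new public view class.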


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c376b45c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c376b45c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c376b45c

Branch: refs/heads/master
Commit: c376b45cac8568d7242d29725f4a9a701673df75
Parents: 2b5c6bc
Author: Kenneth Knowles 
Authored: Wed Jun 22 08:39:33 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 12:31:11 2016 -0700

--
 .../beam/runners/direct/SideInputContainer.java |   2 +-
 .../runners/direct/ViewEvaluatorFactory.java|   5 +-
 .../functions/FlinkProcessContext.java  |   2 +-
 .../functions/SideInputInitializer.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   2 +-
 .../org/apache/beam/sdk/transforms/ViewFn.java  |  45 
 .../beam/sdk/util/DirectSideInputReader.java|   4 +-
 .../apache/beam/sdk/util/PCollectionViews.java  | 228 +--
 .../apache/beam/sdk/values/PCollectionView.java |  29 ++-
 .../sdk/testing/PCollectionViewTesting.java |  35 +--
 11 files changed, 262 insertions(+), 94 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
index 7a19ed9..6458215 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java
@@ -247,7 +247,7 @@ class SideInputContainer {
   @SuppressWarnings("unchecked") Iterable values =
   (Iterable) 
viewContents.getUnchecked(PCollectionViewWindow.of(view,
   window)).get();
-  return view.fromIterableInternal(values);
+  return view.getViewFn().apply(values);
 }
 
 @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 7a0b0f7..362e903 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -133,8 +133,9 @@ class ViewEvaluatorFactory implements 
TransformEvaluatorFactory {
   /**
* An in-process implementation of the {@link CreatePCollectionView} 
primitive.
*
-   * This implementation requires the input {@link PCollection} to be an iterable, which is provided
-   * to {@link PCollectionView#fromIterableInternal(Iterable)}.
+   * This implementation requires the input {@link PCollection} to be an iterable
+   * of {@code WindowedValue}, which is provided
+   * to {@link PCollectionView#getViewFn()} for conversion to {@link ViewT}.
*/
   public static final class WriteView
   extends PTransform, PCollectionView> 
{

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c376b45c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
--
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
index 3954d1f..64b93c8 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -249,7 +249,7 @@ class FlinkProcessContext
 

[3/3] incubator-beam git commit: This closes #768

2016-08-08 Thread kenn
This closes #768


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d60a0a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d60a0a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d60a0a0e

Branch: refs/heads/master
Commit: d60a0a0e412f51e98f6f024dfc7ec4f21d1355a4
Parents: 574c377 2b6c873
Author: Kenneth Knowles 
Authored: Mon Aug 8 13:43:07 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 13:43:07 2016 -0700

--
 .travis.yml |  5 +
 pom.xml | 21 +++--
 2 files changed, 8 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d60a0a0e/pom.xml
--



[1/3] incubator-beam git commit: Move test-jar back to default profile

2016-08-08 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master 574c3777d -> d60a0a0e4


Move test-jar back to default profile

The test jars for beam-sdks-java-core and
beam-runners-core-java are both mandatory artifacts
with testing utilities intended for general use.

These were moved out of the default profile since for most
normal projects they are optional release-time distributions,
but for us they are not.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5436a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5436a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5436a31

Branch: refs/heads/master
Commit: a5436a31fa23cec8a5649b65e0ccfb9aa9d74e18
Parents: da31a2d
Author: Kenneth Knowles 
Authored: Tue Aug 2 13:55:13 2016 -0700
Committer: Kenneth Knowles 
Committed: Wed Aug 3 10:56:14 2016 -0700

--
 pom.xml | 21 +++--
 1 file changed, 7 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5436a31/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 3b98382..5e3601a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,7 @@
 
   
 
 
   release
@@ -192,19 +192,6 @@
 
 
 
-  org.apache.maven.plugins
-  maven-jar-plugin
-  
-
-  default-test-jar
-  
-test-jar
-  
-
-  
-
-
-
   org.apache.rat
   apache-rat-plugin
   
@@ -752,6 +739,12 @@
 jar
   
 
+
+  default-test-jar
+  
+test-jar
+  
+
   
 
 



[GitHub] incubator-beam pull request #768: Move test-jar back to default profile

2016-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/768




[jira] [Commented] (BEAM-498) Make DoFnWithContext the new DoFn

2016-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15412423#comment-15412423
 ] 

ASF GitHub Bot commented on BEAM-498:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/797


> Make DoFnWithContext the new DoFn
> -
>
> Key: BEAM-498
> URL: https://issues.apache.org/jira/browse/BEAM-498
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>






[GitHub] incubator-beam pull request #797: [BEAM-498] Port more easy cases to new DoF...

2016-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/797




[02/13] incubator-beam git commit: Port BigQueryIO to new DoFn

2016-08-08 Thread kenn
Port BigQueryIO to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6395e9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6395e9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6395e9d

Branch: refs/heads/master
Commit: d6395e9d45dcbeb9b3d3e2f8214a49866622b9cf
Parents: 87313f1
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:26:53 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java| 21 ++--
 1 file changed, 10 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6395e9d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
--
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ed2c32e..36e09f1 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -1785,7 +1784,7 @@ public class BigQueryIO {
 return PDone.in(input.getPipeline());
   }
 
-  private class WriteBundles extends OldDoFn> {
+  private class WriteBundles extends DoFn> {
 private TableRowWriter writer = null;
 private final String tempFilePrefix;
 
@@ -1793,7 +1792,7 @@ public class BigQueryIO {
   this.tempFilePrefix = tempFilePrefix;
 }
 
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) throws Exception {
   if (writer == null) {
 writer = new TableRowWriter(tempFilePrefix);
@@ -1806,7 +1805,7 @@ public class BigQueryIO {
 // Discard write result and close the write.
 try {
   writer.close();
-  // The writer does not need to be reset, as this OldDoFn cannot be reused.
+  // The writer does not need to be reset, as this DoFn cannot be reused.
 } catch (Exception closeException) {
   // Do not mask the exception that caused the write to fail.
   e.addSuppressed(closeException);
@@ -1815,7 +1814,7 @@ public class BigQueryIO {
   }
 }
 
-@Override
+@FinishBundle
 public void finishBundle(Context c) throws Exception {
   if (writer != null) {
 c.output(writer.close());
@@ -1959,7 +1958,7 @@ public class BigQueryIO {
 /**
  * Partitions temporary files based on number of files and file sizes.
  */
-static class WritePartition extends OldDoFn> {
+static class WritePartition extends DoFn> {
   private final PCollectionView>> resultsView;
   private TupleTag> multiPartitionsTag;
   private TupleTag> singlePartitionTag;
@@ -1973,7 +1972,7 @@ public class BigQueryIO {
 this.singlePartitionTag = singlePartitionTag;
   }
 
-  @Override
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 List> results = 
Lists.newArrayList(c.sideInput(resultsView));
 if (results.isEmpty()) {
@@ -2015,7 +2014,7 @@ public class BigQueryIO {
 /**
  * Writes partitions to BigQuery tables.
  */
-static class WriteTables extends OldDoFn>, 
String> {
+static class WriteTables extends DoFn>, 
String> {
   private final boolean singlePartition;
   private final BigQueryServices bqServices;
   private final String jobIdToken;
@@ -2044,7 +2043,7 @@ public class BigQueryIO {
 this.createDisposition = createDisposition;
   }
 
-  @Override
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 List partition = 
Lists.newArrayList(c.element().getValue()).get(0);
 String jobIdPrefix = 

[10/13] incubator-beam git commit: Port Flink fork of examples to new DoFn

2016-08-08 Thread kenn
Port Flink fork of examples to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87313f1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87313f1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87313f1c

Branch: refs/heads/master
Commit: 87313f1c3d8cf874e04aaf528161478afa030f38
Parents: ae1f6d1
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:24:24 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../beam/runners/flink/examples/TFIDF.java  | 28 +++
 .../beam/runners/flink/examples/WordCount.java  |  5 +--
 .../flink/examples/streaming/AutoComplete.java  | 37 ++--
 .../flink/examples/streaming/JoinExamples.java  | 14 
 .../examples/streaming/KafkaIOExamples.java |  7 ++--
 .../KafkaWindowedWordCountExample.java  | 10 +++---
 .../examples/streaming/WindowedWordCount.java   | 10 +++---
 7 files changed, 57 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87313f1c/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
--
diff --git 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 716c8ad..4deca12 100644
--- 
a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ 
b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -230,10 +230,10 @@ public class TFIDF {
   // Create a collection of pairs mapping a URI to each
   // of the words in the document associated with that that URI.
   PCollection> uriToWords = uriToContent
-  .apply("SplitWords", ParDo.of(new OldDoFn, KV>() {
+  .apply("SplitWords", ParDo.of(new DoFn, KV>() {
 private static final long serialVersionUID = 0;
 
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey();
   String line = c.element().getValue();
@@ -275,10 +275,10 @@ public class TFIDF {
   // by the URI key.
   PCollection>> uriToWordAndCount = 
uriAndWordToCount
   .apply("ShiftKeys", ParDo.of(
-  new OldDoFn, Long>, KV>>() {
+  new DoFn, Long>, KV>>() 
{
 private static final long serialVersionUID = 0;
 
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey().getKey();
   String word = c.element().getKey().getValue();
@@ -316,10 +316,10 @@ public class TFIDF {
   // divided by the total number of words in the document.
   PCollection>> wordToUriAndTf = 
uriToWordAndCountAndTotal
   .apply("ComputeTermFrequencies", ParDo.of(
-  new OldDoFn, KV>>() 
{
+  new DoFn, KV>>() {
 private static final long serialVersionUID = 0;
 
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey();
   Long wordTotal = 
c.element().getValue().getOnly(wordTotalsTag);
@@ -339,14 +339,14 @@ public class TFIDF {
   // documents in which the word appears divided by the total
   // number of documents in the corpus. Note how the total number of
   // documents is passed as a side input; the same value is
-  // presented to each invocation of the OldDoFn.
+  // presented to each invocation of the DoFn.
   PCollection> wordToDf = wordToDocCount
   .apply("ComputeDocFrequencies", ParDo
   .withSideInputs(totalDocuments)
-  .of(new 

[05/13] incubator-beam git commit: Port Reshuffle to new DoFn

2016-08-08 Thread kenn
Port Reshuffle to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf21a5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf21a5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf21a5c

Branch: refs/heads/master
Commit: ecf21a5cc177c39e515e4c78e16b579ac298c999
Parents: d798413
Author: Kenneth Knowles 
Authored: Fri Aug 5 11:47:23 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf21a5c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 66c7cc0..ad33a25 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,8 +70,8 @@ public class Reshuffle extends 
PTransform>, PCollecti
 // set allowed lateness.
 .setWindowingStrategyInternal(originalStrategy)
 .apply("ExpandIterable", ParDo.of(
-new OldDoFn, KV>() {
-  @Override
+new DoFn, KV>() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 K key = c.element().getKey();
 for (V value : c.element().getValue()) {
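
The ports in this series replace `@Override` on `processElement` with a `@ProcessElement` annotation. The sketch below is a minimal, self-contained illustration of how an annotation-marked method can be located and invoked by reflection. It is not Beam's implementation: `SketchProcessElement`, `UppercaseFn`, and `AnnotationDispatchSketch` are hypothetical names invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical marker annotation; it must be retained at runtime to be visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchProcessElement {}

// A user function: no base-class method is overridden, the annotation marks the entry point.
class UppercaseFn {
  final List<String> output = new ArrayList<>();

  @SketchProcessElement
  public void processElement(String element) {
    output.add(element.toUpperCase(Locale.ROOT));
  }
}

final class AnnotationDispatchSketch {
  // Finds the first declared method carrying the marker annotation.
  static Method findProcessElement(Class<?> fnClass) {
    for (Method method : fnClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(SketchProcessElement.class)) {
        method.setAccessible(true);
        return method;
      }
    }
    throw new IllegalArgumentException("No @SketchProcessElement method on " + fnClass);
  }

  // Invokes the annotated method once per input element and returns what it produced.
  static List<String> run(List<String> input) {
    UppercaseFn fn = new UppercaseFn();
    Method method = findProcessElement(UppercaseFn.class);
    try {
      for (String element : input) {
        method.invoke(fn, element);
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not invoke annotated method", e);
    }
    return fn.output;
  }

  public static void main(String[] args) {
    System.out.println(run(java.util.Arrays.asList("a", "b")));
  }
}
```

Because dispatch depends on the annotation and not on an overridden signature, a class without the annotated method fails at lookup time in this sketch and not at compile time.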



[01/13] incubator-beam git commit: Port various Spark runner tests to new DoFn

2016-08-08 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master bb00810ad -> 574c3777d


Port various Spark runner tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f5df3583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f5df3583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f5df3583

Branch: refs/heads/master
Commit: f5df358320cfde6a1c4d012d4169af691f6a18e9
Parents: d6395e9
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:31:07 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../apache/beam/runners/spark/TfIdfTest.java| 22 ++--
 .../spark/translation/CombinePerKeyTest.java|  6 +++---
 .../translation/MultiOutputWordCountTest.java   | 10 -
 .../spark/translation/SerializationTest.java| 10 -
 .../streaming/KafkaStreamingTest.java   |  6 +++---
 5 files changed, 27 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f5df3583/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
index 074e6aa..17bf6dd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -101,8 +101,8 @@ public class TfIdfTest {
   // of the words in the document associated with that that URI.
   PCollection> uriToWords = uriToContent
   .apply("SplitWords", ParDo.of(
-  new OldDoFn, KV>() {
-@Override
+  new DoFn, KV>() {
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey();
   String line = c.element().getValue();
@@ -144,8 +144,8 @@ public class TfIdfTest {
   // by the URI key.
   PCollection>> uriToWordAndCount = 
uriAndWordToCount
   .apply("ShiftKeys", ParDo.of(
-  new OldDoFn, Long>, KV>>() {
-@Override
+  new DoFn, Long>, KV>>() 
{
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey().getKey();
   String word = c.element().getKey().getValue();
@@ -183,8 +183,8 @@ public class TfIdfTest {
   // divided by the total number of words in the document.
   PCollection>> wordToUriAndTf = 
uriToWordAndCountAndTotal
   .apply("ComputeTermFrequencies", ParDo.of(
-  new OldDoFn, KV>>() 
{
-@Override
+  new DoFn, KV>>() {
+@ProcessElement
 public void processElement(ProcessContext c) {
   URI uri = c.element().getKey();
   Long wordTotal = 
c.element().getValue().getOnly(wordTotalsTag);
@@ -208,8 +208,8 @@ public class TfIdfTest {
   PCollection> wordToDf = wordToDocCount
   .apply("ComputeDocFrequencies", ParDo
   .withSideInputs(totalDocuments)
-  .of(new OldDoFn, KV>() {
-@Override
+  .of(new DoFn, KV>() {
+@ProcessElement
 public void processElement(ProcessContext c) {
   String word = c.element().getKey();
   Long documentCount = c.element().getValue();
@@ -237,8 +237,8 @@ public class TfIdfTest {
   // divided by the log of the document frequency.
   return wordToUriAndTfAndDf
   .apply("ComputeTfIdf", ParDo.of(
-  new OldDoFn, KV

[07/13] incubator-beam git commit: Port Write to new DoFn

2016-08-08 Thread kenn
Port Write to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/86291de3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/86291de3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/86291de3

Branch: refs/heads/master
Commit: 86291de39772765f4d6d404ac8a8430d8ad8a15f
Parents: 2c6aaf7
Author: Kenneth Knowles 
Authored: Fri Aug 5 11:49:37 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../main/java/org/apache/beam/sdk/io/Write.java | 26 ++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 22 ++---
 2 files changed, 26 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/86291de3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 3e997b0..a846b7c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -27,8 +27,8 @@ import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -156,7 +156,7 @@ public class Write {
  * Writes all the elements in a bundle using a {@link Writer} produced by 
the
  * {@link WriteOperation} associated with the {@link Sink}.
  */
-private class WriteBundles extends OldDoFn {
+private class WriteBundles extends DoFn {
   // Writer that will write the records in this bundle. Lazily
   // initialized in processElement.
   private Writer writer = null;
@@ -166,7 +166,7 @@ public class Write {
 this.writeOperationView = writeOperationView;
   }
 
-  @Override
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 // Lazily initialize the Writer
 if (writer == null) {
@@ -182,7 +182,7 @@ public class Write {
   // Discard write result and close the write.
   try {
 writer.close();
-// The writer does not need to be reset, as this OldDoFn cannot be 
reused.
+// The writer does not need to be reset, as this DoFn cannot be 
reused.
   } catch (Exception closeException) {
 if (closeException instanceof InterruptedException) {
   // Do not silently ignore interrupted state.
@@ -195,7 +195,7 @@ public class Write {
 }
   }
 
-  @Override
+  @FinishBundle
   public void finishBundle(Context c) throws Exception {
 if (writer != null) {
   WriteT result = writer.close();
@@ -217,14 +217,14 @@ public class Write {
  *
  * @see WriteBundles
  */
-private class WriteShardedBundles extends OldDoFn, WriteT> {
+private class WriteShardedBundles extends DoFn, WriteT> {
   private final PCollectionView> 
writeOperationView;
 
   WriteShardedBundles(PCollectionView> 
writeOperationView) {
 this.writeOperationView = writeOperationView;
   }
 
-  @Override
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 // In a sharded write, single input element represents one shard. We 
can open and close
 // the writer in each call to processElement.
@@ -296,8 +296,8 @@ public class Write {
  * This singleton collection containing the WriteOperation is then used 
as a side input to a
  * ParDo over the PCollection of elements to write. In this bundle-writing 
phase,
  * {@link WriteOperation#createWriter} is called to obtain a {@link 
Writer}.
- * {@link Writer#open} and {@link Writer#close} are called in {@link 
OldDoFn#startBundle} and
- * {@link OldDoFn#finishBundle}, respectively, and {@link Writer#write} 
method is called for
+ * {@link Writer#open} and {@link Writer#close} are called in {@link 
DoFn#startBundle} and
+ * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} 
method is called for
  * every element in the bundle. The output of this ParDo is a PCollection 
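
The WriteBundles change above keeps the same lifecycle: a writer is opened lazily on the first element and closed when the bundle finishes. A minimal, self-contained sketch of that pattern follows. BundleWriterFn, its log field, and the simplified processElement/finishBundle signatures are hypothetical stand-ins, not the Beam DoFn or Sink.Writer APIs; resetting the writer in finishBundle is a choice made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a bundle-writing function (not a Beam class).
class BundleWriterFn {
  // Stand-in for a Writer: stays null until the first element of a bundle arrives.
  private List<String> writer = null;
  // Records lifecycle events so the behavior can be observed.
  final List<String> log = new ArrayList<>();

  // Called once per element; opens the writer lazily on first use.
  void processElement(String element) {
    if (writer == null) {
      writer = new ArrayList<>();
      log.add("open");
    }
    writer.add(element);
  }

  // Called once per bundle; closes the writer only if it was ever opened.
  void finishBundle() {
    if (writer != null) {
      log.add("close:" + writer.size());
      writer = null; // reset so a later bundle would open a fresh writer
    }
  }

  public static void main(String[] args) {
    BundleWriterFn fn = new BundleWriterFn();
    fn.processElement("a");
    fn.processElement("b");
    fn.finishBundle();
    System.out.println(fn.log);
  }
}
```

An empty bundle never opens a writer, so finishBundle has nothing to close in that case.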

[08/13] incubator-beam git commit: Remove references to OldDoFn from DoFnTest

2016-08-08 Thread kenn
Remove references to OldDoFn from DoFnTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32f84bb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32f84bb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32f84bb4

Branch: refs/heads/master
Commit: 32f84bb4d442a9b2060a4a5fbdc77d14d6b0976b
Parents: 86291de
Author: Kenneth Knowles 
Authored: Fri Aug 5 11:54:57 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../apache/beam/sdk/transforms/DoFnTest.java| 29 +---
 1 file changed, 1 insertion(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32f84bb4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
index c7e8972..710e4ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
@@ -22,10 +22,6 @@ import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -140,29 +136,6 @@ public class DoFnTest implements Serializable {
   }
 
   @Test
-  public void testDoFnWithContextUsingAggregators() {
-NoOpOldDoFn noOpFn = new NoOpOldDoFn<>();
-OldDoFn.Context context = noOpFn.context();
-
-OldDoFn fn = spy(noOpFn);
-context = spy(context);
-
-@SuppressWarnings("unchecked")
-Aggregator agg = mock(Aggregator.class);
-
-Sum.SumLongFn combiner = new Sum.SumLongFn();
-Aggregator delegateAggregator =
-fn.createAggregator("test", combiner);
-
-when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
-context.setupDelegateAggregators();
-delegateAggregator.addValue(1L);
-
-verify(agg).addValue(1L);
-  }
-
-  @Test
   public void testDefaultPopulateDisplayDataImplementation() {
 DoFn fn = new DoFn() {
 };
@@ -225,7 +198,7 @@ public class DoFnTest implements Serializable {
   }
 
   /**
-   * Initialize a test pipeline with the specified {@link OldDoFn}.
+   * Initialize a test pipeline with the specified {@link DoFn}.
*/
   private  TestPipeline createTestPipeline(DoFn fn) {
 TestPipeline pipeline = TestPipeline.create();



[12/13] incubator-beam git commit: Port Flink integration tests to new DoFn

2016-08-08 Thread kenn
Port Flink integration tests to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ae1f6d18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ae1f6d18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ae1f6d18

Branch: refs/heads/master
Commit: ae1f6d181ebe3c0bdffc35c833a6fdc858937d6c
Parents: 879f18f
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:17:20 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../java/org/apache/beam/runners/flink/ReadSourceITCase.java | 6 +++---
 .../apache/beam/runners/flink/ReadSourceStreamingITCase.java | 8 +---
 2 files changed, 8 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index ca70096..516c7ba 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -72,8 +72,8 @@ public class ReadSourceITCase extends JavaProgramTestBase {
 
 PCollection result = p
 .apply(CountingInput.upTo(10))
-.apply(ParDo.of(new OldDoFn() {
-  @Override
+.apply(ParDo.of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 c.output(c.element().toString());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ae1f6d18/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
--
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index bc69f34..ea58d0d 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -20,9 +20,11 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+
 import com.google.common.base.Joiner;
+
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
 /**
@@ -59,8 +61,8 @@ public class ReadSourceStreamingITCase extends 
StreamingProgramTestBase {
 
 p
   .apply(CountingInput.upTo(10))
-  .apply(ParDo.of(new OldDoFn() {
-  @Override
+  .apply(ParDo.of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
 c.output(c.element().toString());
   }



[04/13] incubator-beam git commit: Port Filter to the new DoFn

2016-08-08 Thread kenn
Port Filter to the new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d798413b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d798413b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d798413b

Branch: refs/heads/master
Commit: d798413be41fa5941d12049d899aa6ad970b8515
Parents: 7629f97
Author: Kenneth Knowles 
Authored: Fri Aug 5 11:46:53 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../src/main/java/org/apache/beam/sdk/transforms/Filter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d798413b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 37cbec1..2d9bdee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -202,8 +202,8 @@ public class Filter extends PTransform {
 
   @Override
   public PCollection apply(PCollection input) {
-return input.apply(ParDo.of(new OldDoFn() {
-  @Override
+return input.apply(ParDo.of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 if (predicate.apply(c.element())) {
   c.output(c.element());
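
The ports in this series replace @Override on processElement with the @ProcessElement annotation. As a rough illustration of how annotation-driven dispatch can work in general, here is a minimal, self-contained sketch. The Process annotation, KeepEven, and AnnotationDispatch are hypothetical names invented for this sketch; they are not Beam classes, and this is not a claim about how Beam's runners invoke a DoFn.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical marker annotation, standing in for the idea behind @ProcessElement.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Process {}

// Hypothetical user function: it overrides nothing, it only marks a method.
class KeepEven {
  @Process
  public void handle(Integer element, List<Integer> out) {
    if (element % 2 == 0) {
      out.add(element);
    }
  }
}

class AnnotationDispatch {
  // Finds the @Process method reflectively and calls it once per input element.
  static List<Integer> run(Object fn, List<Integer> input) {
    Method target = null;
    for (Method m : fn.getClass().getDeclaredMethods()) {
      if (m.isAnnotationPresent(Process.class)) {
        target = m;
      }
    }
    if (target == null) {
      throw new IllegalArgumentException("no @Process method found");
    }
    List<Integer> out = new ArrayList<>();
    try {
      for (Integer element : input) {
        target.invoke(fn, element, out);
      }
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(new KeepEven(), Arrays.asList(1, 2, 3, 4)));
  }
}
```

Because the method is located by annotation rather than by name or by an abstract base method, the user class is free to choose its own method name and parameter list.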



[03/13] incubator-beam git commit: Port most of Combine to new DoFn

2016-08-08 Thread kenn
Port most of Combine to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/331f5234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/331f5234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/331f5234

Branch: refs/heads/master
Commit: 331f523461094af666a20bd97e1e15f1dec3feba
Parents: b1db02d
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:11:11 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../org/apache/beam/sdk/transforms/Combine.java | 20 ++--
 1 file changed, 10 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/331f5234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6fc2324..a825800 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -1473,9 +1473,9 @@ public class Combine {
   PCollection defaultIfEmpty = maybeEmpty.getPipeline()
   .apply("CreateVoid", Create.of((Void) 
null).withCoder(VoidCoder.of()))
   .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of(
-  new OldDoFn() {
-@Override
-public void processElement(OldDoFn.ProcessContext c) {
+  new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext c) {
   Iterator combined = 
c.sideInput(maybeEmptyView).iterator();
   if (!combined.hasNext()) {
 c.output(defaultValue);
@@ -2097,15 +2097,15 @@ public class Combine {
   final TupleTag, InputT>> hot = new TupleTag<>();
   final TupleTag> cold = new TupleTag<>();
   PCollectionTuple split = input.apply("AddNonce", ParDo.of(
-  new OldDoFn, KV>() {
+  new DoFn, KV>() {
 transient int counter;
-@Override
+@StartBundle
 public void startBundle(Context c) {
   counter = ThreadLocalRandom.current().nextInt(
   Integer.MAX_VALUE);
 }
 
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) {
   KV kv = c.element();
   int spread = Math.max(1, hotKeyFanout.apply(kv.getKey()));
@@ -2135,9 +2135,9 @@ public class Combine {
   .setWindowingStrategyInternal(preCombineStrategy)
   .apply("PreCombineHot", Combine.perKey(hotPreCombine))
   .apply("StripNonce", ParDo.of(
-  new OldDoFn, AccumT>,
+  new DoFn, AccumT>,
  KV>>() {
-@Override
+@ProcessElement
 public void processElement(ProcessContext c) {
   c.output(KV.of(
   c.element().getKey().getKey(),
@@ -2151,8 +2151,8 @@ public class Combine {
   .get(cold)
   .setCoder(inputCoder)
   .apply("PrepareCold", ParDo.of(
-  new OldDoFn, KV>>() {
-@Override
+  new DoFn, KV>>() {
+@ProcessElement
 public void processElement(ProcessContext c) {
   c.output(KV.of(c.element().getKey(),
   InputOrAccum.input(c.element().getValue())));
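
The AddNonce step in the Combine diff spreads a hot key across several sub-keys before pre-combining. The hunk only shows the random starting counter and the spread computation. The rest of this sketch (using the counter modulo spread as the nonce, and summing per nonce) is an assumption made for illustration, and HotKeySpread is a hypothetical class, not Beam code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of hot-key fanout (not a Beam class).
class HotKeySpread {
  private int counter;

  // Mirrors startBundle in the diff: begin from a random non-negative offset.
  void startBundle() {
    counter = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
  }

  // Assumed nonce rule: round-robin over [0, spread).
  // floorMod keeps the result non-negative even if the counter overflows.
  int nextNonce(int fanout) {
    int spread = Math.max(1, fanout);
    return Math.floorMod(counter++, spread);
  }

  // Pre-combines the values of a single key into one partial sum per nonce.
  static Map<Integer, Integer> preCombine(int[] values, int fanout) {
    HotKeySpread fn = new HotKeySpread();
    fn.startBundle();
    Map<Integer, Integer> partialSums = new HashMap<>();
    for (int value : values) {
      partialSums.merge(fn.nextNonce(fanout), value, Integer::sum);
    }
    return partialSums;
  }

  public static void main(String[] args) {
    Map<Integer, Integer> partial = preCombine(new int[] {1, 1, 1, 1, 1, 1, 1, 1}, 4);
    int total = 0;
    for (int sum : partial.values()) {
      total += sum;
    }
    System.out.println(partial.size() + " partial sums, total " + total);
  }
}
```

Starting each bundle at a random offset means different bundles do not all send their first element to the same sub-key. Summing the partial results afterwards gives the same total as combining the key directly.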



[11/13] incubator-beam git commit: Port ViewTest to new DoFn

2016-08-08 Thread kenn
Port ViewTest to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1db02d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1db02d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1db02d2

Branch: refs/heads/master
Commit: b1db02d23f9454ff1a169d0aa81552e8dbe59fe3
Parents: 32f84bb
Author: Kenneth Knowles 
Authored: Fri Aug 5 12:07:28 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../apache/beam/sdk/transforms/ViewTest.java| 192 ++-
 1 file changed, 97 insertions(+), 95 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1db02d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index ee240bf..170e6ce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.values.KV.of;
+
 import static com.google.common.base.Preconditions.checkArgument;
 
 import static org.hamcrest.Matchers.isA;
@@ -100,8 +102,8 @@ public class ViewTest implements Serializable {
 PCollection output =
 pipeline.apply("Create123", Create.of(1, 2, 3))
 .apply("OutputSideInputs",
-ParDo.withSideInputs(view).of(new OldDoFn() {
-  @Override
+ParDo.withSideInputs(view).of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 c.output(c.sideInput(view));
   }
@@ -131,8 +133,8 @@ public class ViewTest implements Serializable {
 TimestampedValue.of(3, new Instant(12
 .apply("MainWindowInto", 
Window.into(FixedWindows.of(Duration.millis(10))))
 .apply("OutputSideInputs",
-ParDo.withSideInputs(view).of(new OldDoFn() {
-  @Override
+ParDo.withSideInputs(view).of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 c.output(c.sideInput(view));
   }
@@ -153,8 +155,8 @@ public class ViewTest implements Serializable {
 .apply(View.asSingleton());
 
 pipeline.apply("Create123", Create.of(1, 2, 3))
-.apply("OutputSideInputs", ParDo.withSideInputs(view).of(new 
OldDoFn() {
-  @Override
+.apply("OutputSideInputs", ParDo.withSideInputs(view).of(new 
DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 c.output(c.sideInput(view));
   }
@@ -178,8 +180,8 @@ public class ViewTest implements Serializable {
 final PCollectionView view = 
oneTwoThree.apply(View.asSingleton());
 
 oneTwoThree.apply(
-"OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn() {
-  @Override
+"OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 c.output(c.sideInput(view));
   }
@@ -205,8 +207,8 @@ public class ViewTest implements Serializable {
 PCollection output =
 pipeline.apply("CreateMainInput", Create.of(29, 31))
 .apply("OutputSideInputs",
-ParDo.withSideInputs(view).of(new OldDoFn() {
-  @Override
+ParDo.withSideInputs(view).of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) {
 checkArgument(c.sideInput(view).size() == 4);
 checkArgument(c.sideInput(view).get(0) == 
c.sideInput(view).get(0));
@@ -246,8 +248,8 @@ public class ViewTest implements Serializable {
 .apply("MainWindowInto", 
Window.into(FixedWindows.of(Duration.millis(10))))
 .apply(
 "OutputSideInputs",
-ParDo.withSideInputs(view).of(new OldDoFn() {
-  @Override
+ParDo.withSideInputs(view).of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c) 

[06/13] incubator-beam git commit: Port Window transform to new DoFn

2016-08-08 Thread kenn
Port Window transform to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c6aaf73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c6aaf73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c6aaf73

Branch: refs/heads/master
Commit: 2c6aaf730353c4db12aea60fd89851bddec0415c
Parents: ecf21a5
Author: Kenneth Knowles 
Authored: Fri Aug 5 11:47:57 2016 -0700
Committer: Kenneth Knowles 
Committed: Mon Aug 8 11:35:17 2016 -0700

--
 .../java/org/apache/beam/sdk/transforms/windowing/Window.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c6aaf73/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5b6f4c8..c1b0237 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -645,8 +645,9 @@ public class Window {
   // We first apply a (trivial) transform to the input PCollection to 
produce a new
   // PCollection. This ensures that we don't modify the windowing 
strategy of the input
   // which may be used elsewhere.
-  .apply("Identity", ParDo.of(new OldDoFn() {
-@Override public void processElement(ProcessContext c) {
+  .apply("Identity", ParDo.of(new DoFn() {
+@ProcessElement
+public void processElement(ProcessContext c) {
   c.output(c.element());
 }
   }))



[GitHub] incubator-beam pull request #759: Add display data to SimpleFunction

2016-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/759




[1/2] incubator-beam git commit: Add display data to SimpleFunction

2016-08-08 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7629f972c -> bb00810ad


Add display data to SimpleFunction


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4753f447
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4753f447
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4753f447

Branch: refs/heads/master
Commit: 4753f447f04e359716107ecf11e2e5286c42ac69
Parents: 2b5c6bc
Author: Kenneth Knowles 
Authored: Fri Jul 29 10:23:28 2016 -0700
Committer: Kenneth Knowles 
Committed: Fri Aug 5 10:19:21 2016 -0700

--
 .../beam/sdk/transforms/FlatMapElements.java |  2 +-
 .../apache/beam/sdk/transforms/MapElements.java  |  2 +-
 .../beam/sdk/transforms/SimpleFunction.java  | 14 +-
 .../beam/sdk/transforms/FlatMapElementsTest.java | 19 +++
 .../beam/sdk/transforms/MapElementsTest.java |  8 
 5 files changed, 42 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4753f447/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 04d993c..6f9e3d8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -167,7 +167,7 @@ extends PTransform {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
 super.populateDisplayData(builder);
-builder.add(fnClassDisplayData);
+builder.include(fn).add(fnClassDisplayData);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4753f447/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 429d3fc..17ad6e7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -133,6 +133,6 @@ extends PTransform {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
 super.populateDisplayData(builder);
-builder.add(fnClassDisplayData);
+builder.include(fn).add(fnClassDisplayData);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4753f447/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index 6c540cc..bf075f8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -25,7 +27,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * {@link org.apache.beam.sdk.coders.Coder Coder} inference.
  */
 public abstract class SimpleFunction
-implements SerializableFunction {
+implements SerializableFunction, HasDisplayData {
 
   public static 
   SimpleFunction fromSerializableFunctionWithOutputType(
@@ -60,6 +62,15 @@ public abstract class SimpleFunction
   }
 
   /**
+* {@inheritDoc}
+*
+* By default, does not register any display data. Implementors may 
override this method
+* to provide their own display data.
+*/
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {}
+
+  /**
* A {@link SimpleFunction} built from a {@link SerializableFunction}, having
* a known output type that is explicitly set.
*/
@@ -76,6 +87,7 @@ public abstract class SimpleFunction
   this.outputType = outputType;
 }
 
+
 @Override
 public OutputT apply(InputT input) {
   return fn.apply(input);
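
The change above has MapElements and FlatMapElements call builder.include(fn), while SimpleFunction gets a no-op populateDisplayData that implementors may override. A minimal sketch of that delegation pattern follows. HasDisplay, DisplayBuilder, SimpleFn, MapStep, and AddN are simplified, hypothetical stand-ins written for this sketch; they do not reproduce the real DisplayData.Builder API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a display-data contract.
interface HasDisplay {
  void populateDisplayData(DisplayBuilder builder);
}

// Hypothetical builder that collects display items as plain strings.
class DisplayBuilder {
  final List<String> items = new ArrayList<>();

  DisplayBuilder add(String item) {
    items.add(item);
    return this;
  }

  // Lets a nested component contribute its own items.
  DisplayBuilder include(HasDisplay component) {
    component.populateDisplayData(this);
    return this;
  }
}

// Function base class with a no-op default, as in the SimpleFunction change.
abstract class SimpleFn implements HasDisplay {
  abstract int apply(int input);

  @Override
  public void populateDisplayData(DisplayBuilder builder) {}
}

// A function that opts in to reporting its configuration.
class AddN extends SimpleFn {
  private final int n;

  AddN(int n) {
    this.n = n;
  }

  @Override
  int apply(int input) {
    return input + n;
  }

  @Override
  public void populateDisplayData(DisplayBuilder builder) {
    builder.add("n=" + n);
  }
}

// A transform-like wrapper that includes its function's items, then adds its own.
class MapStep implements HasDisplay {
  private final SimpleFn fn;

  MapStep(SimpleFn fn) {
    this.fn = fn;
  }

  @Override
  public void populateDisplayData(DisplayBuilder builder) {
    builder.include(fn).add("fn=" + fn.getClass().getSimpleName());
  }
}

class DisplayDemo {
  static List<String> describe(SimpleFn fn) {
    DisplayBuilder builder = new DisplayBuilder();
    new MapStep(fn).populateDisplayData(builder);
    return builder.items;
  }

  public static void main(String[] args) {
    System.out.println(describe(new AddN(5)));
  }
}
```

A function that does not override populateDisplayData contributes nothing, so the wrapper's own item is still reported on its own.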


[GitHub] incubator-beam-site pull request #36: Revise Beam programming guide for new ...

2016-08-08 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam-site/pull/36

Revise Beam programming guide for new DoFn

R: @francesperry 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam-site new-DoFn

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam-site/pull/36.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #36


commit 303864a311abca93170ee1693a42a5e265e37a35
Author: Kenneth Knowles 
Date:   2016-08-08T17:09:43Z

Revise Beam programming guide for new DoFn

commit a44ffa576f73676060a99953f81a91f92cd300dc
Author: Kenneth Knowles 
Date:   2016-08-08T17:19:41Z

Rebuild affected files after source change






[GitHub] incubator-beam pull request #693: [BEAM-473] Fix stats querying for Datastor...

2016-08-08 Thread vikkyrk
Github user vikkyrk closed the pull request at:

https://github.com/apache/incubator-beam/pull/693




[jira] [Commented] (BEAM-537) Create Apache Bigtop Beam packages

2016-08-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/BEAM-537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15411434#comment-15411434
 ] 

Ismaël Mejía commented on BEAM-537:
---

I don't know how many distribution-related issues we have, but we should 
probably tag this one and the others under a new category (I don't know the 
exact name in JIRA-speak).

> Create Apache Bigtop Beam packages
> --
>
> Key: BEAM-537
> URL: https://issues.apache.org/jira/browse/BEAM-537
> Project: Beam
>  Issue Type: Wish
>Reporter: Ismaël Mejía
>Priority: Minor
>
> Considering that Apache Bigtop is a well-tested and widely used framework for 
> creating Big Data distributions (e.g. Cloudera, EMR, etc.), it is probably a 
> good idea to provide ready-to-go Beam packages. This is probably more of an 
> issue for Bigtop, but I am creating this issue for reference and to document 
> any fix or PR towards this goal.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)