[1/3] incubator-beam git commit: Add Parameters to finishSpecifying

2016-12-21 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 57d9bbd79 -> 7984fe3fc


Add Parameters to finishSpecifying

Remove the need to use getProducingTransformInternal in TypedPValue.

Ensure that all nodes are finished specifying before a call to
TransformHierarchy#visit. This ensures that all nodes are fully
specified without requiring the Pipeline or Runner to do so explicitly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/038950df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/038950df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/038950df

Branch: refs/heads/master
Commit: 038950df02fa553cbb91f57978efe125a9ebc80f
Parents: b053be4
Author: Thomas Groh 
Authored: Thu Dec 8 14:33:36 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java   |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  |  2 -
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java| 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java   | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java | 20 ++---
 .../beam/sdk/values/POutputValueBase.java   |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 22 files changed, 185 insertions(+), 210 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
--
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index fa94b2a..f88a94d 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest {
 Arrays.asList(sideInput1, sideInput2),
 Arrays.asList(;
 
- outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
- HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
- "processing: -42: [11, 222]", "processing: 666: [11, 222]");
- long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
- while (System.currentTimeMillis() < timeout) {
-   if (EmbeddedCollector.RESULTS.containsAll(expected)) {
- break;
-   }
-   LOG.info("Waiting for expected results.");
-   Thread.sleep(SLEEP_MILLIS);
- }
- result.cancel();
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
+"processing: -42: [11, 222]", "processing: 666: [11, 222]");
+long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+while (System.currentTimeMillis() < timeout) {
+  if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+break;
+  }
+  LOG.info("Waiting for expected 

[2/3] incubator-beam git commit: Use CountingSource in ForceStreamingTest

2016-12-21 Thread tgroh
Use CountingSource in ForceStreamingTest

Removes the requirement to have a FakeUnboundedSource, plus the read is
fully specified.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46

Branch: refs/heads/master
Commit: b053be460c2e6ff486faed1b1a0996af63f93db2
Parents: 57d9bbd
Author: Thomas Groh 
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../beam/runners/spark/ForceStreamingTest.java  | 39 +---
 1 file changed, 2 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
 // apply the BoundedReadFromUnboundedSource.
 @SuppressWarnings("unchecked")
 BoundedReadFromUnboundedSource boundedRead =
-Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
 //noinspection unchecked
 pipeline.apply(boundedRead);
 
@@ -86,38 +85,4 @@ public class ForceStreamingTest {
 }
 
   }
-
-  /**
-   * A fake {@link UnboundedSource} to satisfy the compiler.
-   */
-  private static class FakeUnboundedSource extends UnboundedSource {
-
-@Override
-public List generateInitialSplits(
-int desiredNumSplits,
-PipelineOptions options) throws Exception {
-  return null;
-}
-
-@Override
-public UnboundedReader createReader(
-PipelineOptions options,
-CheckpointMark checkpointMark) throws IOException {
-  return null;
-}
-
-@Override
-public Coder getCheckpointMarkCoder() {
-  return null;
-}
-
-@Override
-public void validate() { }
-
-@Override
-public Coder getDefaultOutputCoder() {
-  return null;
-}
-  }
-
 }



[3/3] incubator-beam git commit: This closes #1582

2016-12-21 Thread tgroh
This closes #1582


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7984fe3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7984fe3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7984fe3f

Branch: refs/heads/master
Commit: 7984fe3fc20160d2286433434190f35658aef158
Parents: 57d9bbd 038950d
Author: Thomas Groh 
Authored: Wed Dec 21 15:26:18 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:18 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java   |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  | 41 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java| 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java   | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java | 20 ++---
 .../beam/sdk/values/POutputValueBase.java   |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 22 files changed, 187 insertions(+), 247 deletions(-)
--




[GitHub] incubator-beam pull request #1677: Add a Test for Flatten with Heterogeneous...

2016-12-21 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1677

Add a Test for Flatten with Heterogeneous Coders

Add a category, and suppress in the Flink and Apex runners

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam flatten_multiple_coders

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1677


commit 13a8da998c9c03a8ccf914e1dcc16c2aed038930
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-21T21:53:48Z

Add a Test for Flatten with Heterogeneous Coders

Add a category, and suppress in the Flink and Apex runners




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: This closes #1569

2016-12-20 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6a05d7f17 -> aadcf3a12


This closes #1569


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aadcf3a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aadcf3a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aadcf3a1

Branch: refs/heads/master
Commit: aadcf3a1203b257961a1a474acf74e6bbca1e2ad
Parents: 6a05d7f 34373c2
Author: Thomas Groh 
Authored: Tue Dec 20 15:18:55 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 20 15:18:55 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java |  18 +--
 .../beam/runners/direct/EvaluationContext.java  |   7 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  16 ++-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++-
 .../beam/sdk/runners/TransformHierarchy.java|  49 
 .../transforms/join/KeyedPCollectionTuple.java  |   9 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   4 +-
 .../apache/beam/sdk/values/PCollectionList.java |  65 +++
 .../beam/sdk/values/PCollectionTuple.java   |  28 -
 .../java/org/apache/beam/sdk/values/PDone.java  |   4 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  10 ++
 .../org/apache/beam/sdk/values/PValueBase.java  |  11 +-
 .../apache/beam/sdk/values/TaggedPValue.java|  42 +++
 .../sdk/runners/TransformHierarchyTest.java |  23 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  34 ++
 .../beam/sdk/values/PCollectionListTest.java| 117 +++
 .../beam/sdk/values/PCollectionTupleTest.java   |  70 +++
 20 files changed, 449 insertions(+), 98 deletions(-)
--




[2/2] incubator-beam git commit: Provide local tags in PInput, POutput expansions

2016-12-20 Thread tgroh
Provide local tags in PInput, POutput expansions

Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.

Implement PCollectionList.equals, PCollectionTuple.equals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34373c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34373c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34373c21

Branch: refs/heads/master
Commit: 34373c21ed67696235d88ef40d50e31c77b84c33
Parents: 6a05d7f
Author: Thomas Groh 
Authored: Tue Dec 6 11:03:52 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 20 15:18:55 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java |  18 +--
 .../beam/runners/direct/EvaluationContext.java  |   7 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  16 ++-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++-
 .../beam/sdk/runners/TransformHierarchy.java|  49 
 .../transforms/join/KeyedPCollectionTuple.java  |   9 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   4 +-
 .../apache/beam/sdk/values/PCollectionList.java |  65 +++
 .../beam/sdk/values/PCollectionTuple.java   |  28 -
 .../java/org/apache/beam/sdk/values/PDone.java  |   4 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  10 ++
 .../org/apache/beam/sdk/values/PValueBase.java  |  11 +-
 .../apache/beam/sdk/values/TaggedPValue.java|  42 +++
 .../sdk/runners/TransformHierarchyTest.java |  23 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  34 ++
 .../beam/sdk/values/PCollectionListTest.java| 117 +++
 .../beam/sdk/values/PCollectionTupleTest.java   |  70 +++
 20 files changed, 449 insertions(+), 98 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 0283d03..425bbf1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each 
{@link PValue} in the
@@ -79,14 +80,16 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-toFinalize.removeAll(node.getInputs());
+for (TaggedPValue consumed : node.getInputs()) {
+  toFinalize.remove(consumed.getValue());
+}
 AppliedPTransform appliedTransform = getAppliedTransform(node);
 stepNames.put(appliedTransform, genStepName());
 if (node.getInputs().isEmpty()) {
   rootTransforms.add(appliedTransform);
 } else {
-  for (PValue value : node.getInputs()) {
-primitiveConsumers.put(value, appliedTransform);
+  for (TaggedPValue value : node.getInputs()) {
+primitiveConsumers.put(value.getValue(), appliedTransform);
   }
 }
   }
@@ -96,15 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 toFinalize.add(value);
 
 AppliedPTransform appliedTransform = 
getAppliedTransform(producer);
+if (value instanceof PCollectionView) {
+  views.add((PCollectionView) value);
+}
 if (!producers.containsKey(value)) {
   producers.put(value, appliedTransform);
 }
-   if (value instanceof PCollectionView) {
- views.add((PCollectionView) value);
-   }
-   if (!producers.containsKey(value)) {
- producers.put(value, appliedTransform);
-   }
   }
 
   private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
--
diff --git 

[1/2] incubator-beam git commit: Add PTransformOverrideFactory to the Core SDK

2016-12-16 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 217c29bfc -> cc28f0cb4


Add PTransformOverrideFactory to the Core SDK

This migrates PTransformOverrideFactory from the DirectRunner to the
Core SDK, as part of BEAM-646.

Add getOriginalToReplacements to provide a mapping from the original
outputs to replaced outputs. This enables all replaced nodes to be
rewired to output the original output.

Migrate all DirectRunner Override Factories to the new
PTransformOverrideFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3f227a0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3f227a0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3f227a0a

Branch: refs/heads/master
Commit: 3f227a0ad18f425767e89f88d8a1c9fdcebdd80c
Parents: 217c29b
Author: Thomas Groh 
Authored: Mon Dec 5 16:01:57 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 16 14:21:49 2016 -0800

--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  7 ++--
 .../direct/DirectGroupByKeyOverrideFactory.java |  3 +-
 .../beam/runners/direct/DirectRunner.java   |  7 +++-
 .../direct/PTransformOverrideFactory.java   | 35 -
 .../direct/ParDoMultiOverrideFactory.java   |  9 +++--
 .../ParDoSingleViaMultiOverrideFactory.java | 11 +++---
 .../direct/TestStreamEvaluatorFactory.java  |  5 ++-
 .../runners/direct/ViewEvaluatorFactory.java|  4 +-
 .../direct/WriteWithShardingFactory.java|  6 ++-
 .../direct/WriteWithShardingFactoryTest.java|  4 +-
 .../beam/sdk/annotations/Experimental.java  |  5 ++-
 .../sdk/runners/PTransformOverrideFactory.java  | 41 
 12 files changed, 80 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 1fa059c..ab4c114 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -18,7 +18,8 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -30,10 +31,10 @@ import org.apache.beam.sdk.values.PCollection;
 class DirectGBKIntoKeyedWorkItemsOverrideFactory
 implements PTransformOverrideFactory<
 PCollection>, PCollection>,
-SplittableParDo.GBKIntoKeyedWorkItems> {
+GBKIntoKeyedWorkItems> {
   @Override
   public PTransform>, 
PCollection>>
-  override(SplittableParDo.GBKIntoKeyedWorkItems transform) {
+  getReplacementTransform(GBKIntoKeyedWorkItems transform) {
 return new DirectGroupByKey.DirectGroupByKeyOnly<>();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3f227a0a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 9acf5e9..7cf3256 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
@@ -27,7 +28,7 @@ final class DirectGroupByKeyOverrideFactory
 implements PTransformOverrideFactory<
 

[2/2] incubator-beam git commit: This closes #1547

2016-12-16 Thread tgroh
This closes #1547


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc28f0cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc28f0cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc28f0cb

Branch: refs/heads/master
Commit: cc28f0cb4c44169f933475ae29a32599024d3a1f
Parents: 217c29b 3f227a0
Author: Thomas Groh 
Authored: Fri Dec 16 14:21:50 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 16 14:21:50 2016 -0800

--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  7 ++--
 .../direct/DirectGroupByKeyOverrideFactory.java |  3 +-
 .../beam/runners/direct/DirectRunner.java   |  7 +++-
 .../direct/PTransformOverrideFactory.java   | 35 -
 .../direct/ParDoMultiOverrideFactory.java   |  9 +++--
 .../ParDoSingleViaMultiOverrideFactory.java | 11 +++---
 .../direct/TestStreamEvaluatorFactory.java  |  5 ++-
 .../runners/direct/ViewEvaluatorFactory.java|  4 +-
 .../direct/WriteWithShardingFactory.java|  6 ++-
 .../direct/WriteWithShardingFactoryTest.java|  4 +-
 .../beam/sdk/annotations/Experimental.java  |  5 ++-
 .../sdk/runners/PTransformOverrideFactory.java  | 41 
 12 files changed, 80 insertions(+), 57 deletions(-)
--




[2/2] incubator-beam git commit: This closes #1625

2016-12-15 Thread tgroh
This closes #1625


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c09fbd43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c09fbd43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c09fbd43

Branch: refs/heads/release-0.4.0-incubating
Commit: c09fbd43b0cdaa57cffc1925c313dcf91390adc7
Parents: 5dcffe0 e864ac2
Author: Thomas Groh 
Authored: Thu Dec 15 11:12:01 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 15 11:12:01 2016 -0800

--
 .../main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Don't incorrectly log error in MetricsEnvironment

2016-12-15 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/release-0.4.0-incubating 5dcffe066 -> c09fbd43b


Don't incorrectly log error in MetricsEnvironment

Using getCurrentContainer() logs an error if metrics are not supported.
This is because it acts as the common point of access for user code that
reports metrics.

It should not be used within setCurrentContainer(), because the first
container being set will have a null previous-current-container, which
will cause the error to be incorrectly logged.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e864ac21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e864ac21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e864ac21

Branch: refs/heads/release-0.4.0-incubating
Commit: e864ac2141e31a930b85264506d06579ba6811de
Parents: 5dcffe0
Author: bchambers 
Authored: Wed Dec 14 11:23:39 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 15 11:12:00 2016 -0800

--
 .../main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e864ac21/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index 7c06cbf..5d7cb0b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -56,7 +56,7 @@ public class MetricsEnvironment {
*/
   @Nullable
   public static MetricsContainer setCurrentContainer(@Nullable 
MetricsContainer container) {
-MetricsContainer previous = getCurrentContainer();
+MetricsContainer previous = CONTAINER_FOR_THREAD.get();
 if (container == null) {
   CONTAINER_FOR_THREAD.remove();
 } else {



[1/2] incubator-beam git commit: [BEAM-1033] Retry Bigquery Verifier when Query Fails

2016-12-14 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4927cc1ab -> 0b0a1b797


[BEAM-1033] Retry Bigquery Verifier when Query Fails

Update Junit to 4.12


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b626f0e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b626f0e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b626f0e6

Branch: refs/heads/master
Commit: b626f0e627af85b2aa01213587b4130932030166
Parents: 4927cc1
Author: Mark Liu 
Authored: Wed Nov 30 22:20:12 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 14 12:44:47 2016 -0800

--
 pom.xml |  2 +-
 .../beam/sdk/testing/BigqueryMatcher.java   | 48 
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 82 ++--
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 5 files changed, 76 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 4faa971..970547d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,7 @@
 2.7.2
 3.0.1
 2.4
-4.11
+4.12
 1.9.5
 4.1.3.Final
 1.4.0.Final

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b626f0e6/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
index 9b8589a..8f752c0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/BigqueryMatcher.java
@@ -39,6 +39,7 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
 import java.util.Collections;
@@ -117,20 +118,23 @@ public class BigqueryMatcher extends 
TypeSafeMatcher
 
   response = queryWithRetries(
   bigqueryClient, queryContent, Sleeper.DEFAULT, 
BACKOFF_FACTORY.backoff());
-} catch (Exception e) {
+} catch (IOException | InterruptedException e) {
+  if (e instanceof InterruptedIOException) {
+Thread.currentThread().interrupt();
+  }
   throw new RuntimeException("Failed to fetch BigQuery data.", e);
 }
 
-// validate BigQuery response
-if (response == null || response.getRows() == null || 
response.getRows().isEmpty()) {
+if (!response.getJobComplete()) {
+  // query job not complete, verification failed
   return false;
-}
-
-// compute checksum
-actualChecksum = generateHash(response.getRows());
-LOG.debug("Generated a SHA1 checksum based on queried data: {}", 
actualChecksum);
+} else {
+  // compute checksum
+  actualChecksum = generateHash(response.getRows());
+  LOG.debug("Generated a SHA1 checksum based on queried data: {}", 
actualChecksum);
 
-return expectedChecksum.equals(actualChecksum);
+  return expectedChecksum.equals(actualChecksum);
+}
   }
 
   @VisibleForTesting
@@ -144,23 +148,35 @@ public class BigqueryMatcher extends 
TypeSafeMatcher
 .build();
   }
 
+  @Nonnull
   @VisibleForTesting
   QueryResponse queryWithRetries(Bigquery bigqueryClient, QueryRequest 
queryContent,
  Sleeper sleeper, BackOff backOff)
   throws IOException, InterruptedException {
 IOException lastException = null;
 do {
+  if (lastException != null) {
+LOG.warn("Retrying query ({}) after exception", 
queryContent.getQuery(), lastException);
+  }
   try {
-return bigqueryClient.jobs().query(projectId, queryContent).execute();
+QueryResponse response = bigqueryClient.jobs().query(projectId, 
queryContent).execute();
+if (response != null) {
+  return response;
+} else {
+  lastException =
+  new IOException("Expected valid response from query job, but 
received null.");
+}
   } catch (IOException e) {
 // ignore and retry
-LOG.warn("Ignore the error and retry the query.");
 lastException = e;
   }
 } while(BackOffUtils.next(sleeper, backOff));
-throw new IOException(
+
+throw new RuntimeException(
 String.format(
-"Unable to get 

[2/2] incubator-beam git commit: This closes #1479

2016-12-14 Thread tgroh
This closes #1479


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0b0a1b79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0b0a1b79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0b0a1b79

Branch: refs/heads/master
Commit: 0b0a1b79794598478033105e0582650fe17953ac
Parents: 4927cc1 b626f0e
Author: Thomas Groh 
Authored: Wed Dec 14 12:45:09 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 14 12:45:09 2016 -0800

--
 pom.xml |  2 +-
 .../beam/sdk/testing/BigqueryMatcher.java   | 48 
 .../beam/sdk/testing/BigqueryMatcherTest.java   | 82 ++--
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 .../main/resources/archetype-resources/pom.xml  |  2 +-
 5 files changed, 76 insertions(+), 60 deletions(-)
--




[1/2] incubator-beam git commit: This closes #1583

2016-12-14 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5a51ace8d -> fdf07318f


This closes #1583


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fdf07318
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fdf07318
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fdf07318

Branch: refs/heads/master
Commit: fdf07318f6833bad4634b59e3676033288f0c4aa
Parents: 5a51ace 4cbccee
Author: Thomas Groh 
Authored: Wed Dec 14 11:29:29 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 14 11:29:29 2016 -0800

--
 .../beam/runners/direct/DirectGroupByKey.java   | 36 +++-
 1 file changed, 20 insertions(+), 16 deletions(-)
--




[2/2] incubator-beam git commit: Implement GetDefaultOutputCoder in DirectGroupByKey

2016-12-14 Thread tgroh
Implement GetDefaultOutputCoder in DirectGroupByKey

This uses the standard Coder Inference path to set coders, rather than
explicitly setting the output coders for intermediate PCollections.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4cbccee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4cbccee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4cbccee8

Branch: refs/heads/master
Commit: 4cbccee8ee9a3b4235c6338fe49efc1f8a079812
Parents: 5a51ace
Author: Thomas Groh 
Authored: Mon Dec 12 13:55:49 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 14 11:29:29 2016 -0800

--
 .../beam/runners/direct/DirectGroupByKey.java   | 36 +++-
 1 file changed, 20 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4cbccee8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 405d913..6c10bd2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -22,6 +22,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -46,9 +47,6 @@ class DirectGroupByKey
 
   @Override
   public PCollection> expand(PCollection> input) {
-@SuppressWarnings("unchecked")
-KvCoder inputCoder = (KvCoder) input.getCoder();
-
 // This operation groups by the combination of key and window,
 // merging windows as needed, using the windows assigned to the
 // key/value input elements and the window merge operation of the
@@ -61,19 +59,11 @@ class DirectGroupByKey
 // By default, implement GroupByKey via a series of lower-level operations.
 return input
 .apply(new DirectGroupByKeyOnly())
-.setCoder(
-KeyedWorkItemCoder.of(
-inputCoder.getKeyCoder(),
-inputCoder.getValueCoder(),
-inputWindowingStrategy.getWindowFn().windowCoder()))
 
 // Group each key's values by window, merging windows as needed.
 .apply(
 "GroupAlsoByWindow",
-new DirectGroupAlsoByWindow(inputWindowingStrategy, 
outputWindowingStrategy))
-
-.setCoder(
-KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getValueCoder(;
+new DirectGroupAlsoByWindow(inputWindowingStrategy, 
outputWindowingStrategy));
   }
 
   static final class DirectGroupByKeyOnly
@@ -85,6 +75,16 @@ class DirectGroupByKey
 }
 
 DirectGroupByKeyOnly() {}
+
+@Override
+protected Coder getDefaultOutputCoder(
+@SuppressWarnings("unused") PCollection> input)
+throws CannotProvideCoderException {
+  return KeyedWorkItemCoder.of(
+  GroupByKey.getKeyCoder(input.getCoder()),
+  GroupByKey.getInputValueCoder(input.getCoder()),
+  input.getWindowingStrategy().getWindowFn().windowCoder());
+}
   }
 
   static final class DirectGroupAlsoByWindow
@@ -117,15 +117,19 @@ class DirectGroupByKey
   return kvCoder;
 }
 
-public Coder getKeyCoder(Coder> inputCoder) {
-  return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
-}
-
 public Coder getValueCoder(Coder> inputCoder) {
   return getKeyedWorkItemCoder(inputCoder).getElementCoder();
 }
 
 @Override
+protected Coder getDefaultOutputCoder(
+@SuppressWarnings("unused") PCollection> input)
+throws CannotProvideCoderException {
+  KeyedWorkItemCoder inputCoder = 
getKeyedWorkItemCoder(input.getCoder());
+  return KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getElementCoder()));
+}
+
+@Override
 public PCollection> expand(PCollection> input) {
   return PCollection.createPrimitiveOutputInternal(
   input.getPipeline(), outputWindowingStrategy, input.isBounded());



[2/3] incubator-beam git commit: Add Tests for Kryo Serialization of URFBS

2016-12-12 Thread tgroh
Add Tests for Kryo Serialization of URFBS


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/47cc2dca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/47cc2dca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/47cc2dca

Branch: refs/heads/master
Commit: 47cc2dca05daa4075093c414e13bf0cacaa77744
Parents: 3c2e550
Author: Thomas Groh 
Authored: Mon Dec 12 16:33:53 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 18:17:08 2016 -0800

--
 runners/core-java/pom.xml   |  7 ++
 .../UnboundedReadFromBoundedSourceTest.java | 97 ++--
 2 files changed, 94 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/pom.xml
--
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 0e2b4b0..bab9d57 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -152,6 +152,13 @@
 
 
 
+
+  com.esotericsoftware.kryo
+  kryo
+  2.21
+  test
+
+
 
 
   org.apache.beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/47cc2dca/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index 7fd8807..8a1b70b 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -17,19 +17,28 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
@@ -44,11 +53,13 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.FileBasedSource;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Distinct;
@@ -65,6 +76,7 @@ import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 /**
  * Unit tests for {@link UnboundedReadFromBoundedSource}.
@@ -101,28 +113,93 @@ public class UnboundedReadFromBoundedSourceTest {
 
 PCollection output =
 p.apply(Read.from(unboundedSource).withMaxNumRecords(numElements));
-
 // Count == numElements
 PAssert
-  .thatSingleton(output.apply("Count", Count.globally()))
-  .isEqualTo(numElements);
+.thatSingleton(output.apply("Count", Count.globally()))
+.isEqualTo(numElements);
 // Unique count == numElements
 PAssert
-  .thatSingleton(output.apply(Distinct.create())
-  .apply("UniqueCount", Count.globally()))
-  .isEqualTo(numElements);
+.thatSingleton(output.apply(Distinct.create())
+.apply("UniqueCount", Count.globally()))
+.isEqualTo(numElements);
 // Min == 0
 PAssert
-  .thatSingleton(output.apply("Min", Min.globally()))
-  .isEqualTo(0L);
+

[3/3] incubator-beam git commit: Add no-arg constructor for UnboundedReadFromBoundedSource

2016-12-12 Thread tgroh
Add no-arg constructor for UnboundedReadFromBoundedSource

This allows Kryo to work with the type, currently required by
the Apex runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3c2e550a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3c2e550a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3c2e550a

Branch: refs/heads/master
Commit: 3c2e550aa7a56fe16f278cb353314a1ee51dbfe3
Parents: d9657ff
Author: Kenneth Knowles 
Authored: Mon Dec 12 14:59:36 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 18:17:08 2016 -0800

--
 .../beam/runners/core/UnboundedReadFromBoundedSource.java | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c2e550a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index f3f93e1..c804725 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -118,7 +118,12 @@ public class UnboundedReadFromBoundedSource extends 
PTransform {
 
-private BoundedSource boundedSource;
+@SuppressWarnings("unused") // for Kryo
+private BoundedToUnboundedSourceAdapter() {
+  this.boundedSource = null;
+}
+
+private final BoundedSource boundedSource;
 
 public BoundedToUnboundedSourceAdapter(BoundedSource boundedSource) {
   this.boundedSource = boundedSource;



[1/3] incubator-beam git commit: This closes #1586

2016-12-12 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master d9657ffc3 -> 91cc606b4


This closes #1586


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/91cc606b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/91cc606b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/91cc606b

Branch: refs/heads/master
Commit: 91cc606b469b1166b2a2d41425eab63a4795b1e0
Parents: d9657ff 47cc2dc
Author: Thomas Groh 
Authored: Mon Dec 12 18:17:08 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 18:17:08 2016 -0800

--
 runners/core-java/pom.xml   |  7 ++
 .../core/UnboundedReadFromBoundedSource.java|  7 +-
 .../UnboundedReadFromBoundedSourceTest.java | 97 ++--
 3 files changed, 100 insertions(+), 11 deletions(-)
--




[GitHub] incubator-beam pull request #1586: [BEAM-1139] Add no-arg constructor for Un...

2016-12-12 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1586

[BEAM-1139] Add no-arg constructor for UnboundedReadFromBoundedSource

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This allows Kryo to work with the type, currently required by
the Apex runner.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam take_over_1585

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1586


commit e88e9edfcca65066d3f40d0fa49ba7386e09652f
Author: Kenneth Knowles <k...@google.com>
Date:   2016-12-12T22:59:36Z

Add no-arg constructor for UnboundedReadFromBoundedSource

This allows Kryo to work with the type, currently required by
the Apex runner.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1583: Implement GetDefaultOutputCoder in Direct...

2016-12-12 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1583

Implement GetDefaultOutputCoder in DirectGroupByKey

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This uses the standard Coder Inference path to set coders, rather than
explicitly setting the output coders for intermediate PCollections.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
coder_inference_direct_runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1583


commit 06796b2396ea5d56cd4c75e82a7f13a2d5ad52aa
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-12T21:55:49Z

Implement GetDefaultOutputCoder in DirectGroupByKey

This uses the standard Coder Inference path to set coders, rather than
explicitly setting the output coders for intermediate PCollections.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Stop expanding PValues in DirectRunner visitors

2016-12-12 Thread tgroh
Stop expanding PValues in DirectRunner visitors

A PValue always expands to itself, and these calls are unnecessary.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8ef74a74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8ef74a74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8ef74a74

Branch: refs/heads/master
Commit: 8ef74a744327c40fbb05030fd7657db8a865cb94
Parents: 5d619e8
Author: Thomas Groh 
Authored: Fri Dec 9 15:52:15 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 13:51:29 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java   | 14 ++
 .../runners/direct/KeyedPValueTrackingVisitor.java|  2 +-
 2 files changed, 7 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 4f38bce..0283d03 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -99,14 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 if (!producers.containsKey(value)) {
   producers.put(value, appliedTransform);
 }
-for (PValue expandedValue : value.expand()) {
-  if (expandedValue instanceof PCollectionView) {
-views.add((PCollectionView) expandedValue);
-  }
-  if (!producers.containsKey(expandedValue)) {
-producers.put(value, appliedTransform);
-  }
-}
+   if (value instanceof PCollectionView) {
+ views.add((PCollectionView) value);
+   }
+   if (!producers.containsKey(value)) {
+ producers.put(value, appliedTransform);
+   }
   }
 
   private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8ef74a74/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 4161f9e..7f85169 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -84,7 +84,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
 if (producesKeyedOutputs.contains(producer.getTransform().getClass())) {
-  keyedValues.addAll(value.expand());
+  keyedValues.add(value);
 }
   }
 



[1/2] incubator-beam git commit: This closes #1570

2016-12-12 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 5d619e8e3 -> 59f1fb26a


This closes #1570


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f1fb26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f1fb26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f1fb26

Branch: refs/heads/master
Commit: 59f1fb26a900b4e4cbdb4d16b94cfe646b1c5f7e
Parents: 5d619e8 8ef74a7
Author: Thomas Groh 
Authored: Mon Dec 12 13:51:29 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 13:51:29 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java   | 14 ++
 .../runners/direct/KeyedPValueTrackingVisitor.java|  2 +-
 2 files changed, 7 insertions(+), 9 deletions(-)
--




[GitHub] incubator-beam pull request #1582: [BEAM-646] Add Parameters to finishSpecif...

2016-12-12 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1582

[BEAM-646] Add Parameters to finishSpecifying

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---
This removes use of getProducingTransformInternal() in TypedPValue.

Ensure that all nodes are finished specifying before a call to
`Pipeline#traverseTopologically` or `PipelineRunner#run`. This ensures
that all nodes are fully specified without requiring the `PipelineRunner`
to do so explicitly.

Use Coder Inference rather than explicitly setting Coders within
DirectRunner overrides.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
parametered_finish_specifying

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1582.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1582


commit 05d3151de529e1db8ffe0e16ea03e4d2674b4696
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-08T22:33:36Z

Improve Coder Inference within DirectRunner Overrides

This cleans up how coders are provided for DirectRunner overrides.

Ensure that coders are provided within tests that use an empty Flatten.

commit 97d81855b7dd54b0362121fe2b7d92aafb7d6594
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-08T17:09:33Z

Add Parameters to finishSpecifying

Remove the need to use getProducingTransformInternal in TypedPValue.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Migrate AppliedPTransform to use AutoValue

2016-12-12 Thread tgroh
Migrate AppliedPTransform to use AutoValue


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8f52b5e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8f52b5e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8f52b5e1

Branch: refs/heads/master
Commit: 8f52b5e1214061a9cbd2bd2c44f68c22102d9737
Parents: 98543e9
Author: Thomas Groh 
Authored: Fri Dec 9 15:34:25 2016 -0800
Committer: Thomas Groh 
Committed: Mon Dec 12 13:23:52 2016 -0800

--
 .../beam/sdk/transforms/AppliedPTransform.java  | 75 
 1 file changed, 15 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8f52b5e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
index d80c116..77de54a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AppliedPTransform.java
@@ -17,8 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
+import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
@@ -32,70 +31,26 @@ import org.apache.beam.sdk.values.POutput;
  * @param  transform output type
  * @param  transform type
  */
-public class AppliedPTransform
+@AutoValue
+public abstract class AppliedPTransform
 > {
 
-  private final String fullName;
-  private final InputT input;
-  private final OutputT output;
-  private final TransformT transform;
-
-  private AppliedPTransform(String fullName, InputT input, OutputT output, 
TransformT transform) {
-this.input = input;
-this.output = output;
-this.transform = transform;
-this.fullName = fullName;
+  public static <
+  InputT extends PInput,
+  OutputT extends POutput,
+  TransformT extends PTransform>
+  AppliedPTransform of(
+  String fullName, InputT input, OutputT output, TransformT transform) 
{
+return new AutoValue_AppliedPTransform(
+fullName, input, output, transform);
   }
 
-  public static >
-  AppliedPTransform of(
-  String fullName, InputT input, OutputT output, TransformT transform) {
-return new AppliedPTransform(fullName, input, 
output, transform);
-  }
+  public abstract String getFullName();
 
-  public String getFullName() {
-return fullName;
-  }
+  public abstract InputT getInput();
 
-  public InputT getInput() {
-return input;
-  }
+  public abstract OutputT getOutput();
 
-  public OutputT getOutput() {
-return output;
-  }
-
-  public TransformT getTransform() {
-return transform;
-  }
-
-  @Override
-  public int hashCode() {
-return Objects.hashCode(getFullName(), getInput(), getOutput(), 
getTransform());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-if (other instanceof AppliedPTransform) {
-  AppliedPTransform that = (AppliedPTransform) other;
-  return Objects.equal(this.getFullName(), that.getFullName())
-  && Objects.equal(this.getInput(), that.getInput())
-  && Objects.equal(this.getOutput(), that.getOutput())
-  && Objects.equal(this.getTransform(), that.getTransform());
-} else {
-  return false;
-}
-  }
-
-  @Override
-  public String toString() {
-return MoreObjects.toStringHelper(getClass())
-.add("fullName", getFullName())
-.add("input", getInput())
-.add("output", getOutput())
-.add("transform", getTransform())
-.toString();
-  }
+  public abstract TransformT getTransform();
 }



[GitHub] incubator-beam pull request #1570: Stop expanding PValues in DirectRunner vi...

2016-12-09 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1570

Stop expanding PValues in DirectRunner visitors

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

A PValue always expands to itself, and these calls are unnecessary.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam rm_unneeded_expands

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1570


commit 314e3da822ba59f8ac7133cbf47c61fe429a7361
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-09T23:52:15Z

Stop expanding PValues in DirectRunner visitors

A PValue always expands to itself, and these calls are unnecessary.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1569: [BEAM-646] Provide local tags in PInput, ...

2016-12-09 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1569

[BEAM-646] Provide local tags in PInput, POutput expansions

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam tagged_pvalues_expansions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1569


commit f6d826115cdc4ede1a3c5bc8fc86aac3118f2902
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-06T19:03:52Z

Provide local tags in PInput, POutput expansions

Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/4] incubator-beam-site git commit: [BEAM-835] Added IntelliJ instructions to the contribution guide

2016-12-09 Thread tgroh
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 37adeb9ba -> 59f789b97


[BEAM-835] Added IntelliJ instructions to the contribution guide


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/33963196
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/33963196
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/33963196

Branch: refs/heads/asf-site
Commit: 3396319690ab8a9e0bd1ce53cfd3a5ef78d40a3e
Parents: 37adeb9
Author: minudika 
Authored: Fri Nov 25 18:08:51 2016 +0530
Committer: Thomas Groh 
Committed: Fri Dec 9 15:16:08 2016 -0800

--
 src/contribute/contribution-guide.md | 17 +++--
 1 file changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/33963196/src/contribute/contribution-guide.md
--
diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index 1db04cc..3eb670f 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -97,8 +97,21 @@ Depending on your preferred development environment, you may 
need to prepare it
 
 # IntelliJ
 
-_This documentation is incomplete. If you are an Intellij user, please 
contribute instructions!
-See [BEAM-835](https://issues.apache.org/jira/browse/BEAM-835)._
+## Enable Annotation Processing
+
+To configure annotation processing in InteliJ Idea,
+
+1. Go to File -> Settings
+Settings dialog window will be appeared.
+2. Select Execution,Deployment,Deployment -> Compiler -> Annotation processors
+3. Select,
+   * Enable annotation processing
+   * Obtain processors from project classpath
+   * Store generated sources relative to : _Module content root_
+4. Provide,
+   * Production sources directory : _target/generated-sources/annotations_
+   * Test sources directory : _target/generated-test-sources/test-annotations_
+5. Finally click on apply and you are good to go.
 
 ## Checkstyle
 IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.



[2/4] incubator-beam-site git commit: Reword IntelliJ Annotation Processor section

2016-12-09 Thread tgroh
Reword IntelliJ Annotation Processor section


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/01ec343e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/01ec343e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/01ec343e

Branch: refs/heads/asf-site
Commit: 01ec343e58a8bef3d00e80525f252ef4896adede
Parents: 3396319
Author: Thomas Groh 
Authored: Fri Dec 9 15:01:59 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 9 15:16:09 2016 -0800

--
 src/contribute/contribution-guide.md | 25 +++--
 1 file changed, 11 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/01ec343e/src/contribute/contribution-guide.md
--
diff --git a/src/contribute/contribution-guide.md 
b/src/contribute/contribution-guide.md
index 3eb670f..6be6bf3 100644
--- a/src/contribute/contribution-guide.md
+++ b/src/contribute/contribution-guide.md
@@ -98,20 +98,17 @@ Depending on your preferred development environment, you 
may need to prepare it
 # IntelliJ
 
 ## Enable Annotation Processing
-
-To configure annotation processing in InteliJ Idea,
-
-1. Go to File -> Settings
-Settings dialog window will be appeared.
-2. Select Execution,Deployment,Deployment -> Compiler -> Annotation processors
-3. Select,
-   * Enable annotation processing
-   * Obtain processors from project classpath
-   * Store generated sources relative to : _Module content root_
-4. Provide,
-   * Production sources directory : _target/generated-sources/annotations_
-   * Test sources directory : _target/generated-test-sources/test-annotations_
-5. Finally click on apply and you are good to go.
+To configure annotation processing in IntelliJ:
+
+1. Open Annotation Processors Settings dialog box by going to Settings -> 
Build, Execution, Deployment -> Compiler -> Annotation Processors.
+1. Select the following buttons:
+   * "Enable annotation processing"
+   * "Obtain processors from project classpath"
+   * "Store generated sources relative to: _Module content root_"
+1. Set the generated source directories to be equal to the Maven directories:
+   * Set "Production sources directory:" to 
"target/generated-sources/annotations".
+   * Set "Test sources directory:" to 
"target/generated-test-sources/test-annotations".
+1. Click "OK".
 
 ## Checkstyle
 IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA plugin.



[3/4] incubator-beam-site git commit: Regenerate Content

2016-12-09 Thread tgroh
Regenerate Content


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/c676efe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/c676efe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/c676efe7

Branch: refs/heads/asf-site
Commit: c676efe7d7032a89eec19400628c0b40bb2b12ab
Parents: 01ec343
Author: Thomas Groh 
Authored: Fri Dec 9 15:05:21 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 9 15:16:09 2016 -0800

--
 .../contribute/contribution-guide/index.html| 23 ++--
 1 file changed, 21 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/c676efe7/content/contribute/contribution-guide/index.html
--
diff --git a/content/contribute/contribution-guide/index.html 
b/content/contribute/contribution-guide/index.html
index 87d5f74..7d7b7c9 100644
--- a/content/contribute/contribution-guide/index.html
+++ b/content/contribute/contribution-guide/index.html
@@ -161,6 +161,7 @@
   Clone the repository 
locally
   [Optional] IDE Setup
   IntelliJ  
  
+  Enable Annotation 
Processing
   Checkstyle
 
   
@@ -301,8 +302,26 @@ $ cd incubator-beam
 
 IntelliJ
 
-This documentation is incomplete. If you are an Intellij user, please 
contribute instructions!
-See https://issues.apache.org/jira/browse/BEAM-835;>BEAM-835.
+Enable Annotation Processing
+To configure annotation processing in IntelliJ:
+
+
+  Open Annotation Processors Settings dialog box by going to Settings 
- Build, Execution, Deployment - Compiler - Annotation 
Processors.
+  Select the following buttons:
+
+  “Enable annotation processing”
+  “Obtain processors from project classpath”
+  “Store generated sources relative to: Module content 
root”
+
+  
+  Set the generated source directories to be equal to the Maven 
directories:
+
+  Set “Production sources directory:” to 
“target/generated-sources/annotations”.
+  Set “Test sources directory:” to 
“target/generated-test-sources/test-annotations”.
+
+  
+  Click “OK”.
+
 
 Checkstyle
 IntelliJ supports checkstyle within the IDE using the Checkstyle-IDEA 
plugin.



[4/4] incubator-beam-site git commit: This closes #95

2016-12-09 Thread tgroh
This closes #95


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/59f789b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/59f789b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/59f789b9

Branch: refs/heads/asf-site
Commit: 59f789b97d5c03552d571afee6eeb1e3ba12
Parents: 37adeb9 c676efe
Author: Thomas Groh 
Authored: Fri Dec 9 15:16:20 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 9 15:16:20 2016 -0800

--
 .../contribute/contribution-guide/index.html| 23 ++--
 src/contribute/contribution-guide.md| 14 ++--
 2 files changed, 33 insertions(+), 4 deletions(-)
--




[GitHub] incubator-beam pull request #1567: Copy Runner Profiles to Java8 Archetype P...

2016-12-09 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1567

Copy Runner Profiles to Java8 Archetype POM

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This enables Java8 examples to be run cross-runner

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam ohfouroh-rc1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1567


commit 65dd62cd9b1d6f7925ee60b73a5a3fac1e4b1bf1
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-09T19:36:22Z

Copy Runner Profiles to Java8 Archetype POM

This enables Java8 examples to be run cross-runner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1563: Enable the DirectRunner by default in Exa...

2016-12-09 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1563

Enable the DirectRunner by default in Examples

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This ensures a runner will be on the classpath if no profile is
specified. This matches the generated examples with the quickstart.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam ohfouroh-rc1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1563


commit f83370d69d193818e5be9ff613498f9f3296a658
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-09T18:52:46Z

Enable the DirectRunner by default in Examples Archetype

This ensures a runner will be on the classpath if no profile is
specified. This matches the generated examples with the quickstart.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1525

2016-12-08 Thread tgroh
This closes #1525


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/40bd2760
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/40bd2760
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/40bd2760

Branch: refs/heads/master
Commit: 40bd27602ebe2269ccaba2685addb8e5e3ba533e
Parents: 409b5df bf1fba4
Author: Thomas Groh 
Authored: Thu Dec 8 17:17:56 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 8 17:17:56 2016 -0800

--
 .../beam/runners/core/SplittableParDo.java  |  9 ++-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  8 ++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 58 +++-
 4 files changed, 73 insertions(+), 8 deletions(-)
--




[1/2] incubator-beam git commit: Fix a bug in SplittableDoFn Checkpointing

2016-12-08 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 409b5dfcf -> 40bd27602


Fix a bug in SplittableDoFn Checkpointing

Call checkpoint() only once if the SDF emits output several times per
claim call.

Calling checkpoint multiple times would clobber an existing checkpoint,
and the second call would only ever return an empty residual, losing all
of the initial residual.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf1fba45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf1fba45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf1fba45

Branch: refs/heads/master
Commit: bf1fba450e6b5fd6c98d006b381472eee8db7b72
Parents: 409b5df
Author: Eugene Kirpichov 
Authored: Tue Dec 6 18:00:03 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 8 17:16:30 2016 -0800

--
 .../beam/runners/core/SplittableParDo.java  |  9 ++-
 .../org/apache/beam/sdk/transforms/ParDo.java   |  8 ++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  6 +-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 58 +++-
 4 files changed, 73 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 0bf882b..8a9bfcd 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -590,9 +590,14 @@ public class SplittableParDo
 }
 
 private void noteOutput() {
-  if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE) {
+  // Take the checkpoint only if it hasn't been taken yet, because:
+  // 1) otherwise we'd lose the previous checkpoint stored in 
residualRestrictionHolder
+  // 2) it's not allowed to checkpoint a RestrictionTracker twice, 
since the first call
+  // by definition already maximally narrows its restriction, so a 
second checkpoint would
+  // have produced a useless empty residual restriction anyway.
+  if (++numOutputs >= MAX_OUTPUTS_PER_BUNDLE && 
residualRestrictionHolder[0] == null) {
 // Request a checkpoint. The fn *may* produce more output, but 
hopefully not too much.
-residualRestrictionHolder[0] = tracker.checkpoint();
+residualRestrictionHolder[0] = checkNotNull(tracker.checkpoint());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index e60c536..167f5fa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -787,7 +787,9 @@ public class ParDo {
 @Override
 public PCollection expand(PCollection input) {
   checkArgument(
-  !isSplittable(getOldFn()), "Splittable DoFn not supported by the 
current runner");
+  !isSplittable(getOldFn()),
+  "%s does not support Splittable DoFn",
+  input.getPipeline().getOptions().getRunner().getName());
   validateWindowType(input, fn);
   return PCollection.createPrimitiveOutputInternal(
   input.getPipeline(),
@@ -1044,7 +1046,9 @@ public class ParDo {
 @Override
 public PCollectionTuple expand(PCollection input) {
   checkArgument(
-  !isSplittable(getOldFn()), "Splittable DoFn not supported by the 
current runner");
+  !isSplittable(getOldFn()),
+  "%s does not support Splittable DoFn",
+  input.getPipeline().getOptions().getRunner().getName());
   validateWindowType(input, fn);
   PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
   input.getPipeline(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf1fba45/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 

[GitHub] incubator-beam pull request #1547: [BEAM-646] Add PTransformOverrideFactory ...

2016-12-07 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1547

[BEAM-646] Add PTransformOverrideFactory to the Core SDK

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This migrates PTransformOverrideFactory from the DirectRunner to the
Core SDK, as part of BEAM-646.

Migrate all DirectRunner Override Factories to the new
PTransformOverrideFactory.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam override_factory_in_core

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1547


commit ad7aa03e12694bceb29906d2bb9df1ce009a1df2
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-06T00:01:57Z

Add PTransformOverrideFactory to the Core SDK

This migrates PTransformOverrideFactory from the DirectRunner to the
Core SDK, as part of BEAM-646.

Add getOriginalToReplacements to provide a mapping from the original
outputs to replaced outputs. This enables all replaced nodes to be
rewired to output the original output.

Migrate all DirectRunner Override Factories to the new
PTransformOverrideFactory.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1442: [BEAM-646] Add Replacement Methods to Tra...

2016-12-07 Thread tgroh
Github user tgroh closed the pull request at:

https://github.com/apache/incubator-beam/pull/1442


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1511

2016-12-07 Thread tgroh
This closes #1511


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5b31a369
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5b31a369
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5b31a369

Branch: refs/heads/master
Commit: 5b31a369962907e257de8019fbf6cde4c615b1c0
Parents: ae52ec1 55d333b
Author: Thomas Groh 
Authored: Wed Dec 7 09:14:38 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 7 09:14:38 2016 -0800

--
 .../apex/translation/TranslationContext.java|  4 +--
 .../beam/runners/direct/DirectGraphVisitor.java |  9 +++
 .../direct/KeyedPValueTrackingVisitor.java  |  2 +-
 .../FlinkBatchPipelineTranslator.java   |  4 +--
 .../FlinkStreamingPipelineTranslator.java   |  7 ++
 .../dataflow/DataflowPipelineTranslator.java|  3 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 17 +++--
 .../beam/sdk/runners/TransformHierarchy.java| 26 +++-
 .../sdk/runners/TransformHierarchyTest.java | 13 --
 9 files changed, 38 insertions(+), 47 deletions(-)
--




[1/2] incubator-beam git commit: Only provide expanded Inputs and Outputs

2016-12-07 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master ae52ec1bc -> 5b31a3699


Only provide expanded Inputs and Outputs

This removes PInput and POutput from the immediate API Surface of
TransformHierarchy.Node, and forces Pipeline Visitors to access only
the expanded version of the output.

This is part of the move towards the runner-agnostic representation of a
graph.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55d333bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55d333bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55d333bf

Branch: refs/heads/master
Commit: 55d333bff68809ff1a9154491ace02d2d16e3b85
Parents: ae52ec1
Author: Thomas Groh 
Authored: Mon Dec 5 14:29:05 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 7 09:14:18 2016 -0800

--
 .../apex/translation/TranslationContext.java|  4 +--
 .../beam/runners/direct/DirectGraphVisitor.java |  9 +++
 .../direct/KeyedPValueTrackingVisitor.java  |  2 +-
 .../FlinkBatchPipelineTranslator.java   |  4 +--
 .../FlinkStreamingPipelineTranslator.java   |  7 ++
 .../dataflow/DataflowPipelineTranslator.java|  3 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 17 +++--
 .../beam/sdk/runners/TransformHierarchy.java| 26 +++-
 .../sdk/runners/TransformHierarchyTest.java | 13 --
 9 files changed, 38 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 259afbd..3bf01a8 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -35,7 +35,6 @@ import 
org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.PCollection;
@@ -72,8 +71,7 @@ class TranslationContext {
   }
 
   public void setCurrentTransform(TransformHierarchy.Node treeNode) {
-this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
-treeNode.getInput(), treeNode.getOutput(), (PTransform) 
treeNode.getTransform());
+this.currentTransform = treeNode.toAppliedPTransform();
   }
 
   public ApexPipelineOptions getPipelineOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55d333bf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index cd9d120..4f38bce 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -79,13 +79,13 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-toFinalize.removeAll(node.getInput().expand());
+toFinalize.removeAll(node.getInputs());
 AppliedPTransform appliedTransform = getAppliedTransform(node);
 stepNames.put(appliedTransform, genStepName());
-if (node.getInput().expand().isEmpty()) {
+if (node.getInputs().isEmpty()) {
   rootTransforms.add(appliedTransform);
 } else {
-  for (PValue value : node.getInput().expand()) {
+  for (PValue value : node.getInputs()) {
 primitiveConsumers.put(value, appliedTransform);
   }
 }
@@ -111,8 +111,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) {
 @SuppressWarnings({"rawtypes", "unchecked"})
-AppliedPTransform application = AppliedPTransform.of(
-node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
node.getTransform());
+

[1/2] incubator-beam git commit: This closes #1527

2016-12-07 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 02bb8c375 -> 1526184ae


This closes #1527


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1526184a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1526184a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1526184a

Branch: refs/heads/master
Commit: 1526184ae8be1f8ae6863f830653204157a584cd
Parents: 02bb8c3 b2d7223
Author: Thomas Groh 
Authored: Wed Dec 7 08:51:02 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 7 08:51:02 2016 -0800

--
 .../java/org/apache/beam/runners/core/DoFnRunner.java | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)
--




[2/2] incubator-beam git commit: Port most of DoFnRunner Javadoc to new DoFn

2016-12-07 Thread tgroh
Port most of DoFnRunner Javadoc to new DoFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2d72237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2d72237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2d72237

Branch: refs/heads/master
Commit: b2d72237b592e1dcb5cca30f5cbc9a11d2890c0f
Parents: 02bb8c3
Author: Kenneth Knowles 
Authored: Tue Dec 6 15:20:28 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 7 08:51:02 2016 -0800

--
 .../java/org/apache/beam/runners/core/DoFnRunner.java | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2d72237/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index aac8e8f..501667e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -18,29 +18,29 @@
 package org.apache.beam.runners.core;
 
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * An wrapper interface that represents the execution of a {@link OldDoFn}.
+ * An wrapper interface that represents the execution of a {@link DoFn}.
  */
 public interface DoFnRunner {
   /**
-   * Prepares and calls {@link OldDoFn#startBundle}.
+   * Prepares and calls a {@link DoFn DoFn's} {@link DoFn.StartBundle 
@StartBundle} method.
*/
   void startBundle();
 
   /**
-   * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} 
containing the current
-   * element.
+   * Calls a {@link DoFn DoFn's} {@link DoFn.ProcessElement @ProcessElement} 
method with a
+   * {@link DoFn.ProcessContext} containing the provided element.
*/
   void processElement(WindowedValue elem);
 
   /**
-   * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as
-   * flushing in-memory states.
+   * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} 
method and performs
+   * additional tasks, such as flushing in-memory states.
*/
   void finishBundle();
 



[2/4] incubator-beam git commit: Add DirectGraphs to DirectRunner Tests

2016-12-06 Thread tgroh
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
--
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 076e0fb..eb4d0cd 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -31,7 +31,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
@@ -63,7 +62,6 @@ import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -116,33 +114,8 @@ public class WatermarkManagerTest implements Serializable {
 PCollectionList preFlatten = 
PCollectionList.of(createdInts).and(intsToFlatten);
 flattened = preFlatten.apply("flattened", Flatten.pCollections());
 
-Collection rootTransforms =
-ImmutableList.of(
-createdInts.getProducingTransformInternal(),
-intsToFlatten.getProducingTransformInternal());
-
-Map> consumers = new 
HashMap<>();
-consumers.put(
-createdInts,
-ImmutableList.of(filtered.getProducingTransformInternal(),
-keyed.getProducingTransformInternal(), 
flattened.getProducingTransformInternal()));
-consumers.put(
-filtered,
-Collections.singleton(
-filteredTimesTwo.getProducingTransformInternal()));
-consumers.put(filteredTimesTwo, Collections.emptyList());
-consumers.put(keyed, Collections.emptyList());
-
-consumers.put(
-intsToFlatten,
-Collections.singleton(
-flattened.getProducingTransformInternal()));
-consumers.put(flattened, Collections.emptyList());
-
 clock = MockClock.fromInstant(new Instant(1000));
-DirectGraphVisitor visitor = new DirectGraphVisitor();
-p.traverseTopologically(visitor);
-graph = visitor.getGraph();
+graph = DirectGraphs.getGraph(p);
 
 manager = WatermarkManager.create(clock, graph);
 bundleFactory = ImmutableListBundleFactory.create();
@@ -155,7 +128,7 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void getWatermarkForUntouchedTransform() {
 TransformWatermarks watermarks =
-manager.getWatermarks(createdInts.getProducingTransformInternal());
+manager.getWatermarks(graph.getProducer(createdInts));
 
 assertThat(watermarks.getInputWatermark(), 
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
 assertThat(watermarks.getOutputWatermark(), 
equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
@@ -170,13 +143,13 @@ public class WatermarkManagerTest implements Serializable 
{
 CommittedBundle output = multiWindowedBundle(createdInts, 1);
 manager.updateWatermarks(null,
 TimerUpdate.empty(),
-result(createdInts.getProducingTransformInternal(),
+result(graph.getProducer(createdInts),
 null,
 Collections.singleton(output)),
 new Instant(8000L));
 manager.refreshAll();
 TransformWatermarks updatedSourceWatermark =
-manager.getWatermarks(createdInts.getProducingTransformInternal());
+manager.getWatermarks(graph.getProducer(createdInts));
 
 assertThat(updatedSourceWatermark.getOutputWatermark(), equalTo(new 
Instant(8000L)));
   }
@@ -191,7 +164,7 @@ public class WatermarkManagerTest implements Serializable {
 
 manager.updateWatermarks(null,
 TimerUpdate.empty(),
-result(intsToFlatten.getProducingTransformInternal(),
+result(graph.getProducer(intsToFlatten),
 null,
 
Collections.singleton(secondPcollectionBundle)),
 BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -199,7 +172,7 @@ public class WatermarkManagerTest implements Serializable {
 
 // We didn't do anything for the first source, so we shouldn't have 
progressed the watermark
 TransformWatermarks firstSourceWatermark =
-manager.getWatermarks(createdInts.getProducingTransformInternal());
+

[1/4] incubator-beam git commit: Remove getProducingTransformInternal from DirectGraphVisitorTest

2016-12-06 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 077d9118d -> ca6ab6c68


Remove getProducingTransformInternal from DirectGraphVisitorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec1eff38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec1eff38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec1eff38

Branch: refs/heads/master
Commit: ec1eff387a711039801289c8f59c4240b1f1d007
Parents: d6c6ad3
Author: Thomas Groh 
Authored: Fri Dec 2 14:26:04 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 6 10:46:39 2016 -0800

--
 .../runners/direct/DirectGraphVisitorTest.java  | 70 +---
 1 file changed, 47 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ec1eff38/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
--
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index fb84de8..5ad278b 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -19,25 +19,34 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 
+import com.google.common.collect.Iterables;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
 import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,7 +57,6 @@ import org.junit.runners.JUnit4;
 /**
  * Tests for {@link DirectGraphVisitor}.
  */
-// TODO: Replace uses of getProducing
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -81,26 +89,36 @@ public class DirectGraphVisitorTest implements Serializable 
{
   @Test
   public void getRootTransformsContainsPBegins() {
 PCollection created = p.apply(Create.of("foo", "bar"));
-PCollection counted = p.apply(CountingInput.upTo(1234L));
+PCollection counted = p.apply(Read.from(CountingSource.upTo(1234L)));
 PCollection unCounted = p.apply(CountingInput.unbounded());
 p.traverseTopologically(visitor);
+DirectGraph graph = visitor.getGraph();
+assertThat(graph.getRootTransforms(), hasSize(3));
+List unapplied = new ArrayList<>();
 assertThat(
-visitor.getGraph().getRootTransforms(),
+graph.getRootTransforms(),
 Matchers.containsInAnyOrder(
-created.getProducingTransformInternal(),
-counted.getProducingTransformInternal(),
-unCounted.getProducingTransformInternal()));
+graph.getProducer(created), graph.getProducer(counted), 
graph.getProducer(unCounted)));
+for (AppliedPTransform root : graph.getRootTransforms())  {
+  assertTrue(root.getInput() instanceof PBegin);
+  assertThat(root.getOutput(), Matchers.isOneOf(created, counted, 
unCounted));
+}
   }
 
   @Test
   public void getRootTransformsContainsEmptyFlatten() {
-PCollection empty =
-PCollectionList.empty(p).apply(Flatten.pCollections());
+FlattenPCollectionList flatten = Flatten.pCollections();
+PCollectionList emptyList = PCollectionList.empty(p);
+PCollection empty = emptyList.apply(flatten);
 

[4/4] incubator-beam git commit: This closes #1495

2016-12-06 Thread tgroh
This closes #1495


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca6ab6c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca6ab6c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca6ab6c6

Branch: refs/heads/master
Commit: ca6ab6c68d807765ab8e80273dd9d6f99aeaaf4a
Parents: 077d911 ec1eff3
Author: Thomas Groh 
Authored: Tue Dec 6 10:46:40 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 6 10:46:40 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactoryTest.java |  18 +-
 .../runners/direct/DirectGraphVisitorTest.java  |  69 --
 .../beam/runners/direct/DirectGraphs.java   |  35 +++
 .../runners/direct/EvaluationContextTest.java   |  82 ---
 .../direct/FlattenEvaluatorFactoryTest.java |  15 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java |   3 +-
 .../ImmutabilityEnforcementFactoryTest.java |   2 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 .../StatefulParDoEvaluatorFactoryTest.java  |   4 +-
 .../runners/direct/StepTransformResultTest.java |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |  14 +-
 .../runners/direct/TransformExecutorTest.java   |   9 +-
 .../UnboundedReadEvaluatorFactoryTest.java  |  24 +-
 .../direct/ViewEvaluatorFactoryTest.java|   4 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   6 +-
 .../runners/direct/WatermarkManagerTest.java| 237 ---
 17 files changed, 292 insertions(+), 237 deletions(-)
--




[GitHub] incubator-beam pull request #1495: [BEAM-646] Add DirectTestUtils

2016-12-02 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1495

[BEAM-646] Add DirectTestUtils

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---
Add getGraph(Pipeline) and getProducer(PValue), which use the
DirectGraphVisitor and DirectGraph methods to provide access to the
producing AppliedPTransform.

Remove getProducingTransformInternal from everywhere except
DirectGraphVisitorTest

This removes all remaining uses of `PValue.getProducingTransformInternal`
from the `DirectRunner`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
producers_consumers_as_datastructure

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1495.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1495


commit 5dc2386be460e004108aeaf89e0b5a5e3c3e50bd
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-02T18:56:36Z

Add DirectTestUtils

Add getGraph(Pipeline) and getProducer(PValue), which use the
DirectGraphVisitor and DirectGraph methods to provide access to the
producing AppliedPTransform.

Remove getProducingTransformInternal from everywhere except
DirectGraphVisitorTest

commit a7863cfbdfc8d2289fccdf5499fd245928a11a7d
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-02T22:26:04Z

Remove getProducingTransformInternal from DirectGraphVisitorTest




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] incubator-beam git commit: Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor

2016-12-02 Thread tgroh
Rename ConsumerTrackingPipelineVisitor to DirectGraphVisitor

Reduce visibility of Visitor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/662416a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/662416a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/662416a4

Branch: refs/heads/master
Commit: 662416a4e176cca252c0d6fde1bf4252aeaa56c0
Parents: 8162cd2
Author: Thomas Groh 
Authored: Fri Dec 2 10:07:05 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 14:02:25 2016 -0800

--
 .../direct/ConsumerTrackingPipelineVisitor.java | 145 ---
 .../beam/runners/direct/DirectGraphVisitor.java | 145 +++
 .../beam/runners/direct/DirectRunner.java   |   8 +-
 .../ConsumerTrackingPipelineVisitorTest.java| 239 ---
 .../runners/direct/DirectGraphVisitorTest.java  | 239 +++
 .../runners/direct/EvaluationContextTest.java   |   6 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   2 +-
 .../runners/direct/WatermarkManagerTest.java|   8 +-
 8 files changed, 396 insertions(+), 396 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/662416a4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
deleted file mode 100644
index b9e77c5..000
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each 
{@link PValue} in the
- * {@link Pipeline}. This is used to schedule consuming {@link PTransform 
PTransforms} to consume
- * input after the upstream transform has produced and committed output.
- */
-public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
-  private Map producers = new HashMap<>();
-
-  private ListMultimap primitiveConsumers =
-  ArrayListMultimap.create();
-
-  private Set views = new HashSet<>();
-  private Set rootTransforms = new HashSet<>();
-  private Map stepNames = new HashMap<>();
-  private Set toFinalize = new HashSet<>();
-  private int numTransforms = 0;
-  private boolean finalized = false;
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
-checkState(
-!finalized,
-"Attempting to traverse a pipeline (node %s) with a %s "
-+ "which has already visited a Pipeline and is finalized",
-node.getFullName(),
-ConsumerTrackingPipelineVisitor.class.getSimpleName());
-return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node 

[3/3] incubator-beam git commit: This closes #1487

2016-12-02 Thread tgroh
This closes #1487


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abbb900
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abbb900
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abbb900

Branch: refs/heads/master
Commit: 1abbb9007e83fc64f1bb61ff4593f37c6c386545
Parents: 8cb2689 662416a
Author: Thomas Groh 
Authored: Fri Dec 2 14:02:25 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 14:02:25 2016 -0800

--
 .../direct/ConsumerTrackingPipelineVisitor.java | 173 ---
 .../apache/beam/runners/direct/DirectGraph.java |  89 ++
 .../beam/runners/direct/DirectGraphVisitor.java | 145 ++
 .../beam/runners/direct/DirectRunner.java   |  35 +--
 .../beam/runners/direct/EvaluationContext.java  |  76 ++---
 .../direct/ExecutorServiceParallelExecutor.java |  15 +-
 .../ImmutabilityCheckingBundleFactory.java  |  21 +-
 .../beam/runners/direct/WatermarkManager.java   |  50 ++--
 .../ConsumerTrackingPipelineVisitorTest.java| 287 ---
 .../runners/direct/DirectGraphVisitorTest.java  | 239 +++
 .../runners/direct/EvaluationContextTest.java   |  29 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   6 +-
 .../runners/direct/WatermarkManagerTest.java|  23 +-
 13 files changed, 575 insertions(+), 613 deletions(-)
--




[1/3] incubator-beam git commit: Stop using Maps of Transforms in the DirectRunner

2016-12-02 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8cb2689f8 -> 1abbb9007


Stop using Maps of Transforms in the DirectRunner

Instead, add a "DirectGraph" class, which adds a layer of indirection to
all lookup methods.

Remove all remaining uses of getProducingTransformInternal, and instead
use DirectGraph methods to obtain the producing transform.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8162cd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8162cd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8162cd29

Branch: refs/heads/master
Commit: 8162cd29d97ef307b6fac588f453e4e39d70fca7
Parents: 8cb2689
Author: Thomas Groh 
Authored: Thu Dec 1 15:39:30 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 14:02:24 2016 -0800

--
 .../direct/ConsumerTrackingPipelineVisitor.java | 108 +++
 .../apache/beam/runners/direct/DirectGraph.java |  89 +++
 .../beam/runners/direct/DirectRunner.java   |  31 +++---
 .../beam/runners/direct/EvaluationContext.java  |  76 -
 .../direct/ExecutorServiceParallelExecutor.java |  15 +--
 .../ImmutabilityCheckingBundleFactory.java  |  21 ++--
 .../beam/runners/direct/WatermarkManager.java   |  50 -
 .../ConsumerTrackingPipelineVisitorTest.java|  98 +
 .../runners/direct/EvaluationContextTest.java   |  25 ++---
 .../ImmutabilityCheckingBundleFactoryTest.java  |   6 +-
 .../runners/direct/WatermarkManagerTest.java|  23 ++--
 11 files changed, 252 insertions(+), 290 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8162cd29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
index acfad16..b9e77c5 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.ArrayList;
-import java.util.Collection;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 /**
@@ -41,9 +42,13 @@ import org.apache.beam.sdk.values.PValue;
  * input after the upstream transform has produced and committed output.
  */
 public class ConsumerTrackingPipelineVisitor extends PipelineVisitor.Defaults {
-  private Map> valueToConsumers 
= new HashMap<>();
-  private Collection rootTransforms = new 
ArrayList<>();
-  private Collection views = new ArrayList<>();
+  private Map producers = new HashMap<>();
+
+  private ListMultimap primitiveConsumers =
+  ArrayListMultimap.create();
+
+  private Set views = new HashSet<>();
+  private Set rootTransforms = new HashSet<>();
   private Map stepNames = new HashMap<>();
   private Set toFinalize = new HashSet<>();
   private int numTransforms = 0;
@@ -81,81 +86,38 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
   rootTransforms.add(appliedTransform);
 } else {
   for (PValue value : node.getInput().expand()) {
-valueToConsumers.get(value).add(appliedTransform);
+primitiveConsumers.put(value, appliedTransform);
   }
 }
   }
 
-  private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) {
-@SuppressWarnings({"rawtypes", "unchecked"})
-AppliedPTransform application = AppliedPTransform.of(
-node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
node.getTransform());
-return application;
-  }
-
-  @Override
+ @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {
 toFinalize.add(value);
+
+AppliedPTransform appliedTransform = 

[1/2] incubator-beam git commit: Explicitly Throw in TransformExecutorTest

2016-12-02 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 37e891fe9 -> 8cb2689f8


Explicitly Throw in TransformExecutorTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4ee8b73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4ee8b73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4ee8b73

Branch: refs/heads/master
Commit: b4ee8b730bffb31ee1178303f1dbd5058eb22a11
Parents: 37e891f
Author: Thomas Groh 
Authored: Fri Dec 2 10:56:15 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 13:58:38 2016 -0800

--
 .../runners/direct/TransformExecutorTest.java   | 184 ++-
 1 file changed, 97 insertions(+), 87 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4ee8b73/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
--
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 85eff65..08b1e18 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -37,13 +37,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +60,9 @@ import org.mockito.MockitoAnnotations;
 public class TransformExecutorTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
   private PCollection created;
-  private PCollection> downstream;
+
+  private AppliedPTransform createdProducer;
+  private AppliedPTransform downstreamProducer;
 
   private CountDownLatch evaluatorCompleted;
 
@@ -88,15 +87,17 @@ public class TransformExecutorTest {
 
 TestPipeline p = TestPipeline.create();
 created = p.apply(Create.of("foo", "spam", "third"));
-downstream = created.apply(WithKeys.of(3));
+PCollection> downstream = 
created.apply(WithKeys.of(3));
+
+createdProducer = created.getProducingTransformInternal();
+downstreamProducer = downstream.getProducingTransformInternal();
 
 when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
 
   @Test
   public void callWithNullInputBundleFinishesBundleAndCompletes() throws 
Exception {
-final TransformResult result =
-
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+final TransformResult result = 
StepTransformResult.withoutHold(createdProducer).build();
 final AtomicBoolean finishCalled = new AtomicBoolean(false);
 TransformEvaluator evaluator =
 new TransformEvaluator() {
@@ -112,8 +113,7 @@ public class TransformExecutorTest {
   }
 };
 
-when(registry.forApplication(created.getProducingTransformInternal(), 
null))
-.thenReturn(evaluator);
+when(registry.forApplication(createdProducer, null)).thenReturn(evaluator);
 
 TransformExecutor executor =
 TransformExecutor.create(
@@ -121,7 +121,7 @@ public class TransformExecutorTest {
 registry,
 Collections.emptyList(),
 null,
-created.getProducingTransformInternal(),
+createdProducer,
 completionCallback,
 transformEvaluationState);
 executor.run();
@@ -133,7 +133,7 @@ public class TransformExecutorTest {
 
   @Test
   public void nullTransformEvaluatorTerminates() throws Exception {
-when(registry.forApplication(created.getProducingTransformInternal(), 
null)).thenReturn(null);
+when(registry.forApplication(createdProducer, null)).thenReturn(null);
 
 TransformExecutor executor =
 TransformExecutor.create(
@@ -141,7 +141,7 @@ public class TransformExecutorTest {
 registry,
 Collections.emptyList(),
 null,
-created.getProducingTransformInternal(),
+

[2/2] incubator-beam git commit: This closes #1490

2016-12-02 Thread tgroh
This closes #1490


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8cb2689f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8cb2689f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8cb2689f

Branch: refs/heads/master
Commit: 8cb2689f8952a73a4e855a03f98c1d5bec8181fb
Parents: 37e891f b4ee8b7
Author: Thomas Groh 
Authored: Fri Dec 2 13:58:39 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 13:58:39 2016 -0800

--
 .../runners/direct/TransformExecutorTest.java   | 184 ++-
 1 file changed, 97 insertions(+), 87 deletions(-)
--




[2/3] incubator-beam git commit: Move Towards removing WindowedValue from SDK

2016-12-02 Thread tgroh
Move Towards removing WindowedValue from SDK

- Introduces ValueInSingleWindow for purposes of PAssert
- Uses ValueInSingleWindow inside DoFnTester
- Moves WindowMatchers{,Test} to runners-core

After this commit, WindowedValue does not appear in any SDK APIs
used by Pipeline authors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9891234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9891234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9891234

Branch: refs/heads/master
Commit: d989123424a54699ecb47ba6c0a4e437316cabce
Parents: 0fb5610
Author: Eugene Kirpichov 
Authored: Mon Oct 31 15:46:25 2016 -0700
Committer: Thomas Groh 
Committed: Fri Dec 2 13:16:04 2016 -0800

--
 .../beam/runners/core/ReduceFnRunnerTest.java   |   5 +-
 .../beam/runners/core/SplittableParDoTest.java  |  38 ++--
 .../beam/runners/core/WindowMatchers.java   | 204 +++
 .../beam/runners/core/WindowMatchersTest.java   |  82 
 .../direct/WindowEvaluatorFactoryTest.java  |   4 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  88 
 .../org/apache/beam/sdk/testing/PAssert.java|  77 +++
 .../apache/beam/sdk/testing/PaneExtractors.java |  55 +++--
 .../beam/sdk/testing/ValueInSingleWindow.java   | 134 
 .../apache/beam/sdk/transforms/DoFnTester.java  |  58 +++---
 .../apache/beam/sdk/util/GatherAllPanes.java|  86 
 .../apache/beam/sdk/util/IdentityWindowFn.java  |   2 +-
 .../org/apache/beam/sdk/WindowMatchers.java | 204 ---
 .../org/apache/beam/sdk/WindowMatchersTest.java |  82 
 .../beam/sdk/testing/GatherAllPanesTest.java| 140 +
 .../beam/sdk/testing/PaneExtractorsTest.java| 133 ++--
 .../testing/ValueInSingleWindowCoderTest.java   |  51 +
 .../beam/sdk/util/GatherAllPanesTest.java   | 143 -
 18 files changed, 893 insertions(+), 693 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 20eb08b..ba57567 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.core;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static 
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
@@ -39,7 +39,6 @@ import com.google.common.collect.Iterables;
 import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
-import org.apache.beam.sdk.WindowMatchers;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 990d892..b13d839 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValueInSingleWindow;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -142,14 +143,15 @@ public class SplittableParDoTest {
 PCollection.IsBounded.BOUNDED,
 makeBoundedCollection(pipeline)
   

[1/3] incubator-beam git commit: Move Towards removing WindowedValue from SDK

2016-12-02 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0fb561068 -> a0884492a


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
index ef501d4..7df2f89 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
@@ -34,32 +34,33 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for {@link PaneExtractors}.
- */
+/** Tests for {@link PaneExtractors}. */
 @RunWith(JUnit4.class)
 public class PaneExtractorsTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void onlyPaneNoFiring() {
-SerializableFunction, Iterable> 
extractor =
+SerializableFunction, 
Iterable> extractor =
 PaneExtractors.onlyPane();
-Iterable noFiring =
+Iterable noFiring =
 ImmutableList.of(
-WindowedValue.valueInGlobalWindow(9), 
WindowedValue.valueInEmptyWindows(19));
+ValueInSingleWindow.of(
+9, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING),
+ValueInSingleWindow.of(
+19, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, 
PaneInfo.NO_FIRING));
 assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19));
   }
 
   @Test
   public void onlyPaneOnlyOneFiring() {
-SerializableFunction, Iterable> 
extractor =
+SerializableFunction, 
Iterable> extractor =
 PaneExtractors.onlyPane();
-Iterable onlyFiring =
+Iterable onlyFiring =
 ImmutableList.of(
-WindowedValue.of(
+ValueInSingleWindow.of(
 2, new Instant(0L), GlobalWindow.INSTANCE, 
PaneInfo.ON_TIME_AND_ONLY_FIRING),
-WindowedValue.of(
+ValueInSingleWindow.of(
 1, new Instant(0L), GlobalWindow.INSTANCE, 
PaneInfo.ON_TIME_AND_ONLY_FIRING));
 
 assertThat(extractor.apply(onlyFiring), containsInAnyOrder(2, 1));
@@ -67,21 +68,21 @@ public class PaneExtractorsTest {
 
   @Test
   public void onlyPaneMultiplePanesFails() {
-SerializableFunction, Iterable> 
extractor =
+SerializableFunction, 
Iterable> extractor =
 PaneExtractors.onlyPane();
-Iterable multipleFiring =
+Iterable multipleFiring =
 ImmutableList.of(
-WindowedValue.of(
+ValueInSingleWindow.of(
 4,
 new Instant(0L),
 GlobalWindow.INSTANCE,
 PaneInfo.createPane(true, false, Timing.EARLY)),
-WindowedValue.of(
+ValueInSingleWindow.of(
 2,
 new Instant(0L),
 GlobalWindow.INSTANCE,
 PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
-WindowedValue.of(
+ValueInSingleWindow.of(
 1,
 new Instant(0L),
 GlobalWindow.INSTANCE,
@@ -94,16 +95,16 @@ public class PaneExtractorsTest {
 
   @Test
   public void onTimePane() {
-SerializableFunction, Iterable> 
extractor =
+SerializableFunction, 
Iterable> extractor =
 PaneExtractors.onTimePane();
-Iterable onlyOnTime =
+Iterable onlyOnTime =
 ImmutableList.of(
-WindowedValue.of(
+ValueInSingleWindow.of(
 4,
 new Instant(0L),
 GlobalWindow.INSTANCE,
 PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)),
-WindowedValue.of(
+ValueInSingleWindow.of(
 2,
 new Instant(0L),
 

[3/3] incubator-beam git commit: This closes #1260

2016-12-02 Thread tgroh
This closes #1260


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0884492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0884492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0884492

Branch: refs/heads/master
Commit: a0884492a5c037a7fe58a1f011db560f9c228ae0
Parents: 0fb5610 d989123
Author: Thomas Groh 
Authored: Fri Dec 2 13:16:22 2016 -0800
Committer: Thomas Groh 
Committed: Fri Dec 2 13:16:22 2016 -0800

--
 .../beam/runners/core/ReduceFnRunnerTest.java   |   5 +-
 .../beam/runners/core/SplittableParDoTest.java  |  38 ++--
 .../beam/runners/core/WindowMatchers.java   | 204 +++
 .../beam/runners/core/WindowMatchersTest.java   |  82 
 .../direct/WindowEvaluatorFactoryTest.java  |   4 +-
 .../apache/beam/sdk/testing/GatherAllPanes.java |  88 
 .../org/apache/beam/sdk/testing/PAssert.java|  77 +++
 .../apache/beam/sdk/testing/PaneExtractors.java |  55 +++--
 .../beam/sdk/testing/ValueInSingleWindow.java   | 134 
 .../apache/beam/sdk/transforms/DoFnTester.java  |  58 +++---
 .../apache/beam/sdk/util/GatherAllPanes.java|  86 
 .../apache/beam/sdk/util/IdentityWindowFn.java  |   2 +-
 .../org/apache/beam/sdk/WindowMatchers.java | 204 ---
 .../org/apache/beam/sdk/WindowMatchersTest.java |  82 
 .../beam/sdk/testing/GatherAllPanesTest.java| 140 +
 .../beam/sdk/testing/PaneExtractorsTest.java| 133 ++--
 .../testing/ValueInSingleWindowCoderTest.java   |  51 +
 .../beam/sdk/util/GatherAllPanesTest.java   | 143 -
 18 files changed, 893 insertions(+), 693 deletions(-)
--




[GitHub] incubator-beam pull request #1490: Explicitly Throw in TransformExecutorTest

2016-12-02 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1490

Explicitly Throw in TransformExecutorTest

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---
Minor cleanup to stop using `ImmutabilityCheckingEnforcement` and instead use
one that throws when specified.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam explicitly_throw_transform_executor_test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1490


commit 4971af757f98c7a8419834afd1b9df3bc13ff800
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-02T18:56:15Z

Explicitly Throw in TransformExecutorTest




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1487: Stop using Maps of Transforms in the Dire...

2016-12-01 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1487

Stop using Maps of Transforms in the DirectRunner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

Instead, add a "DirectGraph" class, which adds a layer of indirection to
all lookup methods.

Remove all remaining uses of getProducingTransformInternal, and instead
use DirectGraph methods to obtain the producing transform.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam producers_consumers_as_datastructure

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1487


commit 4f54fa66117f8f6e73827be68dc4b7000b9d1a90
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-01T23:39:30Z

Stop using Maps of Transforms in the DirectRunner

Instead, add a "DirectGraph" class, which adds a layer of indirection to
all lookup methods.

Remove all remaining uses of getProducingTransformInternal, and instead
use DirectGraph methods to obtain the producing transform.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/3] incubator-beam git commit: This closes #1484

2016-12-01 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 24fab9f53 -> 63491bf21


This closes #1484


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63491bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63491bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63491bf2

Branch: refs/heads/master
Commit: 63491bf211d8b2f23f6b7db1375a733c4332b850
Parents: 24fab9f 7c5c791
Author: Thomas Groh 
Authored: Thu Dec 1 14:34:21 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 14:34:21 2016 -0800

--
 .../translation/ApexPipelineTranslator.java |  12 +-
 .../apex/translation/TranslationContext.java|   6 +-
 .../direct/ConsumerTrackingPipelineVisitor.java |  12 +-
 .../runners/direct/DisplayDataValidator.java|   6 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  10 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |  12 +-
 .../FlinkBatchPipelineTranslator.java   |  14 +-
 .../FlinkStreamingPipelineTranslator.java   |  16 +-
 .../PipelineTranslationOptimizer.java   |  10 +-
 .../dataflow/DataflowPipelineTranslator.java|   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  10 +-
 .../runners/dataflow/DataflowRunnerTest.java|   4 +-
 .../dataflow/RecordingPipelineVisitor.java  |   6 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  21 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |   6 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  17 +-
 .../beam/sdk/runners/TransformHierarchy.java| 244 +++-
 .../beam/sdk/runners/TransformTreeNode.java | 282 ---
 .../sdk/AggregatorPipelineExtractorTest.java|  20 +-
 .../sdk/runners/TransformHierarchyTest.java |  26 +-
 .../beam/sdk/runners/TransformTreeTest.java |   8 +-
 .../display/DisplayDataEvaluator.java   |   8 +-
 22 files changed, 344 insertions(+), 414 deletions(-)
--




[3/3] incubator-beam git commit: Migrate TransformTreeNode to an Inner Class

2016-12-01 Thread tgroh
Migrate TransformTreeNode to an Inner Class

TransformTreeNode requires access to the hierarchy it is contained
within, and generally cannot be separated from TransformHierarchy. It is
primarily an implementation detail of TransformHierarchy, so can be
relocated to within it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569e8d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569e8d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569e8d70

Branch: refs/heads/master
Commit: 569e8d7085cf4e6effd379f23716202c6c5daf52
Parents: 24fab9f
Author: Thomas Groh 
Authored: Thu Dec 1 13:19:14 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 14:34:21 2016 -0800

--
 .../translation/ApexPipelineTranslator.java |  12 +-
 .../apex/translation/TranslationContext.java|   6 +-
 .../direct/ConsumerTrackingPipelineVisitor.java |  12 +-
 .../runners/direct/DisplayDataValidator.java|   6 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  10 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |  12 +-
 .../FlinkBatchPipelineTranslator.java   |  14 +-
 .../FlinkStreamingPipelineTranslator.java   |  16 +-
 .../PipelineTranslationOptimizer.java   |  10 +-
 .../dataflow/DataflowPipelineTranslator.java|   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  10 +-
 .../runners/dataflow/DataflowRunnerTest.java|   4 +-
 .../dataflow/RecordingPipelineVisitor.java  |   6 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  21 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |   6 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  17 +-
 .../beam/sdk/runners/TransformHierarchy.java| 243 +++-
 .../beam/sdk/runners/TransformTreeNode.java | 282 ---
 .../sdk/AggregatorPipelineExtractorTest.java|  20 +-
 .../sdk/runners/TransformHierarchyTest.java |  26 +-
 .../beam/sdk/runners/TransformTreeTest.java |   8 +-
 .../display/DisplayDataEvaluator.java   |   8 +-
 22 files changed, 343 insertions(+), 414 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index d38faf7..8d6db84 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -19,17 +19,15 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.datatorrent.api.DAG;
-
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -84,18 +82,18 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
 LOG.debug("entering composite transform {}", node.getTransform());
 return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
 LOG.debug("leaving composite transform {}", node.getTransform());
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
 LOG.debug("visiting transform {}", node.getTransform());
 PTransform transform = node.getTransform();
 TransformTranslator translator = 
getTransformTranslator(transform.getClass());
@@ -108,7 +106,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  

[2/3] incubator-beam git commit: Reduce the visibility of TransformHierarchy Node Mutators

2016-12-01 Thread tgroh
Reduce the visibility of TransformHierarchy Node Mutators

These mutators should not be accessible when visiting the nodes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c5c7910
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c5c7910
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c5c7910

Branch: refs/heads/master
Commit: 7c5c7910c0b8cba8623a1c49fc24c51ea691dac3
Parents: 569e8d7
Author: Thomas Groh 
Authored: Thu Dec 1 13:22:11 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 14:34:21 2016 -0800

--
 .../java/org/apache/beam/sdk/runners/TransformHierarchy.java  | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c5c7910/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 662acc1..e9829cc 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -263,7 +263,7 @@ public class TransformHierarchy {
 /**
  * Adds an output to the transform node.
  */
-public void setOutput(POutput output) {
+private void setOutput(POutput output) {
   checkState(!finishedSpecifying);
   checkState(
   this.output == null, "Tried to specify more than one output for %s", 
getFullName());
@@ -304,13 +304,14 @@ public class TransformHierarchy {
   return AppliedPTransform.of(
   getFullName(), getInput(), getOutput(), (PTransform) getTransform());
 }
+
 /**
  * Visit the transform node.
  *
  * Provides an ordered visit of the input values, the primitive 
transform (or child nodes for
  * composite transforms), then the output values.
  */
-public void visit(PipelineVisitor visitor, Set visitedValues) {
+private void visit(PipelineVisitor visitor, Set visitedValues) {
   if (!finishedSpecifying) {
 finishSpecifying();
   }
@@ -352,7 +353,7 @@ public class TransformHierarchy {
  *
  * All inputs are finished first, then the transform, then all outputs.
  */
-public void finishSpecifying() {
+private void finishSpecifying() {
   if (finishedSpecifying) {
 return;
   }



[1/2] incubator-beam git commit: Improve Splittable DoFn

2016-12-01 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master fd4b631f1 -> 24fab9f53


Improve Splittable DoFn

Makes Splittable DoFn be more like a real DoFn:
- Adds support for side inputs and outputs to SDF
- Teaches `ProcessFn` to work with exploded windows inside the
  `KeyedWorkItem`. It works with them by un-exploding the windows
  in the `Iterable` into a
  single `WindowedValue`, since the values and timestamps are
  guaranteed to be the same.

Makes SplittableParDo.ProcessFn not use the (now unavailable)
OldDoFn state and timers API:
- Makes `ProcessFn` be a primitive transform with its own
  `ParDoEvaluator`. As a nice side effect, this enables the runner to
  provide additional hooks into it - e.g. for giving the runner access
  to the restriction tracker (in later PRs)
- For consistency, moves declaration of `GBKIntoKeyedWorkItems`
  primitive transform into `SplittableParDo`, alongside the
  `SplittableProcessElements` transform
- Preserves compressed representation of `WindowedValue`'s in
  `PushbackSideInputDoFnRunner`
- Uses OutputWindowedValue in SplittableParDo.ProcessFn

Proper lifecycle management for wrapped fn.

- Caches underlying fn using DoFnLifecycleManager, so its
  @Setup and @Teardown methods are called.
- Calls @StartBundle and @FinishBundle methods on the underlying
  fn explicitly. Output from them is prohibited, since an SDF
  is only allowed to output after a successful RestrictionTracker.tryClaim.
  It's possible that an SDF should not be allowed to have
  StartBundle/FinishBundle methods at all, but I'm not sure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87ff5ac3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87ff5ac3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87ff5ac3

Branch: refs/heads/master
Commit: 87ff5ac36bb9cc62fa4864ffa7b5a5e495b9a4a1
Parents: fd4b631
Author: Eugene Kirpichov 
Authored: Wed Oct 26 16:05:01 2016 -0700
Committer: Thomas Groh 
Committed: Thu Dec 1 14:15:55 2016 -0800

--
 .../core/ElementAndRestrictionCoder.java|   8 +
 .../runners/core/GBKIntoKeyedWorkItems.java |  55 ---
 .../beam/runners/core/SplittableParDo.java  | 378 +++
 .../beam/runners/core/SplittableParDoTest.java  | 134 +--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  41 +-
 .../beam/runners/direct/DirectGroupByKey.java   |   2 +-
 .../beam/runners/direct/DirectRunner.java   |   8 +-
 .../runners/direct/DoFnLifecycleManager.java|   4 +-
 .../beam/runners/direct/ParDoEvaluator.java |  26 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  63 +++-
 .../direct/ParDoMultiOverrideFactory.java   |   2 +-
 ...littableProcessElementsEvaluatorFactory.java | 144 +++
 .../direct/TransformEvaluatorRegistry.java  |   5 +
 .../beam/runners/direct/SplittableDoFnTest.java | 194 +-
 .../org/apache/beam/sdk/transforms/DoFn.java|  12 +
 .../apache/beam/sdk/transforms/DoFnTester.java  |  51 ++-
 .../sdk/util/state/TimerInternalsFactory.java   |  36 ++
 17 files changed, 905 insertions(+), 258 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
index 6dec8e2..64c1e14 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -64,4 +64,12 @@ public class ElementAndRestrictionCoder
 RestrictionT value = restrictionCoder.decode(inStream, context);
 return ElementAndRestriction.of(key, value);
   }
+
+  public Coder getElementCoder() {
+return elementCoder;
+  }
+
+  public Coder getRestrictionCoder() {
+return restrictionCoder;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87ff5ac3/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
deleted file mode 100644
index 304e349..000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
+++ /dev/null
@@ -1,55 +0,0 @@

[2/2] incubator-beam git commit: This closes #1261

2016-12-01 Thread tgroh
This closes #1261


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24fab9f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24fab9f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24fab9f5

Branch: refs/heads/master
Commit: 24fab9f53a8b3a7ef5fb35195dbe9417bbcc4101
Parents: fd4b631 87ff5ac
Author: Thomas Groh 
Authored: Thu Dec 1 14:16:58 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 14:16:58 2016 -0800

--
 .../core/ElementAndRestrictionCoder.java|   8 +
 .../runners/core/GBKIntoKeyedWorkItems.java |  55 ---
 .../beam/runners/core/SplittableParDo.java  | 378 +++
 .../beam/runners/core/SplittableParDoTest.java  | 134 +--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  41 +-
 .../beam/runners/direct/DirectGroupByKey.java   |   2 +-
 .../beam/runners/direct/DirectRunner.java   |   8 +-
 .../runners/direct/DoFnLifecycleManager.java|   4 +-
 .../beam/runners/direct/ParDoEvaluator.java |  26 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |  63 +++-
 .../direct/ParDoMultiOverrideFactory.java   |   2 +-
 ...littableProcessElementsEvaluatorFactory.java | 144 +++
 .../direct/TransformEvaluatorRegistry.java  |   5 +
 .../beam/runners/direct/SplittableDoFnTest.java | 194 +-
 .../org/apache/beam/sdk/transforms/DoFn.java|  12 +
 .../apache/beam/sdk/transforms/DoFnTester.java  |  51 ++-
 .../sdk/util/state/TimerInternalsFactory.java   |  36 ++
 17 files changed, 905 insertions(+), 258 deletions(-)
--




[GitHub] incubator-beam pull request #1484: Migrate TransformTreeNode to an Inner Cla...

2016-12-01 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1484

Migrate TransformTreeNode to an Inner Class

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_maintenance_internally

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1484


commit 1fab0da2bdfb7675fc3aa5f0faf81e2dcfe867fb
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-01T21:19:14Z

Migrate TransformTreeNode to an Inner Class

TransformTreeNode requires access to the hierarchy it is contained
within, and generally cannot be separated from TransformHierarchy. It is
primarily an implementation detail of TransformHierarchy, so can be
relocated to within it.

commit 3d44a4a0709fbe1ebb25d0dea7f927415fa370a1
Author: Thomas Groh <tg...@google.com>
Date:   2016-12-01T21:22:11Z

Reduce the visibility of TransformHierarchy Node Mutators

These mutators should not be accessible when visiting the nodes.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Move TransformHierarchy Maintenance into it

2016-12-01 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 0c875ba70 -> 48130f718


Move TransformHierarchy Maintenance into it

This reduces the complexity of Pipeline.applyInternal by limiting its
responsibilities to passing a node into the Transform Hierarchy,
enforcing name uniqueness, and causing the runner to expand the
PTransform. This logic is moved to the appropriate application sites.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab1f1ad0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab1f1ad0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab1f1ad0

Branch: refs/heads/master
Commit: ab1f1ad012bc559cdb099319a516e4437eed2825
Parents: 0c875ba
Author: Thomas Groh 
Authored: Tue Nov 29 14:29:47 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 12:55:25 2016 -0800

--
 .../direct/KeyedPValueTrackingVisitor.java  |   2 +-
 .../DataflowPipelineTranslatorTest.java |   2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++-
 .../beam/sdk/runners/TransformHierarchy.java| 126 -
 .../beam/sdk/runners/TransformTreeNode.java | 165 +
 .../sdk/runners/TransformHierarchyTest.java | 180 ++-
 .../beam/sdk/runners/TransformTreeTest.java |   4 +-
 7 files changed, 340 insertions(+), 256 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 7c4376a..47b0857 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -74,7 +74,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
 if (node.isRootNode()) {
   finalized = true;
 } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) {
-  keyedValues.addAll(node.getExpandedOutputs());
+  keyedValues.addAll(node.getOutput().expand());
 }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c925454..95c7132 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -669,7 +669,7 @@ public class DataflowPipelineTranslatorTest implements 
Serializable {
 PCollection input = p.begin()
 .apply(Create.of(1, 2, 3));
 
-thrown.expect(IllegalStateException.class);
+thrown.expect(IllegalArgumentException.class);
 input.apply(new PartiallyBoundOutputCreator());
 
 Assert.fail("Failure expected from use of partially bound output");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab1f1ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 9edf496..c8a4439 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.sdk;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -31,7 +32,6 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import 

[2/2] incubator-beam git commit: This closes #1469

2016-12-01 Thread tgroh
This closes #1469


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/48130f71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/48130f71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/48130f71

Branch: refs/heads/master
Commit: 48130f718d019d6928c464e6f7ad90cd510b62d2
Parents: 0c875ba ab1f1ad
Author: Thomas Groh 
Authored: Thu Dec 1 12:55:26 2016 -0800
Committer: Thomas Groh 
Committed: Thu Dec 1 12:55:26 2016 -0800

--
 .../direct/KeyedPValueTrackingVisitor.java  |   2 +-
 .../DataflowPipelineTranslatorTest.java |   2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 117 +++-
 .../beam/sdk/runners/TransformHierarchy.java| 126 -
 .../beam/sdk/runners/TransformTreeNode.java | 165 +
 .../sdk/runners/TransformHierarchyTest.java | 180 ++-
 .../beam/sdk/runners/TransformTreeTest.java |   4 +-
 7 files changed, 340 insertions(+), 256 deletions(-)
--




[2/2] incubator-beam git commit: Preserves compressed windows in PushbackSideInputDoFnRunner

2016-11-30 Thread tgroh
Preserves compressed windows in PushbackSideInputDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c

Branch: refs/heads/master
Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d
Parents: 565e99f
Author: Eugene Kirpichov 
Authored: Wed Nov 30 14:28:51 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 30 16:26:33 2016 -0800

--
 .../core/PushbackSideInputDoFnRunner.java   | 20 
 .../core/PushbackSideInputDoFnRunnerTest.java   | 18 +++---
 2 files changed, 27 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 8c169da..460154d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner 
implements DoFnRunner<
   processElement(elem);
   return Collections.emptyList();
 }
-ImmutableList.Builder pushedBack = 
ImmutableList.builder();
+ImmutableList.Builder readyWindowsBuilder = 
ImmutableList.builder();
+ImmutableList.Builder pushedBackWindowsBuilder = 
ImmutableList.builder();
 for (WindowedValue windowElem : elem.explodeWindows()) {
   BoundedWindow mainInputWindow = 
Iterables.getOnlyElement(windowElem.getWindows());
   if (isReady(mainInputWindow)) {
-processElement(windowElem);
+readyWindowsBuilder.add(mainInputWindow);
   } else {
 notReadyWindows.add(mainInputWindow);
-pushedBack.add(windowElem);
+pushedBackWindowsBuilder.add(mainInputWindow);
   }
 }
-return pushedBack.build();
+ImmutableList readyWindows = readyWindowsBuilder.build();
+ImmutableList pushedBackWindows = 
pushedBackWindowsBuilder.build();
+if (!readyWindows.isEmpty()) {
+  processElement(
+  WindowedValue.of(
+  elem.getValue(), elem.getTimestamp(), readyWindows, 
elem.getPane()));
+}
+return pushedBackWindows.isEmpty()
+? ImmutableList.of()
+: ImmutableList.of(
+WindowedValue.of(
+elem.getValue(), elem.getTimestamp(), pushedBackWindows, 
elem.getPane()));
   }
 
   private boolean isReady(BoundedWindow mainInputWindow) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 59a7c92..f8f4604 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
@@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
 PaneInfo.ON_TIME_AND_ONLY_FIRING);
 Iterable multiWindowPushback =
 runner.processElementInReadyWindows(multiWindow);
-assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+assertThat(multiWindowPushback, contains(multiWindow));
 assertThat(underlying.inputElems, 
Matchers.emptyIterable());
   }
 

[1/2] incubator-beam git commit: This closes #1471

2016-11-30 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 565e99fbf -> a20bc4793


This closes #1471


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a20bc479
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a20bc479
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a20bc479

Branch: refs/heads/master
Commit: a20bc479347b2a10307ada45dc7220ee00671fac
Parents: 565e99f 38f0b11
Author: Thomas Groh 
Authored: Wed Nov 30 16:26:33 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 30 16:26:33 2016 -0800

--
 .../core/PushbackSideInputDoFnRunner.java   | 20 
 .../core/PushbackSideInputDoFnRunnerTest.java   | 18 +++---
 2 files changed, 27 insertions(+), 11 deletions(-)
--




[1/2] incubator-beam git commit: Revert "Improvements to ReduceFnRunner prefetching"

2016-11-30 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4b682039d -> c8f2cdb22


Revert "Improvements to ReduceFnRunner prefetching"

This reverts commit 4282c67c5fa4dea2fe6c8695e0ea23f383c6457b, which
contained some incompatibilities outside of runners-core.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aaa3b91e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aaa3b91e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aaa3b91e

Branch: refs/heads/master
Commit: aaa3b91e1e7b39dd585314a6017235cdd127e923
Parents: 4b68203
Author: Kenneth Knowles 
Authored: Wed Nov 30 15:21:53 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Nov 30 15:21:53 2016 -0800

--
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../beam/runners/core/PaneInfoTracker.java  |   4 -
 .../runners/core/ReduceFnContextFactory.java|   9 +-
 .../beam/runners/core/ReduceFnRunner.java   | 488 +++
 .../apache/beam/runners/core/WatermarkHold.java |   5 -
 .../triggers/TriggerStateMachineRunner.java |  14 +-
 .../beam/runners/core/ReduceFnTester.java   |   4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java  |   5 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   6 +-
 .../sdk/util/state/InMemoryTimerInternals.java  |  22 +-
 .../beam/sdk/util/state/TimerCallback.java  |   9 +-
 .../util/state/InMemoryTimerInternalsTest.java  |  54 +-
 12 files changed, 224 insertions(+), 407 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 294f21d..8b10813 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
@@ -72,9 +73,9 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 
   @Override
   public void processElement(ProcessContext c) throws Exception {
-KeyedWorkItem keyedWorkItem = c.element();
+KeyedWorkItem element = c.element();
 
-K key = keyedWorkItem.key();
+K key = c.element().key();
 TimerInternals timerInternals = c.windowingInternals().timerInternals();
 StateInternals stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
 
@@ -92,8 +93,10 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
 reduceFn,
 c.getPipelineOptions());
 
-reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
-reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
+reduceFnRunner.processElements(element.elementsIterable());
+for (TimerData timer : element.timersIterable()) {
+  reduceFnRunner.onTimer(timer);
+}
 reduceFnRunner.persist();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aaa3b91e/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index 69a4cfd..8140243 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -54,10 +54,6 @@ public class PaneInfoTracker {
 state.access(PANE_INFO_TAG).clear();
   }
 
-  public void prefetchPaneInfo(ReduceFn.Context context) {
-context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
-  }
-
   /**
* Return a ({@link ReadableState} for) the pane info appropriate for {@code 
context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.


[2/2] incubator-beam git commit: This closes #1474

2016-11-30 Thread tgroh
This closes #1474


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8f2cdb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8f2cdb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8f2cdb2

Branch: refs/heads/master
Commit: c8f2cdb223cf39d76e733e4dc3fffb5487d41fa4
Parents: 4b68203 aaa3b91
Author: Thomas Groh 
Authored: Wed Nov 30 15:30:30 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 30 15:30:30 2016 -0800

--
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  11 +-
 .../beam/runners/core/PaneInfoTracker.java  |   4 -
 .../runners/core/ReduceFnContextFactory.java|   9 +-
 .../beam/runners/core/ReduceFnRunner.java   | 488 +++
 .../apache/beam/runners/core/WatermarkHold.java |   5 -
 .../triggers/TriggerStateMachineRunner.java |  14 +-
 .../beam/runners/core/ReduceFnTester.java   |   4 +-
 .../GroupAlsoByWindowEvaluatorFactory.java  |   5 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |   6 +-
 .../sdk/util/state/InMemoryTimerInternals.java  |  22 +-
 .../beam/sdk/util/state/TimerCallback.java  |   9 +-
 .../util/state/InMemoryTimerInternalsTest.java  |  54 +-
 12 files changed, 224 insertions(+), 407 deletions(-)
--




[GitHub] incubator-beam pull request #1470: [BEAM-1063] Shutdown DynamicSplit Executo...

2016-11-30 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1470

[BEAM-1063] Shutdown DynamicSplit Executor in Cleanup

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This ensures that the threads will be shut off when the pipeline shuts
down, enabling a JVM with no more work to do to shut down as well.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
direct_runner_finishes_and_hangs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1470


commit 2780181991afd62093ca79330f95ff7c759c2f6b
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-30T22:30:14Z

Shutdown DynamicSplit Executor in Cleanup

This ensures that the threads will be shut off when the pipeline shuts
down, enabling a JVM with no more work to do to shut down as well.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1469: [BEAM-646] Move TransformHierarchy Mainte...

2016-11-30 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1469

[BEAM-646] Move TransformHierarchy Maintenance into it

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This reduces the complexity of Pipeline.applyInternal by keeping the
responsibilities to passing a node into the Transform Hierarchy,
enforcing name uniqueness, and causing the runner to expand the
PTransform. This logic is moved to the appropriate application sites.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
transform_hierarchy_maintenance_internally

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1469.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1469


commit ae6ac672e582148b846c3910cc7247700b3403cb
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-29T22:29:47Z

Move TransformHierarchy Maintenance into it

This reduces the complexity of Pipeline.applyInternal by keeping the
responsibilities to passing a node into the Transform Hierarchy,
enforcing name uniqueness, and causing the runner to expand the
PTransform. This logic is moved to the appropriate application sites.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: This closes #1382

2016-11-29 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 5ce75a2ea -> 70c1de9b9


This closes #1382


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70c1de9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70c1de9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70c1de9b

Branch: refs/heads/python-sdk
Commit: 70c1de9b95e9c20e5efb277d9ad50ae6348418e0
Parents: 5ce75a2 81e7a0f
Author: Thomas Groh 
Authored: Tue Nov 29 15:43:04 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 29 15:43:04 2016 -0800

--
 sdks/python/apache_beam/dataflow_test.py | 66 +++
 sdks/python/apache_beam/test_pipeline.py | 76 +++
 2 files changed, 120 insertions(+), 22 deletions(-)
--




[2/2] incubator-beam git commit: Support ValidatesRunner Attribute in Python

2016-11-29 Thread tgroh
Support ValidatesRunner Attribute in Python

This is roughly equivalent to "RunnableOnService" in the Java SDK. See
BEAM-655


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81e7a0f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81e7a0f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81e7a0f6

Branch: refs/heads/python-sdk
Commit: 81e7a0f653864212a5c9d3d0802608f92bb34501
Parents: 5ce75a2
Author: Mark Liu 
Authored: Thu Nov 17 14:45:42 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 29 15:43:04 2016 -0800

--
 sdks/python/apache_beam/dataflow_test.py | 66 +++
 sdks/python/apache_beam/test_pipeline.py | 76 +++
 2 files changed, 120 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81e7a0f6/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py 
b/sdks/python/apache_beam/dataflow_test.py
index f96e8af..ba3553a 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -24,13 +24,13 @@ import re
 import unittest
 
 import apache_beam as beam
-from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsDict
 from apache_beam.pvalue import AsIter as AllOf
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.pvalue import EmptySideInput
 from apache_beam.pvalue import SideOutputValue
+from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import Create
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import FlatMap
@@ -42,6 +42,7 @@ from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.transforms.window import WindowFn
+from nose.plugins.attrib import attr
 
 
 class DataflowTest(unittest.TestCase):
@@ -58,8 +59,9 @@ class DataflowTest(unittest.TestCase):
 | 'GroupCounts' >> GroupByKey()
 | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones
 
+  @attr('ValidatesRunner')
   def test_word_count(self):
-pipeline = Pipeline('DirectPipelineRunner')
+pipeline = TestPipeline()
 lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
 result = (
 (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
@@ -67,8 +69,9 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
 pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_map(self):
-pipeline = Pipeline('DirectPipelineRunner')
+pipeline = TestPipeline()
 lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
 result = (lines
   | 'upper' >> Map(str.upper)
@@ -76,8 +79,9 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
 pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_side_input_as_arg(self):
-pipeline = Pipeline('DirectPipelineRunner')
+pipeline = TestPipeline()
 words_list = ['aa', 'bb', 'cc']
 words = pipeline | 'SomeWords' >> Create(words_list)
 prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
@@ -89,8 +93,9 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
 pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_side_input_as_keyword_arg(self):
-pipeline = Pipeline('DirectPipelineRunner')
+pipeline = TestPipeline()
 words_list = ['aa', 'bb', 'cc']
 words = pipeline | 'SomeWords' >> Create(words_list)
 prefix = 'zyx'
@@ -102,6 +107,7 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
 pipeline.run()
 
+  @attr('ValidatesRunner')
   def test_par_do_with_do_fn_object(self):
 class SomeDoFn(DoFn):
   """A custom DoFn for a FlatMap transform."""
@@ -109,7 +115,7 @@ class DataflowTest(unittest.TestCase):
   def process(self, context, prefix, suffix):
 return ['%s-%s-%s' % (prefix, context.element, suffix)]
 
-pipeline = Pipeline('DirectPipelineRunner')
+pipeline = TestPipeline()
 words_list = ['aa', 'bb', 'cc']
 words = pipeline | 'SomeWords' >> Create(words_list)
 prefix = 'zyx'
@@ -119,6 +125,7 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
 pipeline.run()
 
+  @attr('ValidatesRunner')
   def 

[2/2] incubator-beam git commit: This closes #1457

2016-11-29 Thread tgroh
This closes #1457


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1f7013d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1f7013d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1f7013d

Branch: refs/heads/master
Commit: b1f7013d88cea1290f92898b36d6a761546f8e60
Parents: 8d127be 1184bfa
Author: Thomas Groh 
Authored: Tue Nov 29 15:37:58 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 29 15:37:58 2016 -0800

--
 .../src/main/java/org/apache/beam/sdk/Pipeline.java  | 15 ---
 1 file changed, 15 deletions(-)
--




[1/2] incubator-beam git commit: Remove TransformApplicationsForTesting

2016-11-29 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8d127beb8 -> b1f7013d8


Remove TransformApplicationsForTesting

This field is mutated but never queried.

Remove Pipeline#addValueInternal

This method is never called and not suitable for use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1184bfa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1184bfa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1184bfa7

Branch: refs/heads/master
Commit: 1184bfa7a3ee5d58d65c9ba9200e91f71856ce4a
Parents: 8d127be
Author: Thomas Groh 
Authored: Tue Nov 29 14:30:09 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 29 15:37:57 2016 -0800

--
 .../src/main/java/org/apache/beam/sdk/Pipeline.java  | 15 ---
 1 file changed, 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1184bfa7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index e188b35..9edf496 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -329,8 +327,6 @@ public class Pipeline {
   private Collection values = new ArrayList<>();
   private Set usedFullNames = new HashSet<>();
   private CoderRegistry coderRegistry;
-  private Multimap 
transformApplicationsForTesting =
-  HashMultimap.create();
 
   /**
* @deprecated replaced by {@link #Pipeline(PipelineRunner, PipelineOptions)}
@@ -399,7 +395,6 @@ public class Pipeline {
 
   AppliedPTransform applied = AppliedPTransform.of(
   child.getFullName(), input, output, transform);
-  transformApplicationsForTesting.put(transform, applied);
   // recordAsOutput is a NOOP if already called;
   output.recordAsOutput(applied);
   verifyOutputState(output, child);
@@ -513,14 +508,4 @@ public class Pipeline {
   private String buildName(String namePrefix, String name) {
 return namePrefix.isEmpty() ? name : namePrefix + "/" + name;
   }
-
-  /**
-   * Adds the given {@link PValue} to this {@link Pipeline}.
-   *
-   * For internal use only.
-   */
-  public void addValueInternal(PValue value) {
-this.values.add(value);
-LOG.debug("Adding {} to {}", value, this);
-  }
 }



[GitHub] incubator-beam pull request #1457: Remove TransformApplicationsForTesting

2016-11-29 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1457

Remove TransformApplicationsForTesting

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This field is mutated but never queried.

Remove Pipeline#addValueInternal

This method is never called and not suitable for use.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam remove_some_internals

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1457.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1457


commit 9755baf949031a9398049072aa2817f5e82bf8ff
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-29T22:30:09Z

Remove TransformApplicationsForTesting

This field is mutated but never queried.

Remove Pipeline#addValueInternal

This method is never called and not suitable for use.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[1/2] incubator-beam git commit: Remove the Experimental Annotation from the DirectRunner

2016-11-28 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 4bcef03d9 -> cdb7ba165


Remove the Experimental Annotation from the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0ae18124
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0ae18124
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0ae18124

Branch: refs/heads/master
Commit: 0ae181248823b08803a8f8ac0d39a35a190c4acd
Parents: 4bcef03
Author: Thomas Groh 
Authored: Mon Nov 28 15:52:03 2016 -0800
Committer: Thomas Groh 
Committed: Mon Nov 28 15:52:51 2016 -0800

--
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java  | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0ae18124/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index cb31947..f71e109 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -41,7 +41,6 @@ import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.metrics.MetricResults;
@@ -71,9 +70,7 @@ import org.joda.time.Instant;
  * An In-Memory implementation of the Dataflow Programming Model. Supports 
Unbounded
  * {@link PCollection PCollections}.
  */
-@Experimental
-public class DirectRunner
-extends PipelineRunner {
+public class DirectRunner extends PipelineRunner {
   /**
* The default set of transform overrides to use in the {@link DirectRunner}.
*



[GitHub] incubator-beam pull request #1447: Remove the Experimental Annotation from t...

2016-11-28 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1447

Remove the Experimental Annotation from the DirectRunner

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

R: @kennknowles 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
not_experimental_direct_runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1447.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1447


commit 0ae181248823b08803a8f8ac0d39a35a190c4acd
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-28T23:52:03Z

Remove the Experimental Annotation from the DirectRunner




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Add direct runner dependency to starter archetype

2016-11-28 Thread tgroh
Add direct runner dependency to starter archetype


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/14502a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/14502a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/14502a31

Branch: refs/heads/master
Commit: 14502a3122bd1c1652196881797ffb60d480164e
Parents: 33c6870
Author: Scott Wegner 
Authored: Tue Nov 22 08:56:17 2016 -0800
Committer: Thomas Groh 
Committed: Mon Nov 28 13:17:46 2016 -0800

--
 .../examples/src/main/resources/archetype-resources/pom.xml  | 2 +-
 .../starter/src/main/resources/archetype-resources/pom.xml   | 8 
 .../src/test/resources/projects/basic/reference/pom.xml  | 8 
 3 files changed, 17 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
 
b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
index b18c57c..031ee88 100644
--- 
a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
+++ 
b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/pom.xml
@@ -93,7 +93,7 @@
   0.4.0-incubating-SNAPSHOT
 
 
-
+
 
   org.apache.beam
   beam-runners-direct-java

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
 
b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
index 738acde..4fae02c 100644
--- 
a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
+++ 
b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/pom.xml
@@ -72,6 +72,14 @@
   0.4.0-incubating-SNAPSHOT
 
 
+
+
+  org.apache.beam
+  beam-runners-direct-java
+  0.4.0-incubating-SNAPSHOT
+  runtime
+
+
 
 
   org.slf4j

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/14502a31/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
--
diff --git 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
index 6950ed5..4656e63 100644
--- 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
+++ 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/pom.xml
@@ -72,6 +72,14 @@
   0.4.0-incubating-SNAPSHOT
 
 
+
+
+  org.apache.beam
+  beam-runners-direct-java
+  0.4.0-incubating-SNAPSHOT
+  runtime
+
+
 
 
   org.slf4j



[1/2] incubator-beam git commit: This closes #1414

2016-11-28 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 33c687069 -> d5aeee9c1


This closes #1414


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5aeee9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5aeee9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5aeee9c

Branch: refs/heads/master
Commit: d5aeee9c17a75307ff539e40b3ddc176790ec559
Parents: 33c6870 14502a3
Author: Thomas Groh 
Authored: Mon Nov 28 13:17:46 2016 -0800
Committer: Thomas Groh 
Committed: Mon Nov 28 13:17:46 2016 -0800

--
 .../examples/src/main/resources/archetype-resources/pom.xml  | 2 +-
 .../starter/src/main/resources/archetype-resources/pom.xml   | 8 
 .../src/test/resources/projects/basic/reference/pom.xml  | 8 
 3 files changed, 17 insertions(+), 1 deletion(-)
--




[GitHub] incubator-beam pull request #1442: [BEAM-646] Add Replacement Methods to Tra...

2016-11-28 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1442

[BEAM-646] Add Replacement Methods to TransformHierarchy, PValue

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

These are used as the underlying mechanism for Pipeline Surgery.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam 
transform_hierarchy_surgery_methods

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1442


commit 09b46520a6a7baa6bd59e69f7bec86a601b5
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-23T02:19:03Z

Add Replacement Methods to TransformHierarchy, PValue

These are used as the underlying mechanism for Pipeline Surgery.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Add input type to TransformResult

2016-11-28 Thread tgroh
Add input type to TransformResult

This would likely have caught some hard-to-diagnose type safety errors
during the development of StatefulParDoEvaluatorFactory, so adding it
should hopefully catch similar bugs in the future.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7502adda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7502adda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7502adda

Branch: refs/heads/master
Commit: 7502adda3262bce9d6d4fe4499bde8d8b5273029
Parents: 9fbd2d2
Author: Kenneth Knowles 
Authored: Tue Nov 22 16:01:45 2016 -0800
Committer: Thomas Groh 
Committed: Mon Nov 28 10:06:31 2016 -0800

--
 .../direct/AbstractModelEnforcement.java|  2 +-
 .../direct/BoundedReadEvaluatorFactory.java |  2 +-
 .../beam/runners/direct/CommittedResult.java|  2 +-
 .../beam/runners/direct/CompletionCallback.java |  2 +-
 ...ecycleManagerRemovingTransformEvaluator.java |  2 +-
 .../runners/direct/EmptyTransformEvaluator.java |  4 +-
 .../beam/runners/direct/EvaluationContext.java  |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java |  2 +-
 .../runners/direct/FlattenEvaluatorFactory.java | 10 ++---
 .../GroupAlsoByWindowEvaluatorFactory.java  |  5 ++-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  2 +-
 .../direct/ImmutabilityEnforcementFactory.java  |  2 +-
 .../beam/runners/direct/ModelEnforcement.java   |  2 +-
 .../beam/runners/direct/ParDoEvaluator.java |  2 +-
 .../direct/PassthroughTransformEvaluator.java   |  4 +-
 .../runners/direct/StepTransformResult.java | 38 +
 .../direct/TestStreamEvaluatorFactory.java  |  2 +-
 .../beam/runners/direct/TransformEvaluator.java |  2 +-
 .../beam/runners/direct/TransformExecutor.java  |  4 +-
 .../beam/runners/direct/TransformResult.java| 16 +--
 .../direct/UnboundedReadEvaluatorFactory.java   |  3 +-
 .../runners/direct/ViewEvaluatorFactory.java|  2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++---
 ...leManagerRemovingTransformEvaluatorTest.java |  4 +-
 .../runners/direct/EvaluationContextTest.java   | 20 -
 .../direct/FlattenEvaluatorFactoryTest.java |  6 +--
 .../ImmutabilityEnforcementFactoryTest.java |  6 +--
 .../beam/runners/direct/ParDoEvaluatorTest.java |  2 +-
 .../runners/direct/StepTransformResultTest.java | 25 ++-
 .../direct/TestStreamEvaluatorFactoryTest.java  | 10 ++---
 .../runners/direct/TransformExecutorTest.java   | 45 ++--
 .../UnboundedReadEvaluatorFactoryTest.java  | 20 ++---
 .../direct/WindowEvaluatorFactoryTest.java  | 12 +++---
 34 files changed, 152 insertions(+), 126 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 81f0f5f..f09164b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -33,6 +33,6 @@ abstract class AbstractModelEnforcement<T> implements 
ModelEnforcement<T> {
   @Override
   public void afterFinish(
   CommittedBundle<T> input,
-  TransformResult result,
+  TransformResult<T> result,
   Iterable<? extends CommittedBundle<?>> outputs) {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7502adda/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 66c55cd..65b622f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -161,7 +161,7 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
 }
 
 @Override
-public TransformResult finishBundle()  {
+public TransformResult finishBundle()  {
   return resultBuilder.build();
 }
   }


[1/2] incubator-beam git commit: This closes #1424

2016-11-28 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9fbd2d24e -> 5e9a80cf6


This closes #1424


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5e9a80cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5e9a80cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5e9a80cf

Branch: refs/heads/master
Commit: 5e9a80cf61c4666edb5febd9a0fface2d2c26261
Parents: 9fbd2d2 7502add
Author: Thomas Groh 
Authored: Mon Nov 28 10:06:31 2016 -0800
Committer: Thomas Groh 
Committed: Mon Nov 28 10:06:31 2016 -0800

--
 .../direct/AbstractModelEnforcement.java|  2 +-
 .../direct/BoundedReadEvaluatorFactory.java |  2 +-
 .../beam/runners/direct/CommittedResult.java|  2 +-
 .../beam/runners/direct/CompletionCallback.java |  2 +-
 ...ecycleManagerRemovingTransformEvaluator.java |  2 +-
 .../runners/direct/EmptyTransformEvaluator.java |  4 +-
 .../beam/runners/direct/EvaluationContext.java  |  2 +-
 .../direct/ExecutorServiceParallelExecutor.java |  2 +-
 .../runners/direct/FlattenEvaluatorFactory.java | 10 ++---
 .../GroupAlsoByWindowEvaluatorFactory.java  |  5 ++-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  |  2 +-
 .../direct/ImmutabilityEnforcementFactory.java  |  2 +-
 .../beam/runners/direct/ModelEnforcement.java   |  2 +-
 .../beam/runners/direct/ParDoEvaluator.java |  2 +-
 .../direct/PassthroughTransformEvaluator.java   |  4 +-
 .../runners/direct/StepTransformResult.java | 38 +
 .../direct/TestStreamEvaluatorFactory.java  |  2 +-
 .../beam/runners/direct/TransformEvaluator.java |  2 +-
 .../beam/runners/direct/TransformExecutor.java  |  4 +-
 .../beam/runners/direct/TransformResult.java| 16 +--
 .../direct/UnboundedReadEvaluatorFactory.java   |  3 +-
 .../runners/direct/ViewEvaluatorFactory.java|  2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  6 ++-
 .../direct/BoundedReadEvaluatorFactoryTest.java | 10 ++---
 ...leManagerRemovingTransformEvaluatorTest.java |  4 +-
 .../runners/direct/EvaluationContextTest.java   | 20 -
 .../direct/FlattenEvaluatorFactoryTest.java |  6 +--
 .../ImmutabilityEnforcementFactoryTest.java |  6 +--
 .../beam/runners/direct/ParDoEvaluatorTest.java |  2 +-
 .../runners/direct/StepTransformResultTest.java | 25 ++-
 .../direct/TestStreamEvaluatorFactoryTest.java  | 10 ++---
 .../runners/direct/TransformExecutorTest.java   | 45 ++--
 .../UnboundedReadEvaluatorFactoryTest.java  | 20 ++---
 .../direct/WindowEvaluatorFactoryTest.java  | 12 +++---
 34 files changed, 152 insertions(+), 126 deletions(-)
--




[GitHub] incubator-beam pull request #1425: Add TransformHierarchyTest

2016-11-22 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1425

Add TransformHierarchyTest

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This tests basic features of TransformHierarchy

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam transform_hierarchy_test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1425.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1425


commit 7d72601b9be356c35e5dd25b86b0857ad4ce554a
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-23T00:14:29Z

Add TransformHierarchyTest

This tests basic features of TransformHierarchy




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1422: Output Keyed Bundles in GroupAlsoByWindow...

2016-11-22 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1422

Output Keyed Bundles in GroupAlsoByWindowEvaluator

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This allows reuse of keys for downstream serialization.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam no_gbk_keyedness

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1422.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1422


commit 22fbb1728c73d4595fad9bd6e1a13c83100048ca
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-22T22:51:39Z

Output Keyed Bundles in GroupAlsoByWindowEvaluator

This allows reuse of keys for downstream serialization.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Simplify the API for managing MetricsEnvironment

2016-11-22 Thread tgroh
Simplify the API for managing MetricsEnvironment

1. setCurrentContainer returns the previous MetricsEnvironment
2. setCurrentContainer(null) resets the thread local
3. scopedCurrentContainer sets the container and returns a Closeable to
   reset the previous container.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e6870a6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e6870a6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e6870a6d

Branch: refs/heads/master
Commit: e6870a6dc10e4ad52a911c316137a9f7731a9194
Parents: 6ec45f7
Author: bchambers 
Authored: Tue Nov 22 11:37:23 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 12:27:41 2016 -0800

--
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java| 60 +++-
 .../sdk/metrics/MetricsEnvironmentTest.java |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java|  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index 1704955..fb31cc9 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
@@ -89,8 +90,7 @@ class TransformExecutor implements Runnable {
   @Override
   public void run() {
 MetricsContainer metricsContainer = new 
MetricsContainer(transform.getFullName());
-MetricsEnvironment.setMetricsContainer(metricsContainer);
-try {
+try (Closeable metricsScope = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
   Collection enforcements = new ArrayList<>();
   for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
 ModelEnforcement enforcement = 
enforcementFactory.forBundle(inputBundle, transform);
@@ -117,7 +117,6 @@ class TransformExecutor implements Runnable {
   // Report the physical metrics from the end of this step.
   context.getMetrics().commitPhysical(inputBundle, 
metricsContainer.getCumulative());
 
-  MetricsEnvironment.unsetMetricsContainer();
   transformEvaluationState.complete(this);
 }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e6870a6d/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index ef2660a8..7c06cbf 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.metrics;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.slf4j.Logger;
@@ -29,11 +31,13 @@ import org.slf4j.LoggerFactory;
  * returned objects to create and modify metrics.
  *
  * The runner should create {@link MetricsContainer} for each context in 
which metrics are
- * reported (by step and name) and call {@link #setMetricsContainer} before 
invoking any code that
- * may update metrics within that step.
+ * reported (by step and name) and call {@link #setCurrentContainer} before 
invoking any code that
+ * may update metrics within that step. It should call {@link 
#setCurrentContainer} again to restore
+ * the previous container.
  *
- * The runner should call {@link #unsetMetricsContainer} (or {@link 
#setMetricsContainer} back to
- * the previous value) when exiting code that set the metrics container.
+ * Alternatively, the runner can use {@link 
#scopedMetricsContainer(MetricsContainer)} to set the
+ * container for the current thread and get a {@link Closeable} that will 
restore the previous
+ * container when closed.
  */
 public class MetricsEnvironment {
 
@@ -45,15 +49,20 @@ public class MetricsEnvironment {
   private static final ThreadLocal 

[1/2] incubator-beam git commit: This closes #1417

2016-11-22 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6ec45f7e7 -> b41789e9c


This closes #1417


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b41789e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b41789e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b41789e9

Branch: refs/heads/master
Commit: b41789e9c5ac8691243c796968b00a65cc11dd39
Parents: 6ec45f7 e6870a6
Author: Thomas Groh 
Authored: Tue Nov 22 12:27:41 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 12:27:41 2016 -0800

--
 .../beam/runners/direct/TransformExecutor.java  |  5 +-
 .../beam/sdk/metrics/MetricsEnvironment.java| 60 +++-
 .../sdk/metrics/MetricsEnvironmentTest.java |  8 +--
 .../apache/beam/sdk/metrics/MetricsTest.java|  6 +-
 4 files changed, 56 insertions(+), 23 deletions(-)
--




[2/2] incubator-beam git commit: Support @ValidatesRunner(RunnableOnService) in Python [1/2]

2016-11-22 Thread tgroh
Support @ValidatesRunner(RunnableOnService) in Python [1/2]


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc706608
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc706608
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc706608

Branch: refs/heads/python-sdk
Commit: cc706608b281c3beeebd2487084946c06bc83f30
Parents: 9060f83
Author: Mark Liu 
Authored: Thu Nov 17 09:53:01 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 10:11:22 2016 -0800

--
 sdks/python/setup.py   |  4 
 sdks/python/test_config.py | 44 +
 2 files changed, 48 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc706608/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1299bbf..525f59c 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -137,4 +137,8 @@ setuptools.setup(
 ],
 license='Apache License, Version 2.0',
 keywords=PACKAGE_KEYWORDS,
+entry_points={
+'nose.plugins.0.10': [
+'beam_test_plugin = test_config:BeamTestPlugin'
+]}
 )

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc706608/sdks/python/test_config.py
--
diff --git a/sdks/python/test_config.py b/sdks/python/test_config.py
new file mode 100644
index 0000000..bde2795
--- /dev/null
+++ b/sdks/python/test_config.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test configurations for nose
+
+This module contains nose plugin hooks that configures Beam tests which
+includes ValidatesRunner test and E2E integration test.
+
+"""
+
+from nose.plugins import Plugin
+
+
+class BeamTestPlugin(Plugin):
+  """A nose plugin for Beam testing that registers command line options
+
+  This plugin is registered through setuptools in entry_points.
+  """
+
+  def options(self, parser, env):
+"""Add '--test-pipeline-options' to command line option to avoid
+unrecognized option error thrown by nose.
+
+The value of this option will be processed by TestPipeline and used to
+build customized pipeline for ValidatesRunner tests.
+"""
+parser.add_option('--test-pipeline-options',
+  action='store',
+  type=str,
+  help='providing pipeline options to run tests on runner')



[1/2] incubator-beam git commit: This closes #1376

2016-11-22 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 9060f8395 -> 28bfd9090


This closes #1376


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/28bfd909
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/28bfd909
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/28bfd909

Branch: refs/heads/python-sdk
Commit: 28bfd9090b0ce33c3da0bab0220fcc1ef8a72b4b
Parents: 9060f83 cc70660
Author: Thomas Groh 
Authored: Tue Nov 22 10:11:22 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 10:11:22 2016 -0800

--
 sdks/python/setup.py   |  4 
 sdks/python/test_config.py | 44 +
 2 files changed, 48 insertions(+)
--




[1/2] incubator-beam git commit: Update StarterPipeline

2016-11-22 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master e53d6d458 -> c2dc38639


Update StarterPipeline

Convert StarterPipeline ParDo to MapElements.

Use the new DoFn for non-outputting transforms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c80554b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c80554b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c80554b8

Branch: refs/heads/master
Commit: c80554b83426a585c762143e0ad533a73c2c3f0f
Parents: e53d6d4
Author: Scott Wegner 
Authored: Mon Nov 21 16:33:07 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 10:09:12 2016 -0800

--
 .../src/main/java/StarterPipeline.java| 18 ++
 .../src/main/java/it/pkg/StarterPipeline.java | 18 ++
 2 files changed, 20 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
--
diff --git 
a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
 
b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 0b21aa6..d6afdec 100644
--- 
a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ 
b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -20,13 +20,15 @@ package ${package};
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A starter example for writing Google Cloud Dataflow programs.
+ * A starter example for writing Beam programs.
  *
  * The example takes two strings, converts them to their upper-case
  * representation and logs them.
@@ -39,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * Platform, you should specify the following command-line options:
  *   --project=
  *   --stagingLocation=
- *   --runner=BlockingDataflowRunner
+ *   --runner=DataflowRunner
  */
 public class StarterPipeline {
   private static final Logger LOG = 
LoggerFactory.getLogger(StarterPipeline.class);
@@ -49,14 +51,14 @@ public class StarterPipeline {
 PipelineOptionsFactory.fromArgs(args).withValidation().create());
 
 p.apply(Create.of("Hello", "World"))
-.apply(ParDo.of(new OldDoFn() {
+.apply(MapElements.via(new SimpleFunction() {
   @Override
-  public void processElement(ProcessContext c) {
-c.output(c.element().toUpperCase());
+  public String apply(String input) {
+return input.toUpperCase();
   }
 }))
-.apply(ParDo.of(new OldDoFn() {
-  @Override
+.apply(ParDo.of(new DoFn() {
+  @ProcessElement
   public void processElement(ProcessContext c)  {
 LOG.info(c.element());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c80554b8/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
--
diff --git 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index b332442..4ae92e8 100644
--- 
a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ 
b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -20,13 +20,15 @@ package it.pkg;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A starter example for 

[2/2] incubator-beam git commit: This closes #1406

2016-11-22 Thread tgroh
This closes #1406


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c2dc3863
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c2dc3863
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c2dc3863

Branch: refs/heads/master
Commit: c2dc38639ddd37c393d3bd9f341c2ec032a1096c
Parents: e53d6d4 c80554b
Author: Thomas Groh 
Authored: Tue Nov 22 10:10:02 2016 -0800
Committer: Thomas Groh 
Committed: Tue Nov 22 10:10:02 2016 -0800

--
 .../src/main/java/StarterPipeline.java| 18 ++
 .../src/main/java/it/pkg/StarterPipeline.java | 18 ++
 2 files changed, 20 insertions(+), 16 deletions(-)
--




[1/2] incubator-beam git commit: Block earlier in BoundedReadEvaluatorFactoryTest

2016-11-17 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 1543ea952 -> 212fec4eb


Block earlier in BoundedReadEvaluatorFactoryTest

This ensures that the reader doesn't claim the split point, which in
turn ensures the dynamic split request will not be refused by the
OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits
flakes, as if the reader is faster than the split thread it can run past
the point at which the splitter thread will attempt to split the source,
which causes the reader to read all of the elements.

Sleep within TestReader#advanceImpl if the reader is being dynamically
split, to ensure that the dynamic split fully completes before
continuing a call to advance.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a8d32e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a8d32e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a8d32e5

Branch: refs/heads/master
Commit: 4a8d32e5d3726b851329d507a8d0392cc03f6e85
Parents: 1543ea9
Author: Thomas Groh 
Authored: Thu Nov 17 10:56:49 2016 -0800
Committer: Thomas Groh 
Committed: Thu Nov 17 14:37:47 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++-
 1 file changed, 14 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a8d32e5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
--
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index 9d8503a..e956c34 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.SourceTestUtils;
@@ -142,9 +143,7 @@ public class BoundedReadEvaluatorFactoryTest {
 TestPipeline.create().apply(Read.from(new 
TestSource<>(VarLongCoder.of(), 5, elems)));
 AppliedPTransform transform = 
read.getProducingTransformInternal();
 Collection unreadInputs =
-new BoundedReadEvaluatorFactory.InputProvider(context)
-.getInitialInputs(transform,
-1);
+new 
BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 
1);
 
 Collection outputs = new ArrayList<>();
 int numIterations = 0;
@@ -155,8 +154,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
   Collection newUnreadInputs = new ArrayList<>();
   for (CommittedBundle shardBundle : unreadInputs) {
-TransformEvaluator evaluator =
-factory.forApplication(transform, null);
+TransformEvaluator evaluator = factory.forApplication(transform, 
null);
 for (WindowedValue shard : shardBundle.getElements()) {
   evaluator.processElement((WindowedValue) shard);
 }
@@ -178,8 +176,6 @@ public class BoundedReadEvaluatorFactoryTest {
   unreadInputs = newUnreadInputs;
 }
 
-// We produced at least one split before we read 1000 elements, as we will 
attempt to split as
-// quickly as possible.
 assertThat(numIterations, greaterThan(1));
 WindowedValue[] expectedValues = new WindowedValue[numElements];
 for (long i = 0L; i < numElements; i++) {
@@ -343,7 +339,7 @@ public class BoundedReadEvaluatorFactoryTest {
 private static boolean readerClosed;
 private final Coder coder;
 private final T[] elems;
-private final int awaitSplitIndex;
+private final int firstSplitIndex;
 
 private transient CountDownLatch subrangesCompleted;
 
@@ -351,11 +347,11 @@ public class BoundedReadEvaluatorFactoryTest {
   this(coder, elems.length, elems);
 }
 
-public TestSource(Coder coder, int awaitSplitIndex, T... elems) {
+public TestSource(Coder coder, int firstSplitIndex, T... elems) {
   super(0L, elems.length, 1L);
   this.elems = elems;
   this.coder = coder;
-  this.awaitSplitIndex = awaitSplitIndex;
+  this.firstSplitIndex = firstSplitIndex;
   readerClosed = false;
 
   

[2/2] incubator-beam git commit: This closes #1377

2016-11-17 Thread tgroh
This closes #1377


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/212fec4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/212fec4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/212fec4e

Branch: refs/heads/master
Commit: 212fec4eba053fe603238a13ca6e0d9cb6aef697
Parents: 1543ea9 4a8d32e
Author: Thomas Groh 
Authored: Thu Nov 17 14:37:48 2016 -0800
Committer: Thomas Groh 
Committed: Thu Nov 17 14:37:48 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactoryTest.java | 26 +++-
 1 file changed, 14 insertions(+), 12 deletions(-)
--




[GitHub] incubator-beam pull request #1377: [BEAM-999] Block earlier in BoundedReadEv...

2016-11-17 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1377

[BEAM-999] Block earlier in BoundedReadEvaluatorFactoryTest

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This ensures that the reader doesn't claim the split point, which in
turn ensures the dynamic split request will not be refused by the
OffsetBasedSource. If the split is refused, the ...ProducesDynamicSplits
test flakes: if the reader is faster than the split thread, it can run
past the point at which the splitter thread will attempt to split the
source, which causes the reader to read all of the elements.

Spin within TestReader#advanceImpl if the reader is being dynamically
split, to ensure that the dynamic split fully completes before
continuing a call to advance.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam dynamic_splitting

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1377


commit bb793e3f578eadbe1d1d66e42c656a6a3f81f18f
Author: Thomas Groh <tg...@google.com>
Date:   2016-11-17T18:56:49Z

Block earlier in BoundedReadEvaluatorFactoryTest

This ensures that the reader doesn't claim the split point, which in
turn ensures the dynamic split request will not be refused by the
OffsetBasedSource. If the split is refused, ...ProducesDynamicSplits
flakes, as if the reader is faster than the split thread it can run past
the point at which the splitter thread will attempt to split the source,
which causes the reader to read all of the elements.

Sleep within TestReader#advanceImpl if the reader is being dynamically
split, to ensure that the dynamic split fully completes before
continuing a call to advance.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: This closes #1373

2016-11-16 Thread tgroh
This closes #1373


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/abd9fb3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/abd9fb3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/abd9fb3d

Branch: refs/heads/master
Commit: abd9fb3d7ced120c4c307df601673e3a548cfa87
Parents: 15e93c5 67ce531
Author: Thomas Groh 
Authored: Wed Nov 16 18:21:58 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 18:21:58 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactory.java | 28 -
 .../beam/runners/direct/DirectRunner.java   |  7 +++--
 .../beam/runners/direct/EmptyInputProvider.java | 22 +++---
 .../direct/ExecutorServiceParallelExecutor.java | 12 
 .../beam/runners/direct/RootInputProvider.java  | 16 ++
 .../runners/direct/RootProviderRegistry.java| 19 ++--
 .../direct/TestStreamEvaluatorFactory.java  | 23 +++---
 .../direct/TransformEvaluatorRegistry.java  |  5 +--
 .../direct/UnboundedReadEvaluatorFactory.java   | 32 +++-
 9 files changed, 85 insertions(+), 79 deletions(-)
--




[1/2] incubator-beam git commit: Properly apply Transform Overrides in the Direct Runner

2016-11-16 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 15e93c58e -> abd9fb3d7


Properly apply Transform Overrides in the Direct Runner

Previously the direct runner would use the transform override to
.apply(), but would keep the original transform in the pipeline,
e.g. it would use the original transform to look up an evaluator.

The current commit makes it use the node generated by applying the
override as a nested node within the graph (including, potentially
replacing it further recursively).

Additionally, makes InputProvider type-safe.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/67ce5313
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/67ce5313
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/67ce5313

Branch: refs/heads/master
Commit: 67ce53139b51617708cdf037d93c5195608accc5
Parents: 15e93c5
Author: Eugene Kirpichov 
Authored: Wed Nov 16 15:40:08 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 18:20:36 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactory.java | 28 -
 .../beam/runners/direct/DirectRunner.java   |  7 +++--
 .../beam/runners/direct/EmptyInputProvider.java | 22 +++---
 .../direct/ExecutorServiceParallelExecutor.java | 12 
 .../beam/runners/direct/RootInputProvider.java  | 16 ++
 .../runners/direct/RootProviderRegistry.java| 19 ++--
 .../direct/TestStreamEvaluatorFactory.java  | 23 +++---
 .../direct/TransformEvaluatorRegistry.java  |  5 +--
 .../direct/UnboundedReadEvaluatorFactory.java   | 32 +++-
 9 files changed, 85 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/67ce5313/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 8becb91..66c55cd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -35,6 +35,7 @@ import 
org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -174,7 +175,8 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
 abstract BoundedSource getSource();
   }
 
-  static class InputProvider implements RootInputProvider {
+  static class InputProvider
+  implements RootInputProvider {
 private final EvaluationContext evaluationContext;
 
 InputProvider(EvaluationContext evaluationContext) {
@@ -182,27 +184,21 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
 }
 
 @Override
-public Collection getInitialInputs(
-AppliedPTransform transform, int targetParallelism) throws 
Exception {
-  return createInitialSplits((AppliedPTransform) transform, 
targetParallelism);
-}
-
-private 
-Collection> 
createInitialSplits(
-AppliedPTransform transform, int 
targetParallelism)
-throws Exception {
-  BoundedSource source = transform.getTransform().getSource();
+public Collection> getInitialInputs(
+AppliedPTransform transform, 
int targetParallelism)
+throws Exception {
+  BoundedSource source = transform.getTransform().getSource();
   PipelineOptions options = evaluationContext.getPipelineOptions();
   long estimatedBytes = source.getEstimatedSizeBytes(options);
   long bytesPerBundle = estimatedBytes / targetParallelism;
-  List> bundles =
+  List> bundles =
   source.splitIntoBundles(bytesPerBundle, options);
-  ImmutableList.Builder> 
shards =
+  ImmutableList.Builder> shards =
   ImmutableList.builder();
-  for (BoundedSource bundle : bundles) {
-

[4/4] incubator-beam git commit: Reduce incidence of Namespace StringKey comparisons

2016-11-16 Thread tgroh
Reduce incidence of Namespace StringKey comparisons

If the Namespace of a TimerData reports itself as being equal to the
other namespace, immediately return 0 rather than generating the string
keys and comparing them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3e6a4f4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3e6a4f4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3e6a4f4a

Branch: refs/heads/master
Commit: 3e6a4f4a49344871430d2711e934b0493c17499f
Parents: c695ef4
Author: Thomas Groh 
Authored: Tue Nov 8 14:18:58 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 13:22:41 2016 -0800

--
 .../java/org/apache/beam/sdk/util/TimerInternals.java  | 13 +
 1 file changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3e6a4f4a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 743f3f7..5d4a72d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -230,12 +230,17 @@ public interface TimerInternals {
  * arbitrary.
  */
 @Override
-public int compareTo(TimerData o) {
+public int compareTo(TimerData that) {
+  if (this.equals(that)) {
+return 0;
+  }
   ComparisonChain chain =
-  ComparisonChain.start().compare(timestamp, 
o.getTimestamp()).compare(domain, o.domain);
-  if (chain.result() == 0) {
+  ComparisonChain.start()
+  .compare(this.timestamp, that.getTimestamp())
+  .compare(this.domain, that.domain);
+  if (chain.result() == 0 && !this.namespace.equals(that.namespace)) {
 // Obtaining the stringKey may be expensive; only do so if required
-chain = chain.compare(namespace.stringKey(), o.namespace.stringKey());
+chain = chain.compare(namespace.stringKey(), 
that.namespace.stringKey());
   }
   return chain.result();
 }



[1/4] incubator-beam git commit: This closes #1319

2016-11-16 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master c695ef48b -> 15e93c58e


This closes #1319


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e93c58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e93c58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e93c58

Branch: refs/heads/master
Commit: 15e93c58e933ef913c89a9654ceb567299a2fe5b
Parents: c695ef4 bf4c504
Author: Thomas Groh 
Authored: Wed Nov 16 13:22:41 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 13:22:41 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   |  3 +-
 .../beam/runners/direct/TransformExecutor.java  | 23 ---
 .../runners/direct/TransformExecutorTest.java   | 43 
 .../apache/beam/sdk/util/TimerInternals.java| 13 --
 4 files changed, 11 insertions(+), 71 deletions(-)
--




[3/4] incubator-beam git commit: Reduce Visibility of PCollectionViewWriter

2016-11-16 Thread tgroh
Reduce Visibility of PCollectionViewWriter

This is an internal implementation detail of the DirectRunner and should
not be exposed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf4c504d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf4c504d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf4c504d

Branch: refs/heads/master
Commit: bf4c504dffa05c8eccc5c2ae16dffdd53418a468
Parents: 5ba4d18
Author: Thomas Groh 
Authored: Wed Nov 16 10:37:42 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 13:22:41 2016 -0800

--
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java| 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bf4c504d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index c9a7864..04c8eb6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -183,10 +183,11 @@ public class DirectRunner
   /**
* A {@link PCollectionViewWriter} is responsible for writing contents of a 
{@link PCollection} to
* a storage mechanism that can be read from while constructing a {@link 
PCollectionView}.
+   *
* @param  the type of elements the input {@link PCollection} 
contains.
* @param  the type of the PCollectionView this writer writes to.
*/
-  public interface PCollectionViewWriter {
+  interface PCollectionViewWriter {
 void add(Iterable values);
   }
 



[2/4] incubator-beam git commit: Remove unused Thread variable in TransformExecutor

2016-11-16 Thread tgroh
Remove unused Thread variable in TransformExecutor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ba4d181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ba4d181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ba4d181

Branch: refs/heads/master
Commit: 5ba4d181d4625d43078bb0b071635d563d925277
Parents: 3e6a4f4
Author: Thomas Groh 
Authored: Tue Nov 8 14:16:23 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 13:22:41 2016 -0800

--
 .../beam/runners/direct/TransformExecutor.java  | 23 ---
 .../runners/direct/TransformExecutorTest.java   | 43 
 2 files changed, 66 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index c4002b5..1704955 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,13 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -70,8 +66,6 @@ class TransformExecutor implements Runnable {
   private final TransformExecutorService transformEvaluationState;
   private final EvaluationContext context;
 
-  private final AtomicReference thread;
-
   private TransformExecutor(
   EvaluationContext context,
   TransformEvaluatorFactory factory,
@@ -90,20 +84,12 @@ class TransformExecutor implements Runnable {
 
 this.transformEvaluationState = transformEvaluationState;
 this.context = context;
-this.thread = new AtomicReference<>();
   }
 
   @Override
   public void run() {
 MetricsContainer metricsContainer = new 
MetricsContainer(transform.getFullName());
 MetricsEnvironment.setMetricsContainer(metricsContainer);
-checkState(
-thread.compareAndSet(null, Thread.currentThread()),
-"Tried to execute %s for %s on thread %s, but is already executing on 
thread %s",
-TransformExecutor.class.getSimpleName(),
-transform.getFullName(),
-Thread.currentThread(),
-thread.get());
 try {
   Collection enforcements = new ArrayList<>();
   for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -186,13 +172,4 @@ class TransformExecutor implements Runnable {
 }
 return result;
   }
-
-  /**
-   * If this {@link TransformExecutor} is currently executing, return the 
thread it is executing in.
-   * Otherwise, return null.
-   */
-  @Nullable
-  public Thread getThread() {
-return thread.get();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
--
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 32f874d..0b7b882 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -277,48 +276,6 @@ public class TransformExecutorTest {
   }
 
   @Test
-  public void duringCallGetThreadIsNonNull() throws Exception {
-final TransformResult result =
-
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-final CountDownLatch testLatch = new CountDownLatch(1);
-final CountDownLatch evaluatorLatch = new 

[1/2] incubator-beam git commit: Remove Pipeline#getFullNameForTesting

2016-11-16 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 938ac91a9 -> c695ef48b


Remove Pipeline#getFullNameForTesting


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1cdaa4e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1cdaa4e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1cdaa4e7

Branch: refs/heads/master
Commit: 1cdaa4e7beee025cdca1f1456d4271635383
Parents: 938ac91
Author: Thomas Groh 
Authored: Tue Nov 15 12:38:02 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 10:55:43 2016 -0800

--
 .../main/java/org/apache/beam/sdk/Pipeline.java| 17 -
 1 file changed, 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1cdaa4e7/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 2286832..e188b35 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -17,10 +17,7 @@
  */
 package org.apache.beam.sdk;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -484,20 +481,6 @@ public class Pipeline {
   }
 
   /**
-   * @deprecated this method is no longer compatible with the design of {@link 
Pipeline},
-   * as {@link PTransform PTransforms} can be applied multiple times, with 
different names
-   * each time.
-   */
-  @Deprecated
-  public String getFullNameForTesting(PTransform transform) {
-Collection uses =
-transformApplicationsForTesting.get(transform);
-checkState(uses.size() > 0, "Unknown transform: " + transform);
-checkState(uses.size() <= 1, "Transform used multiple times: " + 
transform);
-return Iterables.getOnlyElement(uses).getFullName();
-  }
-
-  /**
* Returns a unique name for a transform with the given prefix (from
* enclosing transforms) and initial name.
*



[2/2] incubator-beam git commit: This closes #1363

2016-11-16 Thread tgroh
This closes #1363


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c695ef48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c695ef48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c695ef48

Branch: refs/heads/master
Commit: c695ef48bfef314ea964a57d0f9255f3005a295e
Parents: 938ac91 1cdaa4e
Author: Thomas Groh 
Authored: Wed Nov 16 10:55:44 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 10:55:44 2016 -0800

--
 .../main/java/org/apache/beam/sdk/Pipeline.java| 17 -
 1 file changed, 17 deletions(-)
--




[1/2] incubator-beam git commit: Exercise Dynamic Splitting in the DirectRunner

2016-11-16 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master dc94dbdd7 -> 938ac91a9


Exercise Dynamic Splitting in the DirectRunner

For sources that are above a certain size, the DirectRunner will run a
Thread which will split off half of the remaining work. This exercises
the concurrent behavior for splitAtFraction and getFractionConsumed


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fc80ba54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fc80ba54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fc80ba54

Branch: refs/heads/master
Commit: fc80ba542a7c90221e836fb90a9c3ad984b51670
Parents: dc94dbd
Author: Thomas Groh 
Authored: Tue Nov 1 17:23:03 2016 -0700
Committer: Thomas Groh 
Committed: Wed Nov 16 09:38:35 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactory.java |  82 -
 .../runners/direct/StepTransformResult.java |   6 +
 .../direct/BoundedReadEvaluatorFactoryTest.java | 184 +--
 3 files changed, 247 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fc80ba54/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index add1e8a..8becb91 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -18,10 +18,17 @@
 package org.apache.beam.runners.direct;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
@@ -33,22 +40,35 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator 
TransformEvaluators}
  * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
  */
 final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BoundedReadEvaluatorFactory.class);
+  /**
+   * The required minimum size of a source to dynamically split. Produced 
{@link TransformEvaluator
+   * TransformEvaluators} will attempt to dynamically split all sources larger 
than the minimum
+   * dynamic split size.
+   */
+  private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
   private final EvaluationContext evaluationContext;
+  private final ExecutorService executor = Executors.newCachedThreadPool();
+
+  private final long minimumDynamicSplitSize;
 
   BoundedReadEvaluatorFactory(EvaluationContext evaluationContext) {
+this(evaluationContext, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
+  }
+
+  @VisibleForTesting
+  BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, long 
minimumDynamicSplitSize) {
 this.evaluationContext = evaluationContext;
+this.minimumDynamicSplitSize = minimumDynamicSplitSize;
   }
 
   @SuppressWarnings({"unchecked", "rawtypes"})
@@ -61,7 +81,8 @@ final class BoundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
 
   private  TransformEvaluator createEvaluator(
   final AppliedPTransform, ?> transform) {
-return new BoundedReadEvaluator<>(transform, evaluationContext);
+return new BoundedReadEvaluator<>(
+transform, evaluationContext, minimumDynamicSplitSize, executor);
   }
 
   @Override
@@ -82,21 +103,29 @@ final class BoundedReadEvaluatorFactory implements 

[2/2] incubator-beam git commit: This closes #1254

2016-11-16 Thread tgroh
This closes #1254


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/938ac91a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/938ac91a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/938ac91a

Branch: refs/heads/master
Commit: 938ac91a91eb886b013e9e54899a6afd07fe3d9e
Parents: dc94dbd fc80ba5
Author: Thomas Groh 
Authored: Wed Nov 16 09:38:36 2016 -0800
Committer: Thomas Groh 
Committed: Wed Nov 16 09:38:36 2016 -0800

--
 .../direct/BoundedReadEvaluatorFactory.java |  82 -
 .../runners/direct/StepTransformResult.java |   6 +
 .../direct/BoundedReadEvaluatorFactoryTest.java | 184 +--
 3 files changed, 247 insertions(+), 25 deletions(-)
--




  1   2   3   4   >