[jira] [Commented] (BEAM-469) NullableCoder optimized encoding via passthrough context

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768525#comment-15768525
 ] 

ASF GitHub Bot commented on BEAM-469:
-

GitHub user dhalperi opened a pull request:

https://github.com/apache/incubator-beam/pull/1680

[BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

See [BEAM-469] for more information about why this is
correct.
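
A minimal sketch of the idea, for readers who do not want to open the patch (illustrative only: KvEncodeSketch and its fields are made-up names, while Coder.Context and the three-argument encode signature are the SDK API of this era). Only the key needs the nested, self-delimiting context, because the value is the last thing written into whatever region the caller handed the coder:

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.values.KV;

    // Sketch of the KV encoding layout after the change (hypothetical class).
    class KvEncodeSketch<K, V> {
      private final Coder<K> keyCoder;
      private final Coder<V> valueCoder;

      KvEncodeSketch(Coder<K> keyCoder, Coder<V> valueCoder) {
        this.keyCoder = keyCoder;
        this.valueCoder = valueCoder;
      }

      void encode(KV<K, V> kv, OutputStream out, Coder.Context context) throws IOException {
        // The value follows the key, so the key must be self-delimiting.
        keyCoder.encode(kv.getKey(), out, Coder.Context.NESTED);
        // The value is the last thing written here, so it can simply inherit
        // the caller's context; in the outer context that avoids a length prefix.
        valueCoder.encode(kv.getValue(), out, context);
      }
    }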

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/incubator-beam 
efficient-nested-coders

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1680


commit 621e8250c9535d773c4f4440a34ea0833912b51f
Author: Dan Halperin 
Date:   2016-12-21T23:37:49Z

[BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

See [BEAM-469] for more information about why this is
correct.




> NullableCoder optimized encoding via passthrough context
> 
>
> Key: BEAM-469
> URL: https://issues.apache.org/jira/browse/BEAM-469
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Luke Cwik
>Assignee: Thomas Groh
>Priority: Trivial
>  Labels: backward-incompatible
> Fix For: 0.3.0-incubating
>
>
> NullableCoder should encode using the context given and not always use the 
> nested context. For coders which can efficiently encode in the outer context 
> such as StringUtf8Coder or ByteArrayCoder, we are forcing them to prefix 
> themselves with their length.
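
A rough, self-contained illustration of the cost being described (it assumes this era's Coder API, where encode takes an explicit Coder.Context; ContextCostDemo is a throwaway name):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;

    // Encodes the same value in both contexts and prints the encoded sizes.
    public class ContextCostDemo {
      public static void main(String[] args) throws Exception {
        byte[] value = new byte[100];
        ByteArrayCoder coder = ByteArrayCoder.of();

        ByteArrayOutputStream outer = new ByteArrayOutputStream();
        coder.encode(value, outer, Coder.Context.OUTER);   // raw bytes only

        ByteArrayOutputStream nested = new ByteArrayOutputStream();
        coder.encode(value, nested, Coder.Context.NESTED); // varint length, then bytes

        // Expect the nested encoding to be slightly larger (the varint prefix).
        System.out.println("outer=" + outer.size() + " nested=" + nested.size());
      }
    }

In the outer context the byte array is written as-is; in the nested context the same value must be preceded by a varint length so that a following element can still be located.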





[GitHub] incubator-beam pull request #1680: [BEAM-XXX] Make KVCoder more efficient by...

2016-12-21 Thread dhalperi
GitHub user dhalperi opened a pull request:

https://github.com/apache/incubator-beam/pull/1680

[BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

See [BEAM-469] for more information about why this is
correct.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/incubator-beam 
efficient-nested-coders

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1680


commit 621e8250c9535d773c4f4440a34ea0833912b51f
Author: Dan Halperin 
Date:   2016-12-21T23:37:49Z

[BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

See [BEAM-469] for more information about why this is
correct.






[jira] [Commented] (BEAM-1201) Remove producesSortedKeys from Source

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768483#comment-15768483
 ] 

ASF GitHub Bot commented on BEAM-1201:
--

GitHub user dhalperi opened a pull request:

https://github.com/apache/incubator-beam/pull/1679

[BEAM-1201] Remove BoundedSource.producesSortedKeys

R: @jkff

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/incubator-beam 
remove-produces-sorted-keys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1679


commit ee15138543f8b9926466cf4e4dc6857b3173345e
Author: Dan Halperin 
Date:   2016-12-21T23:32:38Z

[BEAM-1201] Remove BoundedSource.producesSortedKeys

Unused and unclear; for more information see the linked JIRA.




> Remove producesSortedKeys from Source
> -
>
> Key: BEAM-1201
> URL: https://issues.apache.org/jira/browse/BEAM-1201
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
>Priority: Minor
>  Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just 
> failed to delete before releasing Dataflow 1.0, but we can delete before the 
> first stable release of Beam.
> This function has never been used by any runner. It does not mean anything 
> obvious to implementors, as many sources produce {{T}}, not {{KV}} -- 
> what does it mean in the former case? (And how do you get the latter case 
> correct?)





[jira] [Updated] (BEAM-1201) Remove producesSortedKeys from Source

2016-12-21 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin updated BEAM-1201:
--
Labels: backward-incompatible  (was: )

> Remove producesSortedKeys from Source
> -
>
> Key: BEAM-1201
> URL: https://issues.apache.org/jira/browse/BEAM-1201
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
>Priority: Minor
>  Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just 
> failed to delete before releasing Dataflow 1.0, but we can delete before the 
> first stable release of Beam.
> This function has never been used by any runner. It does not mean anything 
> obvious to implementors, as many sources produce {{T}}, not {{KV}} -- 
> what does it mean in the former case? (And how do you get the latter case 
> correct?)





[jira] [Updated] (BEAM-1201) Remove producesSortedKeys from BoundedSource

2016-12-21 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin updated BEAM-1201:
--
Summary: Remove producesSortedKeys from BoundedSource  (was: Remove 
producesSortedKeys from Source)

> Remove producesSortedKeys from BoundedSource
> 
>
> Key: BEAM-1201
> URL: https://issues.apache.org/jira/browse/BEAM-1201
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
>Priority: Minor
>  Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just 
> failed to delete before releasing Dataflow 1.0, but we can delete before the 
> first stable release of Beam.
> This function has never been used by any runner. It does not mean anything 
> obvious to implementors, as many sources produce {{T}}, not {{KV}} -- 
> what does it mean in the former case? (And how do you get the latter case 
> correct?)





[GitHub] incubator-beam pull request #1679: [BEAM-1201] Remove BoundedSource.produces...

2016-12-21 Thread dhalperi
GitHub user dhalperi opened a pull request:

https://github.com/apache/incubator-beam/pull/1679

[BEAM-1201] Remove BoundedSource.producesSortedKeys

R: @jkff

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/incubator-beam 
remove-produces-sorted-keys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1679


commit ee15138543f8b9926466cf4e4dc6857b3173345e
Author: Dan Halperin 
Date:   2016-12-21T23:32:38Z

[BEAM-1201] Remove BoundedSource.producesSortedKeys

Unused and unclear; for more information see the linked JIRA.






[jira] [Commented] (BEAM-469) NullableCoder optimized encoding via passthrough context

2016-12-21 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768476#comment-15768476
 ] 

Daniel Halperin commented on BEAM-469:
--

Sorry I missed this JIRA comment, [~mariusz89016]! A bit late, but...

Say a coder C is given the outer (non-nested) context. Then we have the
guarantee that nothing will be written to the stream after whatever C writes.

So if {{NullableCoder}} is given the outer context, then nothing will be
written after whatever the {{NullableCoder}} writes. If the NC writes {{0}},
then that's it -- the element is null. But if the NC writes {{1}}, then we know
that all remaining bytes in the encoded stream belong to the inner coder. That
effectively means the inner coder also does not need the nested context, so it
does not need to write its own length.

In your example, the {{NullableCoder}} is used in a nested context. So the
inner coder also needs to use the nested context, because more encoded elements
may follow.

In either case, the context of the nullable coder can be the same as the
context of the inner coder. This is why, in the patch here, we simply pass the
NC's context down into the inner coder. All we have removed is the _additional_
nesting that was previously forced on the inner coder.

https://patch-diff.githubusercontent.com/raw/apache/incubator-beam/pull/992.patch
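
For concreteness, a minimal sketch of the encode path described above (not the real NullableCoder source; the class and field names are illustrative):

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.beam.sdk.coders.Coder;

    // Writes a one-byte null marker, then hands the caller's context straight
    // to the inner coder instead of forcing an extra level of nesting.
    class NullablePassthroughSketch<T> {
      private final Coder<T> valueCoder;

      NullablePassthroughSketch(Coder<T> valueCoder) {
        this.valueCoder = valueCoder;
      }

      void encode(T value, OutputStream out, Coder.Context context) throws IOException {
        if (value == null) {
          out.write(0);  // marker: the element is null, nothing else follows
        } else {
          out.write(1);  // marker: a value follows
          // Every remaining byte in this region belongs to the inner coder, so
          // it can reuse whatever context the nullable coder itself was given.
          valueCoder.encode(value, out, context);
        }
      }
    }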
 

> NullableCoder optimized encoding via passthrough context
> 
>
> Key: BEAM-469
> URL: https://issues.apache.org/jira/browse/BEAM-469
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Luke Cwik
>Assignee: Thomas Groh
>Priority: Trivial
>  Labels: backward-incompatible
> Fix For: 0.3.0-incubating
>
>
> NullableCoder should encode using the context given and not always use the 
> nested context. For coders which can efficiently encode in the outer context 
> such as StringUtf8Coder or ByteArrayCoder, we are forcing them to prefix 
> themselves with their length.





[jira] [Commented] (BEAM-646) Get runners out of the apply()

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768470#comment-15768470
 ] 

ASF GitHub Bot commented on BEAM-646:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1582


> Get runners out of the apply()
> --
>
> Key: BEAM-646
> URL: https://issues.apache.org/jira/browse/BEAM-646
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model-runner-api, sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Thomas Groh
>
> Right now, the runner intercepts calls to apply() and replaces transforms as 
> we go. This means that there is no "original" user graph. For portability and 
> misc architectural benefits, we would like to build the original graph first, 
> and have the runner override later.
> Some runners already work in this manner, but we could integrate it more 
> smoothly, with more validation, via some handy APIs on e.g. the Pipeline 
> object.





[GitHub] incubator-beam pull request #1582: [BEAM-646] Add Parameters to finishSpecif...

2016-12-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1582




[1/3] incubator-beam git commit: Add Parameters to finishSpecifying

2016-12-21 Thread tgroh
Repository: incubator-beam
Updated Branches:
  refs/heads/master 57d9bbd79 -> 7984fe3fc


Add Parameters to finishSpecifying

Remove the need to use getProducingTransformInternal in TypedPValue.

Ensure that all nodes are finished specifying before a call to
TransformHierarchy#visit. This ensures that all nodes are fully
specified without requiring the Pipeline or Runner to do so explicitly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/038950df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/038950df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/038950df

Branch: refs/heads/master
Commit: 038950df02fa553cbb91f57978efe125a9ebc80f
Parents: b053be4
Author: Thomas Groh 
Authored: Thu Dec 8 14:33:36 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java   |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  |  2 -
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java| 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java   | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java | 20 ++---
 .../beam/sdk/values/POutputValueBase.java   |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 22 files changed, 185 insertions(+), 210 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
--
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index fa94b2a..f88a94d 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest {
 Arrays.asList(sideInput1, sideInput2),
 Arrays.asList(;
 
- outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
- ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
- HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
- "processing: -42: [11, 222]", "processing: 666: [11, 222]");
- long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
- while (System.currentTimeMillis() < timeout) {
-   if (EmbeddedCollector.RESULTS.containsAll(expected)) {
- break;
-   }
-   LOG.info("Waiting for expected results.");
-   Thread.sleep(SLEEP_MILLIS);
- }
- result.cancel();
- Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
+"processing: -42: [11, 222]", "processing: 666: [11, 222]");
+long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+while (System.currentTimeMillis() < timeout) {
+  if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+break;
+  }
+  LOG.info("Waiting for expected 

[2/3] incubator-beam git commit: Use CountingSource in ForceStreamingTest

2016-12-21 Thread tgroh
Use CountingSource in ForceStreamingTest

Removes the requirement to have a FakeUnboundedSource, plus the read is
fully specified.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46

Branch: refs/heads/master
Commit: b053be460c2e6ff486faed1b1a0996af63f93db2
Parents: 57d9bbd
Author: Thomas Groh 
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../beam/runners/spark/ForceStreamingTest.java  | 39 +---
 1 file changed, 2 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
 // apply the BoundedReadFromUnboundedSource.
 @SuppressWarnings("unchecked")
 BoundedReadFromUnboundedSource boundedRead =
-Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
 //noinspection unchecked
 pipeline.apply(boundedRead);
 
@@ -86,38 +85,4 @@ public class ForceStreamingTest {
 }
 
   }
-
-  /**
-   * A fake {@link UnboundedSource} to satisfy the compiler.
-   */
-  private static class FakeUnboundedSource extends UnboundedSource {
-
-@Override
-public List generateInitialSplits(
-int desiredNumSplits,
-PipelineOptions options) throws Exception {
-  return null;
-}
-
-@Override
-public UnboundedReader createReader(
-PipelineOptions options,
-CheckpointMark checkpointMark) throws IOException {
-  return null;
-}
-
-@Override
-public Coder getCheckpointMarkCoder() {
-  return null;
-}
-
-@Override
-public void validate() { }
-
-@Override
-public Coder getDefaultOutputCoder() {
-  return null;
-}
-  }
-
 }



[3/3] incubator-beam git commit: This closes #1582

2016-12-21 Thread tgroh
This closes #1582


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7984fe3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7984fe3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7984fe3f

Branch: refs/heads/master
Commit: 7984fe3fc20160d2286433434190f35658aef158
Parents: 57d9bbd 038950d
Author: Thomas Groh 
Authored: Wed Dec 21 15:26:18 2016 -0800
Committer: Thomas Groh 
Committed: Wed Dec 21 15:26:18 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java   |  1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java |  2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  | 41 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  3 +
 .../beam/sdk/runners/TransformHierarchy.java| 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java |  5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java   | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java |  9 ---
 .../org/apache/beam/sdk/values/POutput.java | 20 ++---
 .../beam/sdk/values/POutputValueBase.java   |  4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  |  3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   |  7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  7 +-
 22 files changed, 187 insertions(+), 247 deletions(-)
--




[jira] [Commented] (BEAM-1112) Python E2E Integration Test Framework - Batch Only

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768461#comment-15768461
 ] 

ASF GitHub Bot commented on BEAM-1112:
--

Github user markflyhigh closed the pull request at:

https://github.com/apache/incubator-beam/pull/1639


> Python E2E Integration Test Framework - Batch Only
> --
>
> Key: BEAM-1112
> URL: https://issues.apache.org/jira/browse/BEAM-1112
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py, testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>
> Parity with Java. 
> Build e2e integration test framework that can configure and run batch 
> pipeline with specified test runner, wait for pipeline execution and verify 
> results with given verifiers in the end.





[jira] [Commented] (BEAM-1112) Python E2E Integration Test Framework - Batch Only

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768462#comment-15768462
 ] 

ASF GitHub Bot commented on BEAM-1112:
--

GitHub user markflyhigh reopened a pull request:

https://github.com/apache/incubator-beam/pull/1639

[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

 - E2E test framework that lets a TestRunner start a pipeline job and verify it.
   - add `TestOptions`, which defines `on_success_matcher`, used to verify the
     state/output of the pipeline job.
   - validate `on_success_matcher` before pipeline execution to make sure it
     can be unpickled into a subclass of BaseMatcher.
   - create a `TestDataflowRunner`, which provides the functionality of
     `DataflowRunner` plus result verification.
   - provide a test verifier, `PipelineStateMatcher`, that checks whether the
     pipeline job finished in the DONE state.
 - Add wordcount_it (it = integration test), which builds an e2e test on top
   of the existing wordcount pipeline.
   - include wordcount_it in the nose collector so that it can be collected
     and run by nose.
   - skip ITs when running unit tests from tox in precommit and postcommit.

These changes do not alter the behavior of the existing pre/postcommit runs.
Testing was done by running `tox -e py27 -c sdks/python/tox.ini` for unit tests
and by running wordcount_it with `TestDataflowRunner` against the service
([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)).

TODO:
 - Output data verifier that verifies pipeline output stored in the filesystem.
 - Add wordcount_it to precommit and replace the existing wordcount execution
   command in postcommit with a better-structured nose command.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1639






> Python E2E Integration Test Framework - Batch Only
> --
>
> Key: BEAM-1112
> URL: https://issues.apache.org/jira/browse/BEAM-1112
> Project: Beam
>  Issue Type: Task
>  Components: sdk-py, testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>
> Parity with Java. 
> Build e2e integration test framework that can configure and run batch 
> pipeline with specified test runner, wait for pipeline execution and verify 
> results with given verifiers in the end.





[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...

2016-12-21 Thread markflyhigh
GitHub user markflyhigh reopened a pull request:

https://github.com/apache/incubator-beam/pull/1639

[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

 - E2E test framework that lets a TestRunner start a pipeline job and verify it.
   - add `TestOptions`, which defines `on_success_matcher`, used to verify the
     state/output of the pipeline job.
   - validate `on_success_matcher` before pipeline execution to make sure it
     can be unpickled into a subclass of BaseMatcher.
   - create a `TestDataflowRunner`, which provides the functionality of
     `DataflowRunner` plus result verification.
   - provide a test verifier, `PipelineStateMatcher`, that checks whether the
     pipeline job finished in the DONE state.
 - Add wordcount_it (it = integration test), which builds an e2e test on top
   of the existing wordcount pipeline.
   - include wordcount_it in the nose collector so that it can be collected
     and run by nose.
   - skip ITs when running unit tests from tox in precommit and postcommit.

These changes do not alter the behavior of the existing pre/postcommit runs.
Testing was done by running `tox -e py27 -c sdks/python/tox.ini` for unit tests
and by running wordcount_it with `TestDataflowRunner` against the service
([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)).

TODO:
 - Output data verifier that verifies pipeline output stored in the filesystem.
 - Add wordcount_it to precommit and replace the existing wordcount execution
   command in postcommit with a better-structured nose command.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1639








[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...

2016-12-21 Thread markflyhigh
Github user markflyhigh closed the pull request at:

https://github.com/apache/incubator-beam/pull/1639




[1/2] incubator-beam git commit: Add support for date partitioned table names

2016-12-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk bb09c07b6 -> 409d067b3


Add support for date partitioned table names

These names have the format "tablename$YYYYmmdd".  Previously the dollar
sign caused this to be deemed invalid.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1af871a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1af871a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1af871a

Branch: refs/heads/python-sdk
Commit: a1af871a0c8c92a6d84f2e9950615f7737118d7e
Parents: bb09c07
Author: Kevin Graney 
Authored: Tue Dec 6 15:09:42 2016 -0500
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:16:45 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index ce75e10..2059de4 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -275,7 +275,9 @@ def _parse_table_reference(table, dataset=None, 
project=None):
   then the table argument must contain the entire table reference:
   'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a
   bigquery.TableReference instance in which case dataset and project are
-  ignored and the reference is returned as a result.
+  ignored and the reference is returned as a result.  Additionally, for date
+  partitioned tables, appending '$YYYYmmdd' to the table name is supported,
+  e.g. 'DATASET.TABLE$YYYYmmdd'.
 dataset: The ID of the dataset containing this table or null if the table
   reference is specified entirely by the table argument.
 project: The ID of the project containing this table or null if the table
@@ -300,7 +302,7 @@ def _parse_table_reference(table, dataset=None, 
project=None):
   # table name.
   if dataset is None:
 match = re.match(
-r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$', table)
+r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table)
 if not match:
   raise ValueError(
   'Expected a table reference (PROJECT:DATASET.TABLE or '

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index a2cf947..f6f9363 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -208,6 +208,14 @@ class TestBigQuerySource(unittest.TestCase):
 self.assertEqual(source.query, 'my_query')
 self.assertIsNone(source.table_reference)
 
+  def test_date_partitioned_table_name(self):
+source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+dd = DisplayData.create_from(source)
+expected_items = [
+DisplayDataItemMatcher('validation', True),
+DisplayDataItemMatcher('table', 'dataset.table$20030102')]
+hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
 
 class TestBigQuerySink(unittest.TestCase):
 



[2/2] incubator-beam git commit: Closes #1534

2016-12-21 Thread robertwb
Closes #1534


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/409d067b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/409d067b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/409d067b

Branch: refs/heads/python-sdk
Commit: 409d067b36036981e330a055b652bb74a93f4ca2
Parents: bb09c07 a1af871
Author: Robert Bradshaw 
Authored: Wed Dec 21 15:16:46 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:16:46 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions

2016-12-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3454d691f -> bb09c07b6


Fixing inconsistencies in PipelineOptions

The following options have changed:

* job_name - Default is 'beamapp-username-date-microseconds'. Test was added.
* staging_location and temp_location - staging_location used to be the default
  for temp_location. Now it is the other way around, and the tests reflect that.
* machine_type alias of worker_machine_type has been removed.
* disk_type alias of worker_disk_type has been removed.
* disk_source_image option has been removed.
* no_save_main_session option has been removed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7

Branch: refs/heads/python-sdk
Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1
Parents: 3454d69
Author: Pablo 
Authored: Tue Dec 6 18:01:54 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:14:52 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 45 
 .../apache_beam/internal/apiclient_test.py  |  6 +++
 sdks/python/apache_beam/utils/options.py| 33 ++
 .../utils/pipeline_options_validator.py | 11 ++---
 .../utils/pipeline_options_validator_test.py|  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py 
b/sdks/python/apache_beam/internal/apiclient.py
index f1341a7..3a9ba46 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -18,6 +18,8 @@
 """Dataflow client utility functions."""
 
 import codecs
+from datetime import datetime
+import getpass
 import json
 import logging
 import os
@@ -46,10 +48,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-COMPUTE_API_SERVICE = 'compute.googleapis.com'
-STORAGE_API_SERVICE = 'storage.googleapis.com'
-
 
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
@@ -121,11 +119,13 @@ class Environment(object):
 self.worker_options = options.view_as(WorkerOptions)
 self.debug_options = options.view_as(DebugOptions)
 self.proto = dataflow.Environment()
-self.proto.clusterManagerApiService = COMPUTE_API_SERVICE
-self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE
+self.proto.clusterManagerApiService = 
GoogleCloudOptions.COMPUTE_API_SERVICE
+self.proto.dataset = '{}/cloud_dataflow'.format(
+GoogleCloudOptions.BIGQUERY_API_SERVICE)
 self.proto.tempStoragePrefix = (
-self.google_cloud_options.temp_location.replace('gs:/',
-STORAGE_API_SERVICE))
+self.google_cloud_options.temp_location.replace(
+'gs:/',
+GoogleCloudOptions.STORAGE_API_SERVICE))
 # User agent information.
 self.proto.userAgent = dataflow.Environment.UserAgentValue()
 self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
@@ -165,7 +165,7 @@ class Environment(object):
   dataflow.Package(
   location='%s/%s' % (
   self.google_cloud_options.staging_location.replace(
-  'gs:/', STORAGE_API_SERVICE),
+  'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
   package),
   name=package))
 
@@ -174,7 +174,7 @@ class Environment(object):
 packages=package_descriptors,
 taskrunnerSettings=dataflow.TaskRunnerSettings(
 parallelWorkerSettings=dataflow.WorkerSettings(
-baseUrl='https://dataflow.googleapis.com',
+baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
 servicePath=self.google_cloud_options.dataflow_endpoint)))
 pool.autoscalingSettings = dataflow.AutoscalingSettings()
 # Set worker pool options received through command line.
@@ -195,8 +195,6 @@ class Environment(object):
   pool.diskSizeGb = self.worker_options.disk_size_gb
 if self.worker_options.disk_type:
   pool.diskType = self.worker_options.disk_type
-if self.worker_options.disk_source_image:
-  pool.diskSourceImage = self.worker_options.disk_source_image
 if self.worker_options.zone:
   pool.zone = self.worker_options.zone
 if 

[2/2] incubator-beam git commit: Closes #1526

2016-12-21 Thread robertwb
Closes #1526


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b

Branch: refs/heads/python-sdk
Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba
Parents: 3454d69 35e2fdc
Author: Robert Bradshaw 
Authored: Wed Dec 21 15:15:20 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:15:20 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 45 
 .../apache_beam/internal/apiclient_test.py  |  6 +++
 sdks/python/apache_beam/utils/options.py| 33 ++
 .../utils/pipeline_options_validator.py | 11 ++---
 .../utils/pipeline_options_validator_test.py|  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
--




[jira] [Created] (BEAM-1202) Coders should have meaningful equals methods

2016-12-21 Thread Thomas Groh (JIRA)
Thomas Groh created BEAM-1202:
-

 Summary: Coders should have meaningful equals methods
 Key: BEAM-1202
 URL: https://issues.apache.org/jira/browse/BEAM-1202
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Thomas Groh
Assignee: Davor Bonaci


{{StandardCoder}} implements an equality check based on the component coders 
and equal classes. Any coder that is configured, or that does not extend 
{{StandardCoder}}, should have meaningful implementations of {{equals}} and 
{{hashCode}}
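
As an illustration, a hypothetical configured coder -- FixedWidthCoder is not an SDK class, and this sketch assumes the AtomicCoder/Context API of this era -- whose wire format depends on a configuration value, so the inherited class-based equality is not enough:

    import com.google.common.io.ByteStreams;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.Objects;
    import org.apache.beam.sdk.coders.AtomicCoder;
    import org.apache.beam.sdk.coders.CoderException;

    // A coder whose wire format depends on how it was configured, so equality
    // must compare that configuration rather than rely on class identity.
    class FixedWidthCoder extends AtomicCoder<byte[]> {
      private final int width;

      FixedWidthCoder(int width) {
        this.width = width;
      }

      @Override
      public void encode(byte[] value, OutputStream out, Context context) throws IOException {
        if (value.length != width) {
          throw new CoderException("expected exactly " + width + " bytes");
        }
        out.write(value);  // fixed width, so no length prefix is needed
      }

      @Override
      public byte[] decode(InputStream in, Context context) throws IOException {
        byte[] value = new byte[width];
        ByteStreams.readFully(in, value);
        return value;
      }

      // Two instances are interchangeable only if they read/write the same width.
      @Override
      public boolean equals(Object other) {
        return other instanceof FixedWidthCoder && ((FixedWidthCoder) other).width == this.width;
      }

      @Override
      public int hashCode() {
        return Objects.hash(FixedWidthCoder.class, width);
      }
    }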





[jira] [Updated] (BEAM-1202) Coders should have meaningful equals methods

2016-12-21 Thread Thomas Groh (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Groh updated BEAM-1202:
--
Assignee: (was: Davor Bonaci)

> Coders should have meaningful equals methods
> 
>
> Key: BEAM-1202
> URL: https://issues.apache.org/jira/browse/BEAM-1202
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Thomas Groh
>
> {{StandardCoder}} implements an equality check based on the component coders 
> and equal classes. Any coder that is configured, or that does not extend 
> {{StandardCoder}}, should have meaningful implementations of {{equals}} and 
> {{hashCode}}





[jira] [Created] (BEAM-1201) Remove producesSortedKeys from Source

2016-12-21 Thread Daniel Halperin (JIRA)
Daniel Halperin created BEAM-1201:
-

 Summary: Remove producesSortedKeys from Source
 Key: BEAM-1201
 URL: https://issues.apache.org/jira/browse/BEAM-1201
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Reporter: Daniel Halperin
Assignee: Daniel Halperin
Priority: Minor


This is a holdover from a precursor of the old Dataflow SDK that we just failed 
to delete before releasing Dataflow 1.0, but we can delete before the first 
stable release of Beam.

This function has never been used by any runner. It does not mean anything 
obvious to implementors, as many sources produce {{T}}, not {{KV}} -- what 
does it mean in the former case? (And how do you get the latter case correct?)







[46/51] [abbrv] incubator-beam git commit: Allow setting timer by ID in DirectTimerInternals

2016-12-21 Thread kenn
Allow setting timer by ID in DirectTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463

Branch: refs/heads/python-sdk
Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f
Parents: 4d71924
Author: Kenneth Knowles 
Authored: Wed Dec 7 20:18:44 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../runners/direct/DirectTimerInternals.java|  2 +-
 .../beam/runners/direct/WatermarkManager.java   | 25 
 2 files changed, 26 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5ca276d..80e0721 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant 
target,
   TimeDomain timeDomain) {
-throw new UnsupportedOperationException("Setting timer by ID not yet 
supported.");
+timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, 
timeDomain));
   }
 
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 7bed751..f7bafd1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.Table;
 import com.google.common.collect.TreeMultiset;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
@@ -210,6 +213,10 @@ public class WatermarkManager {
 private final SortedMultiset pendingElements;
 private final Map objectTimers;
 
+// Entries in this table represent the authoritative timestamp for which
+// a per-key-and-StateNamespace timer is set.
+private final Map> existingTimers;
+
 private AtomicReference currentWatermark;
 
 public AppliedPTransformInputWatermark(Collection 
inputWatermarks) {
@@ -222,6 +229,7 @@ public class WatermarkManager {
   this.pendingElements =
   TreeMultiset.create(pendingBundleComparator);
   this.objectTimers = new HashMap<>();
+  this.existingTimers = new HashMap<>();
   currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
 }
 
@@ -276,14 +284,31 @@ public class WatermarkManager {
 keyTimers = new TreeSet<>();
 objectTimers.put(update.key, keyTimers);
   }
+  Table existingTimersForKey =
+  existingTimers.get(update.key);
+  if (existingTimersForKey == null) {
+existingTimersForKey = HashBasedTable.create();
+existingTimers.put(update.key, existingTimersForKey);
+  }
+

[49/51] [abbrv] incubator-beam git commit: This closes #1669: Preliminaries for timers in the direct runner

2016-12-21 Thread kenn
This closes #1669: Preliminaries for timers in the direct runner

  Hold output watermark according to pending timers
  Allow setting timer by ID in DirectTimerInternals
  Add UsesTestStream for use with JUnit @Category
  Add static Window.withOutputTimeFn to match build method
  Use informative Instant formatter in WatermarkHold
  Add informative Instant formatter to BoundedWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57d9bbd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57d9bbd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57d9bbd7

Branch: refs/heads/python-sdk
Commit: 57d9bbd797edfcf32fdd9284b802fc4f9694e8d2
Parents: ff39516 dfe2e62
Author: Kenneth Knowles 
Authored: Wed Dec 21 13:46:34 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:46:34 2016 -0800

--
 .../apache/beam/runners/core/WatermarkHold.java |  4 +-
 .../runners/direct/DirectTimerInternals.java|  2 +-
 .../beam/runners/direct/WatermarkManager.java   | 78 ++--
 .../apache/beam/sdk/testing/UsesTestStream.java | 24 ++
 .../sdk/transforms/windowing/BoundedWindow.java | 31 
 .../beam/sdk/transforms/windowing/Window.java   |  9 +++
 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +--
 7 files changed, 144 insertions(+), 16 deletions(-)
--




[50/51] [abbrv] incubator-beam git commit: Merge remote-tracking branch 'origin/master' into python-sdk

2016-12-21 Thread kenn
Merge remote-tracking branch 'origin/master' into python-sdk


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77742979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77742979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77742979

Branch: refs/heads/python-sdk
Commit: 77742979463d0590969b186cee2af017a2bb1037
Parents: 3b4fd5c 57d9bbd
Author: Ahmet Altay 
Authored: Wed Dec 21 14:22:06 2016 -0800
Committer: Ahmet Altay 
Committed: Wed Dec 21 14:22:06 2016 -0800

--
 .gitignore  |   4 +
 .jenkins/common_job_properties.groovy   | 173 ++
 ...job_beam_PostCommit_Java_MavenInstall.groovy |  42 ++
 ...ostCommit_Java_RunnableOnService_Apex.groovy |  41 ++
 ...ommit_Java_RunnableOnService_Dataflow.groovy |  39 ++
 ...stCommit_Java_RunnableOnService_Flink.groovy |  38 ++
 ...ommit_Java_RunnableOnService_Gearpump.groovy |  41 ++
 ...stCommit_Java_RunnableOnService_Spark.groovy |  38 ++
 .../job_beam_PostCommit_Python_Verify.groovy|  37 ++
 .../job_beam_PreCommit_Java_MavenInstall.groovy |  42 ++
 .../job_beam_Release_NightlySnapshot.groovy |  46 ++
 .jenkins/job_seed.groovy|  47 ++
 .travis.yml |   2 +-
 .travis/README.md   |   2 +-
 README.md   |   2 +-
 examples/java/pom.xml   |  20 +-
 .../apache/beam/examples/WindowedWordCount.java | 177 +++---
 .../org/apache/beam/examples/WordCount.java |   2 +-
 .../examples/common/WriteWindowedFilesDoFn.java |  77 +++
 .../beam/examples/complete/AutoComplete.java|   6 +-
 .../org/apache/beam/examples/complete/README.md |  14 +-
 .../apache/beam/examples/complete/TfIdf.java|   6 +-
 .../examples/complete/TopWikipediaSessions.java |  15 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |   4 +-
 .../beam/examples/complete/TrafficRoutes.java   |   4 +-
 .../examples/cookbook/BigQueryTornadoes.java|   2 +-
 .../cookbook/CombinePerKeyExamples.java |   2 +-
 .../beam/examples/cookbook/DeDupExample.java|  96 ---
 .../beam/examples/cookbook/DistinctExample.java |  96 +++
 .../beam/examples/cookbook/FilterExamples.java  |   2 +-
 .../examples/cookbook/MaxPerKeyExamples.java|   2 +-
 .../org/apache/beam/examples/cookbook/README.md |  16 +-
 .../beam/examples/cookbook/TriggerExample.java  |   4 +-
 .../beam/examples/WindowedWordCountIT.java  | 182 +-
 .../org/apache/beam/examples/WordCountIT.java   |   2 +-
 .../org/apache/beam/examples/WordCountTest.java |   7 +-
 .../examples/complete/AutoCompleteTest.java |  13 +-
 .../beam/examples/complete/TfIdfTest.java   |   6 +-
 .../complete/TopWikipediaSessionsTest.java  |   7 +-
 .../examples/cookbook/DistinctExampleTest.java  |   9 +-
 .../examples/cookbook/JoinExamplesTest.java |   6 +-
 .../examples/cookbook/TriggerExampleTest.java   |   6 +-
 examples/java8/pom.xml  |   2 +-
 .../beam/examples/complete/game/GameStats.java  |   2 +-
 .../examples/complete/game/LeaderBoard.java |   4 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 .../complete/game/utils/WriteToBigQuery.java|   2 +-
 .../game/utils/WriteWindowedToBigQuery.java |   7 +-
 .../examples/MinimalWordCountJava8Test.java |   6 +-
 .../examples/complete/game/GameStatsTest.java   |   7 +-
 .../complete/game/HourlyTeamScoreTest.java  |   5 +-
 .../examples/complete/game/LeaderBoardTest.java |  11 +-
 .../examples/complete/game/UserScoreTest.java   |  10 +-
 examples/pom.xml|   2 +-
 pom.xml |  15 +-
 runners/apex/pom.xml|  69 ++-
 .../apache/beam/runners/apex/ApexRunner.java| 111 ++--
 .../beam/runners/apex/ApexRunnerResult.java |  50 +-
 .../beam/runners/apex/ApexYarnLauncher.java | 395 
 .../translation/ApexPipelineTranslator.java |  14 +-
 .../translation/ParDoBoundMultiTranslator.java  |  22 +-
 .../apex/translation/ParDoBoundTranslator.java  |  22 +-
 .../apex/translation/TranslationContext.java|  10 +-
 .../apex/translation/WindowBoundTranslator.java |  78 +++
 .../operators/ApexGroupByKeyOperator.java   |  26 +-
 .../operators/ApexParDoOperator.java|  29 +-
 .../apex/translation/utils/NoOpStepContext.java |   3 +-
 .../beam/runners/apex/ApexYarnLauncherTest.java | 138 +
 .../FlattenPCollectionTranslatorTest.java   |  15 +-
 .../translation/GroupByKeyTranslatorTest.java   |  21 +-
 .../translation/ParDoBoundTranslatorTest.java   |  38 +-
 .../translation/ReadUnboundTranslatorTest.java  |  15 +-
 runners/core-java/pom.xml   |   6 +-
 

[41/51] [abbrv] incubator-beam git commit: Fix windowing in direct runner Stateful ParDo

2016-12-21 Thread kenn
Fix windowing in direct runner Stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fb16e8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fb16e8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fb16e8f

Branch: refs/heads/python-sdk
Commit: 4fb16e8fb9bb087c0975f38c54665634868cfed7
Parents: 7ee8c86
Author: Kenneth Knowles 
Authored: Tue Dec 20 13:58:29 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:11:22 2016 -0800

--
 .../direct/ParDoMultiOverrideFactory.java   | 34 ++--
 1 file changed, 31 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fb16e8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 2cea999..b35df87 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -34,8 +34,13 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -92,9 +97,12 @@ class ParDoMultiOverrideFactory
 @Override
 public PCollectionTuple expand(PCollection> input) {
 
+  WindowingStrategy inputWindowingStrategy = 
input.getWindowingStrategy();
+
   // A KvCoder is required since this goes through GBK. Further, 
WindowedValueCoder
   // is not registered by default, so we explicitly set the relevant 
coders.
-  checkState(input.getCoder() instanceof KvCoder,
+  checkState(
+  input.getCoder() instanceof KvCoder,
   "Input to a %s using state requires a %s, but the coder was %s",
   ParDo.class.getSimpleName(),
   KvCoder.class.getSimpleName(),
@@ -102,14 +110,27 @@ class ParDoMultiOverrideFactory
   KvCoder kvCoder = (KvCoder) input.getCoder();
   Coder keyCoder = kvCoder.getKeyCoder();
   Coder windowCoder =
-  input.getWindowingStrategy().getWindowFn().windowCoder();
+  inputWindowingStrategy.getWindowFn().windowCoder();
 
-  PCollectionTuple outputs =
+  PCollection>> adjustedInput =
   input
   // Stash the original timestamps, etc, for when it is fed to the 
user's DoFn
   .apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn()))
   .setCoder(KvCoder.of(keyCoder, 
WindowedValue.getFullCoder(kvCoder, windowCoder)))
 
+  // We are going to GBK to gather keys and windows but otherwise 
do not want
+  // to alter the flow of data. This entails:
+  //  - trigger as fast as possible
+  //  - maintain the full timestamps of elements
+  //  - ensure this GBK holds to the minimum of those timestamps 
(via OutputTimeFn)
+  //  - discard past panes as it is "just a stream" of elements
+  .apply(
+  Window.>>create())
 
@@ -117,6 +138,13 @@ class ParDoMultiOverrideFactory
   .apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem()))
   

[44/51] [abbrv] incubator-beam git commit: Add static Window.withOutputTimeFn to match build method

2016-12-21 Thread kenn
Add static Window.withOutputTimeFn to match build method


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8188040d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8188040d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8188040d

Branch: refs/heads/python-sdk
Commit: 8188040d930b1fa49efd4ed7d5f821d05d6f28ef
Parents: fa4958a
Author: Kenneth Knowles 
Authored: Tue Dec 20 13:57:55 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../org/apache/beam/sdk/transforms/windowing/Window.java| 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8188040d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 0c430d0..1241abe 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -223,6 +223,15 @@ public class Window {
   }
 
   /**
+   * (Experimental) Override the default {@link OutputTimeFn}, 
to control
+   * the output timestamp of values output from a {@link GroupByKey} operation.
+   */
+  @Experimental(Kind.OUTPUT_TIME)
+  public static  Bound withOutputTimeFn(OutputTimeFn outputTimeFn) {
+return new Bound(null).withOutputTimeFn(outputTimeFn);
+  }
+
+  /**
* A {@code PTransform} that windows the elements of a {@code 
PCollection},
* into finite windows according to a user-specified {@code WindowFn}.
*
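
As a usage note, the new static entry point mirrors the existing static builders such as Window.triggering(...), so a chain can start from the OutputTimeFn. A minimal sketch, assuming an existing PCollection<String> named words:

    PCollection<String> withEarliestOutputTime =
        words.apply(
            "SetOutputTimeFn",
            Window.<String>withOutputTimeFn(
                OutputTimeFns.outputAtEarliestInputTimestamp()));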



[45/51] [abbrv] incubator-beam git commit: Add UsesTestStream for use with JUnit @Category

2016-12-21 Thread kenn
Add UsesTestStream for use with JUnit @Category


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d71924c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d71924c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d71924c

Branch: refs/heads/python-sdk
Commit: 4d71924ccda9dae97c7cc9535a9780df9457cc3f
Parents: 8188040
Author: Kenneth Knowles 
Authored: Tue Dec 20 14:20:07 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../apache/beam/sdk/testing/UsesTestStream.java | 24 
 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +-
 2 files changed, 30 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
new file mode 100644
index 000..8debb46
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for tests that use {@link TestStream}, which is not a part of 
the Beam model
+ * but a special feature currently only implemented by the direct runner.
+ */
+public interface UsesTestStream {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 64aeca3..c12e9f3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testLateDataAccumulating() {
 Instant instant = new Instant(0);
 TestStream source = TestStream.create(VarIntCoder.of())
@@ -136,7 +136,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testProcessingTimeTrigger() {
 TestStream source = TestStream.create(VarLongCoder.of())
 .addElements(TimestampedValue.of(1L, new Instant(1000L)),
@@ -159,7 +159,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testDiscardingMode() {
 TestStream stream =
 TestStream.create(StringUtf8Coder.of())
@@ -208,7 +208,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testFirstElementLate() {
 Instant lateElementTimestamp = new Instant(-1_000_000);
 TestStream stream =
@@ -238,7 +238,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testElementsAtAlmostPositiveInfinity() {
 Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
 TestStream stream = TestStream.create(StringUtf8Coder.of())
@@ -261,7 +261,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  
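
The category gives runners a single knob for filtering out tests that cannot run without TestStream support. A minimal sketch of how a new TestStream-based test is tagged under this scheme, assuming the TestPipeline rule p already present in this test class:

    @Test
    @Category({NeedsRunner.class, UsesTestStream.class})
    public void testMyTestStreamScenario() {
      TestStream<String> events =
          TestStream.create(StringUtf8Coder.of())
              .addElements("a", "b")
              .advanceWatermarkToInfinity();

      PCollection<String> result = p.apply(events);
      PAssert.that(result).containsInAnyOrder("a", "b");
      p.run();
    }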

[36/51] [abbrv] incubator-beam git commit: This closes #1673: Require TimeDomain to delete a timer

2016-12-21 Thread kenn
This closes #1673: Require TimeDomain to delete a timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4843dc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4843dc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4843dc59

Branch: refs/heads/python-sdk
Commit: 4843dc59c6e87ea0be75f7abd1e312bf5bc5a529
Parents: 0d0a5e2 35a0274
Author: Kenneth Knowles 
Authored: Wed Dec 21 10:15:56 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 10:15:56 2016 -0800

--
 .../operators/ApexGroupByKeyOperator.java  |  8 
 .../beam/runners/core/InMemoryTimerInternals.java  |  8 
 .../beam/runners/direct/DirectTimerInternals.java  |  8 
 .../wrappers/streaming/WindowDoFnOperator.java |  9 +
 .../org/apache/beam/sdk/util/TimerInternals.java   | 17 +++--
 5 files changed, 48 insertions(+), 2 deletions(-)
--




[51/51] [abbrv] incubator-beam git commit: This closes #1676: Merge master (57d9bbd) into python-sdk

2016-12-21 Thread kenn
This closes #1676: Merge master (57d9bbd) into python-sdk

No fixups. Commit from master:

  Hold output watermark according to pending timers
  Allow setting timer by ID in DirectTimerInternals
  Add UsesTestStream for use with JUnit @Category
  Add static Window.withOutputTimeFn to match build method
  Use informative Instant formatter in WatermarkHold
  Add informative Instant formatter to BoundedWindow
  Actually propagate and commit state in direct runner
  Fix windowing in direct runner Stateful ParDo
  Support set and delete of timer by ID in InMemoryTimerInternals
  Move ExecutionContext and related classes to runners-core
  Update Dataflow worker to beam-master-20161221
  Require TimeDomain to delete a timer
  Provide local tags in PInput, POutput expansions
  Remove deprecated InMemoryTimerInternals from SDK
  Remove deprecated AggregatorFactory from SDK
  Update Dataflow worker to beam-master-20161220
  Fixup usage of canonical name with name since canonical name != name for 
inner classes.
  Provide a better error message for non-existing gcpTempLocation
  Restore SDK's InMemoryTimerInternals, deprecated
  Move InMemoryTimerInternals to runners-core
  Port direct runner StatefulParDo to KeyedWorkItem
  Propagate key through ParDo if DoFn is key-preserving
  Move responsibility for knowing about keyedness into EvaluationContext
  Add some key-preserving to KeyedPValueTrackingVisitor
  Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit 
rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.
  Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline 
as a JUnit rule.
  Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a 
JUnit rule.
  Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as 
a JUnit rule.
  Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. 
Plus, fixed some checkstyle errors from previous modules' migration.
  Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
  Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
  Migrated the beam-examples-java8 module to TestPipeline as a JUnit rule.
  Migrated the beam-examples-java module to TestPipeline as a JUnit rule.
  Disable automatic archiving of Maven builds
  [BEAM-59] initial interfaces and classes of Beam FileSystem.
  Change counter name in TestDataflowRunner
  More escaping in Jenkins timestamp spec
  Add RunnableOnService test for Metrics
  Fix seed job fetch spec
  Show timestamps on log lines in Jenkins
  [BEAM-1165] Fix unexpected file creation when checking dependencies
  [BEAM-1178] Make naming of logger objects consistent
  [BEAM-716] Fix javadoc on with* methods [BEAM-959] Improve check 
preconditions in JmsIO
  [BEAM-716] Use AutoValue in JmsIO
  Fix grammar error (repeated for)
  Empty TestPipeline need not be run
  [BEAM-85, BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
  Change counter name in TestDataflowRunner
  BigQueryIO: fix streaming write, typo in API
  [BEAM-853] Force streaming execution on batch pipelines for testing. Expose 
the adapted source.
  Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest
  Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle
  Add timer support to DoFnRunner(s)
  Make TimerSpec and StateSpec fields accessible
  View.asMap: minor javadoc fixes
  Revert "Move InMemoryTimerInternals to runners-core"
  Revert "Moves DoFnAdapters to runners-core"
  Revert "Removes ArgumentProvider.windowingInternals"
  Revert "Removes code for wrapping DoFn as an OldDoFn"
  checkstyle: missed newline in DistributionCell
  Make {Metric,Counter,Distribution}Cell public
  Add PTransformOverrideFactory to the Core SDK
  Move ActiveWindowSet and implementations to runners-core
  Update Dataflow worker to beam-master-20161216
  [BEAM-1108] Remove outdated language about experimental autoscaling
  [BEAM-450] Shade modules to separate paths
  [BEAM-362] Port runners to runners-core AggregatoryFactory
  Move InMemoryTimerInternals to runners-core
  Delete deprecated TimerCallback
  Remove deprecated methods of InMemoryTimerInternals
  Don't incorrectly log error in MetricsEnvironment
  Renames ParDo.getNewFn to getFn
  Moves DoFnAdapters to runners-core
  Removes unused code from NoOpOldDoFn
  Removes ArgumentProvider.windowingInternals
  Removes code for wrapping DoFn as an OldDoFn
  Removes OldDoFn from ParDo
  Pushes uses of OldDoFn deeper inside Flin

[38/51] [abbrv] incubator-beam git commit: Support set and delete of timer by ID in InMemoryTimerInternals

2016-12-21 Thread kenn
Support set and delete of timer by ID in InMemoryTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df2e540d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df2e540d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df2e540d

Branch: refs/heads/python-sdk
Commit: df2e540d7a7b8444b9ff3b404740d5a3394b7691
Parents: acd2196
Author: Kenneth Knowles 
Authored: Mon Dec 19 14:01:36 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 11:01:36 2016 -0800

--
 .../runners/core/InMemoryTimerInternals.java|  65 +++
 .../core/InMemoryTimerInternalsTest.java| 112 +--
 2 files changed, 120 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..292ac23 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.NavigableSet;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
@@ -35,17 +37,17 @@ import org.joda.time.Instant;
 /** {@link TimerInternals} with all watermarks and processing clock simulated 
in-memory. */
 public class InMemoryTimerInternals implements TimerInternals {
 
-  /** At most one timer per timestamp is kept. */
-  private Set existingTimers = new HashSet<>();
+  /** The current set timers by namespace and ID. */
+  Table existingTimers = 
HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue watermarkTimers = new PriorityQueue<>(11);
+  private NavigableSet watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue processingTimers = new PriorityQueue<>(11);
+  private NavigableSet processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue synchronizedProcessingTimers = new 
PriorityQueue<>(11);
+  private NavigableSet synchronizedProcessingTimers = new 
TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -74,13 +76,13 @@ public class InMemoryTimerInternals implements 
TimerInternals {
 final TimerData data;
 switch (domain) {
   case EVENT_TIME:
-data = watermarkTimers.peek();
+data = watermarkTimers.first();
 break;
   case PROCESSING_TIME:
-data = processingTimers.peek();
+data = processingTimers.first();
 break;
   case SYNCHRONIZED_PROCESSING_TIME:
-data = synchronizedProcessingTimers.peek();
+data = synchronizedProcessingTimers.first();
 break;
   default:
 throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
@@ -88,7 +90,7 @@ public class InMemoryTimerInternals implements TimerInternals 
{
 return (data == null) ? null : data.getTimestamp();
   }
 
-  private PriorityQueue queue(TimeDomain domain) {
+  private NavigableSet timersForDomain(TimeDomain domain) {
 switch (domain) {
   case EVENT_TIME:
 return watermarkTimers;
@@ -104,27 +106,45 @@ public class InMemoryTimerInternals implements 
TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant 
target,
   TimeDomain timeDomain) {
-throw new UnsupportedOperationException("Setting a timer by ID is not yet 
supported.");
+setTimer(TimerData.of(timerId, namespace, target, timeDomain));
   }
 
   @Override
   public void setTimer(TimerData timerData) {
 WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), 
timerData);
- 
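
Taken together with #1673 above (deleting a timer now requires its TimeDomain), the by-ID path can be exercised roughly as follows. This is a sketch: StateNamespaces.global() and the timer ID "cleanup" are illustrative, and the three-argument delete is the form implied by that change.

    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
    StateNamespace ns = StateNamespaces.global();

    // Re-setting the same (namespace, id) pair replaces the earlier timer
    // rather than registering a second one.
    timerInternals.setTimer(ns, "cleanup", new Instant(100), TimeDomain.EVENT_TIME);
    timerInternals.setTimer(ns, "cleanup", new Instant(200), TimeDomain.EVENT_TIME);

    // Deleting by ID now also carries the time domain (see #1673).
    timerInternals.deleteTimer(ns, "cleanup", TimeDomain.EVENT_TIME);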

[35/51] [abbrv] incubator-beam git commit: Update Dataflow worker to beam-master-20161221

2016-12-21 Thread kenn
Update Dataflow worker to beam-master-20161221


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64336e40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64336e40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64336e40

Branch: refs/heads/python-sdk
Commit: 64336e40dd6a48b3b6b48634bb9204db0aa0c7ca
Parents: 0d0a5e2
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Dec 21 10:09:49 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 10:10:00 2016 -0800

--
 .../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64336e40/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
 
b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index bf08e83..2912f61 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ 
b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161220
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161221
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161220
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161221



[47/51] [abbrv] incubator-beam git commit: Hold output watermark according to pending timers

2016-12-21 Thread kenn
Hold output watermark according to pending timers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d

Branch: refs/heads/python-sdk
Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16
Parents: 7f14c46
Author: Kenneth Knowles 
Authored: Tue Dec 20 13:37:40 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../beam/runners/direct/WatermarkManager.java   | 59 
 1 file changed, 48 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f7bafd1..248fafd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -211,12 +211,18 @@ public class WatermarkManager {
   private static class AppliedPTransformInputWatermark implements Watermark {
 private final Collection inputWatermarks;
 private final SortedMultiset pendingElements;
-private final Map objectTimers;
+
+// This tracks only the quantity of timers at each timestamp, for quickly 
getting the cross-key
+// minimum
+private final SortedMultiset pendingTimers;
 
 // Entries in this table represent the authoritative timestamp for which
 // a per-key-and-StateNamespace timer is set.
 private final Map> existingTimers;
 
+// This per-key sorted set allows quick retrieval of timers that should 
fire for a key
+private final Map objectTimers;
+
 private AtomicReference currentWatermark;
 
 public AppliedPTransformInputWatermark(Collection 
inputWatermarks) {
@@ -224,10 +230,13 @@ public class WatermarkManager {
   // The ordering must order elements by timestamp, and must not compare 
two distinct elements
   // as equal. This is built on the assumption that any element added as a 
pending element will
   // be consumed without modifications.
+  //
+  // The same logic is applied for pending timers
   Ordering pendingBundleComparator =
   new 
BundleByElementTimestampComparator().compound(Ordering.arbitrary());
   this.pendingElements =
   TreeMultiset.create(pendingBundleComparator);
+  this.pendingTimers = TreeMultiset.create();
   this.objectTimers = new HashMap<>();
   this.existingTimers = new HashMap<>();
   currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -278,6 +287,14 @@ public class WatermarkManager {
   pendingElements.remove(completed);
 }
 
+private synchronized Instant getEarliestTimerTimestamp() {
+  if (pendingTimers.isEmpty()) {
+return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  } else {
+return pendingTimers.firstEntry().getElement();
+  }
+}
+
 private synchronized void updateTimers(TimerUpdate update) {
   NavigableSet keyTimers = objectTimers.get(update.key);
   if (keyTimers == null) {
@@ -291,27 +308,43 @@ public class WatermarkManager {
 existingTimers.put(update.key, existingTimersForKey);
   }
 
-  for (TimerData timer : update.setTimers) {
+  for (TimerData timer : update.getSetTimers()) {
+if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+  @Nullable
+  TimerData existingTimer =
+  existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
+
+  if (existingTimer == null) {
+pendingTimers.add(timer.getTimestamp());
+keyTimers.add(timer);
+  } else if (!existingTimer.equals(timer)) {
+keyTimers.remove(existingTimer);
+keyTimers.add(timer);
+  } // else the timer is already set identically, so noop
+
+  existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), 
timer);
+}
+  }
+
+  for (TimerData timer : update.getDeletedTimers()) {
 if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
   @Nullable
   TimerData existingTimer =
   existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
 

[43/51] [abbrv] incubator-beam git commit: Add informative Instant formatter to BoundedWindow

2016-12-21 Thread kenn
Add informative Instant formatter to BoundedWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/280a6a8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/280a6a8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/280a6a8f

Branch: refs/heads/python-sdk
Commit: 280a6a8f729cb382616ad65f71860b61277cbd6f
Parents: ff39516
Author: Kenneth Knowles 
Authored: Mon Dec 19 20:40:11 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:36 2016 -0800

--
 .../beam/sdk/transforms/windowing/BoundedWindow.java| 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/280a6a8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 3654074..6da2495 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -39,6 +39,18 @@ public abstract class BoundedWindow {
   public static final Instant TIMESTAMP_MAX_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  public static String formatTimestamp(Instant timestamp) {
+if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
+  return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
+} else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) {
+  return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)";
+} else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
+  return timestamp.toString() + " (end of global window)";
+} else {
+  return timestamp.toString();
+}
+  }
+
   /**
* Returns the inclusive upper bound of timestamps for values in this window.
*/
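
A quick illustration of what the formatter adds over Instant.toString(); the outputs follow directly from the branches above:

    BoundedWindow.formatTimestamp(new Instant(0));
    // -> "1970-01-01T00:00:00.000Z" (not a distinguished value, so unchanged)

    BoundedWindow.formatTimestamp(BoundedWindow.TIMESTAMP_MAX_VALUE);
    // -> the ISO instant suffixed with " (TIMESTAMP_MAX_VALUE)"

    BoundedWindow.formatTimestamp(GlobalWindow.INSTANCE.maxTimestamp());
    // -> the ISO instant suffixed with " (end of global window)"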



[28/51] [abbrv] incubator-beam git commit: This closes #1665: Remove deprecated AggregatorFactory from SDK

2016-12-21 Thread kenn
This closes #1665: Remove deprecated AggregatorFactory from SDK

  Update Dataflow worker to beam-master-20161220
  Remove deprecated AggregatorFactory from SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a05d7f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a05d7f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a05d7f1

Branch: refs/heads/python-sdk
Commit: 6a05d7f17aab5cab202cdbf50b766b4fc86180b4
Parents: acd2196 aab46a0
Author: Kenneth Knowles 
Authored: Tue Dec 20 15:07:12 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 15:07:12 2016 -0800

--
 .../beam/runners/dataflow/dataflow.properties|  4 ++--
 .../apache/beam/sdk/transforms/Aggregator.java   | 19 ---
 2 files changed, 2 insertions(+), 21 deletions(-)
--




[GitHub] incubator-beam pull request #1676: Merge remote-tracking branch 'origin/mast...

2016-12-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1676




[37/51] [abbrv] incubator-beam git commit: This closes #1666: Move ExecutionContext and related classes to runners-core

2016-12-21 Thread kenn
This closes #1666: Move ExecutionContext and related classes to runners-core

  Move ExecutionContext and related classes to runners-core
  Update Dataflow worker to beam-master-20161221


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9447a22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9447a22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9447a22

Branch: refs/heads/python-sdk
Commit: a9447a2251f46496b7a773c8b07b3281dbc7a6fb
Parents: 4843dc5 9d2b8e0
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Dec 21 10:24:11 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 10:24:11 2016 -0800

--
 .../operators/ApexParDoOperator.java|   2 +-
 .../apex/translation/utils/NoOpStepContext.java |   3 +-
 .../beam/runners/core/AggregatorFactory.java|   1 -
 .../beam/runners/core/BaseExecutionContext.java | 176 +++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../beam/runners/core/ExecutionContext.java | 102 +++
 .../beam/runners/core/SimpleDoFnRunner.java |   2 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   3 +-
 .../runners/direct/AggregatorContainer.java |   2 +-
 .../runners/direct/DirectExecutionContext.java  |   6 +-
 .../beam/runners/direct/EvaluationContext.java  |   2 +-
 .../runners/direct/AggregatorContainerTest.java |   2 +-
 .../wrappers/streaming/DoFnOperator.java|   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../beam/runners/dataflow/dataflow.properties   |   4 +-
 .../spark/aggregators/SparkAggregators.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../beam/sdk/util/BaseExecutionContext.java | 174 --
 .../apache/beam/sdk/util/ExecutionContext.java  | 100 ---
 21 files changed, 297 insertions(+), 296 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9447a22/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
--



[48/51] [abbrv] incubator-beam git commit: Use informative Instant formatter in WatermarkHold

2016-12-21 Thread kenn
Use informative Instant formatter in WatermarkHold


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6

Branch: refs/heads/python-sdk
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles 
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../apache/beam/runners/core/WatermarkHold.java  |  4 +++-
 .../sdk/transforms/windowing/BoundedWindow.java  | 19 +++
 2 files changed, 22 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold implements 
Serializable {
 Instant shifted = 
windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
 checkState(!shifted.isBefore(timestamp),
 "OutputTimeFn moved element from %s to earlier time %s for window %s",
-timestamp, shifted, window);
+BoundedWindow.formatTimestamp(timestamp),
+BoundedWindow.formatTimestamp(shifted),
+window);
 checkState(timestamp.isAfter(window.maxTimestamp())
 || !shifted.isAfter(window.maxTimestamp()),
 "OutputTimeFn moved element from %s to %s which is beyond end of "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as 
"-infinity".
+   *
+   * This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as 
"+infinity".
+   *
+   * This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  /**
+   * Formats a {@link Instant} timestamp with additional Beam-specific 
metadata, such as indicating
+   * whether the timestamp is the end of the global window or one of the 
distinguished values {@link
+   * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
 if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
   return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";



[42/51] [abbrv] incubator-beam git commit: This closes #1670: Fixes for direct runner expansion and evaluation of stateful ParDo

2016-12-21 Thread kenn
This closes #1670: Fixes for direct runner expansion and evaluation of stateful 
ParDo

  Actually propagate and commit state in direct runner
  Fix windowing in direct runner Stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff395169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff395169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff395169

Branch: refs/heads/python-sdk
Commit: ff395169993d84cd920be21f11d9af8f8d8b
Parents: 7ee8c86 55176c3
Author: Kenneth Knowles 
Authored: Wed Dec 21 13:11:54 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:11:54 2016 -0800

--
 .../direct/ParDoMultiOverrideFactory.java   | 34 ++--
 .../direct/StatefulParDoEvaluatorFactory.java   |  1 +
 2 files changed, 32 insertions(+), 3 deletions(-)
--




[40/51] [abbrv] incubator-beam git commit: Actually propagate and commit state in direct runner

2016-12-21 Thread kenn
Actually propagate and commit state in direct runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55176c38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55176c38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55176c38

Branch: refs/heads/python-sdk
Commit: 55176c385cc802be42b5467fbb2dcc9a1c7467ea
Parents: 4fb16e8
Author: Kenneth Knowles 
Authored: Tue Dec 20 15:59:45 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:11:22 2016 -0800

--
 .../apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java   | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55176c38/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 5f9d8f4..003df0f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -233,6 +233,7 @@ final class StatefulParDoEvaluatorFactory implements Transfo
   StepTransformResult.>>withHold(
   delegateResult.getTransform(), 
delegateResult.getWatermarkHold())
   .withTimerUpdate(delegateResult.getTimerUpdate())
+  .withState(delegateResult.getState())
   .withAggregatorChanges(delegateResult.getAggregatorChanges())
   .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
   
.addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));



[10/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a 
JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24ad1831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24ad1831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24ad1831

Branch: refs/heads/python-sdk
Commit: 24ad18319248a128a1c5db4f2bef8861f7361d9f
Parents: 75a4c91
Author: Stas Levin 
Authored: Tue Dec 20 00:01:31 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../beam/sdk/extensions/joinlibrary/InnerJoinTest.java|  9 ++---
 .../sdk/extensions/joinlibrary/OuterLeftJoinTest.java | 10 +++---
 .../sdk/extensions/joinlibrary/OuterRightJoinTest.java| 10 +++---
 3 files changed, 20 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24ad1831/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
--
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
index 423ab9c..1c120c2 100644
--- 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
+++ 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
@@ -19,13 +19,13 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -33,15 +33,16 @@ import org.junit.Test;
  */
 public class InnerJoinTest {
 
-  Pipeline p;
   List> leftListOfKv;
   List> listRightOfKv;
   List>> expectedResult;
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Before
   public void setup() {
 
-p = TestPipeline.create();
 leftListOfKv = new ArrayList<>();
 listRightOfKv = new ArrayList<>();
 
@@ -131,11 +132,13 @@ public class InnerJoinTest {
 
   @Test(expected = NullPointerException.class)
   public void testJoinLeftCollectionNull() {
+p.enableAbandonedNodeEnforcement(false);
 Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
   }
 
   @Test(expected = NullPointerException.class)
   public void testJoinRightCollectionNull() {
+p.enableAbandonedNodeEnforcement(false);
 Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24ad1831/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
--
diff --git 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
index c32163f..81f4fa3 100644
--- 
a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
+++ 
b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
@@ -19,13 +19,13 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 
@@ -34,15 +34,16 @@ import org.junit.Test;
  */
 public class OuterLeftJoinTest {
 
-  Pipeline p;
   List> leftListOfKv;
   List> listRightOfKv;
   List>> expectedResult;
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Before
   public void setup() {
 
-p = TestPipeline.create();
 leftListOfKv = new ArrayList<>();
 listRightOfKv = new ArrayList<>();
 
@@ -133,16 +134,19 @@ public class OuterLeftJoinTest 

[09/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit 
rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63331aa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63331aa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63331aa8

Branch: refs/heads/python-sdk
Commit: 63331aa8aa6314e8469c23a4f4a89fbf287cbc5a
Parents: 24ad183
Author: Stas Levin 
Authored: Tue Dec 20 09:54:57 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../org/apache/beam/sdk/extensions/sorter/SortValuesTest.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63331aa8/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
--
diff --git 
a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
 
b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
index ebfbd0e..4f77100 100644
--- 
a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
+++ 
b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -43,10 +43,11 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class SortValuesTest {
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   public void testSecondaryKeySorting() throws Exception {
-Pipeline p = TestPipeline.create();
-
 // Create a PCollection of > pairs.
 PCollection>> input =
 p.apply(
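
The pattern is the same across all of these migration commits: the pipeline becomes a JUnit rule (transient, because several test classes are Serializable), and tests that only construct the pipeline opt out of the rule's abandoned-node enforcement. A condensed sketch of a migrated test class, with illustrative names:

    @RunWith(JUnit4.class)
    public class MyTransformTest {

      @Rule
      public final transient TestPipeline p = TestPipeline.create();

      @Test
      @Category(NeedsRunner.class)
      public void testRunsThePipeline() {
        PCollection<String> out = p.apply(Create.of("a", "b"));
        PAssert.that(out).containsInAnyOrder("a", "b");
        p.run();
      }

      @Test
      public void testOnlyConstructsThePipeline() {
        // Construction-only tests never call p.run(), so they disable the
        // abandoned-node check the rule would otherwise enforce.
        p.enableAbandonedNodeEnforcement(false);
        p.apply(Create.of("a"));
      }
    }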



[02/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.

2016-12-21 Thread kenn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 2a89a18..3bc0a65 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -76,12 +75,12 @@ import org.junit.runners.JUnit4;
 @SuppressWarnings("unchecked")
 public class CreateTest {
   @Rule public final ExpectedException thrown = ExpectedException.none();
+  @Rule public final TestPipeline p = TestPipeline.create();
+
 
   @Test
   @Category(RunnableOnService.class)
   public void testCreate() {
-Pipeline p = TestPipeline.create();
-
 PCollection output =
 p.apply(Create.of(LINES));
 
@@ -93,8 +92,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateEmpty() {
-Pipeline p = TestPipeline.create();
-
 PCollection output =
 p.apply(Create.of(NO_LINES)
 .withCoder(StringUtf8Coder.of()));
@@ -106,7 +103,7 @@ public class CreateTest {
 
   @Test
   public void testCreateEmptyInfersCoder() {
-Pipeline p = TestPipeline.create();
+p.enableAbandonedNodeEnforcement(false);
 
 PCollection output =
 p.apply(Create.of());
@@ -126,8 +123,6 @@ public class CreateTest {
 thrown.expectMessage(
 Matchers.containsString("Unable to infer a coder"));
 
-Pipeline p = TestPipeline.create();
-
 // Create won't infer a default coder in this case.
 p.apply(Create.of(new Record(), new Record2()));
 
@@ -137,8 +132,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithNullsAndValues() throws Exception {
-Pipeline p = TestPipeline.create();
-
 PCollection output =
 p.apply(Create.of(null, "test1", null, "test2", null)
 .withCoder(SerializableCoder.of(String.class)));
@@ -150,8 +143,6 @@ public class CreateTest {
   @Test
   @Category(NeedsRunner.class)
   public void testCreateParameterizedType() throws Exception {
-Pipeline p = TestPipeline.create();
-
 PCollection output =
 p.apply(Create.of(
 TimestampedValue.of("a", new Instant(0)),
@@ -216,7 +207,6 @@ public class CreateTest {
 Create.Values create =
 Create.of(elements).withCoder(new 
UnserializableRecord.UnserializableRecordCoder());
 
-TestPipeline p = TestPipeline.create();
 PAssert.that(p.apply(create))
 .containsInAnyOrder(
 new UnserializableRecord("foo"),
@@ -235,8 +225,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateTimestamped() {
-Pipeline p = TestPipeline.create();
-
 List data = Arrays.asList(
 TimestampedValue.of("a", new Instant(1L)),
 TimestampedValue.of("b", new Instant(2L)),
@@ -254,8 +242,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateTimestampedEmpty() {
-Pipeline p = TestPipeline.create();
-
 PCollection output = p
 .apply(Create.timestamped(new ArrayList())
 .withCoder(StringUtf8Coder.of()));
@@ -266,7 +252,7 @@ public class CreateTest {
 
   @Test
   public void testCreateTimestampedEmptyInfersCoder() {
-Pipeline p = TestPipeline.create();
+p.enableAbandonedNodeEnforcement(false);
 
 PCollection output = p
 .apply(Create.timestamped());
@@ -280,8 +266,6 @@ public class CreateTest {
 thrown.expectMessage(
 Matchers.containsString("Unable to infer a coder"));
 
-Pipeline p = TestPipeline.create();
-
 // Create won't infer a default coder in this case.
 PCollection c = p.apply(Create.timestamped(
 TimestampedValue.of(new Record(), new Instant(0)),
@@ -295,7 +279,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithVoidType() throws Exception {
-Pipeline p = TestPipeline.create();
 PCollection output = p.apply(Create.of((Void) null, (Void) null));
 PAssert.that(output).containsInAnyOrder((Void) null, (Void) null);
 p.run();
@@ -304,8 +287,6 @@ public class CreateTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCreateWithKVVoidType() throws Exception {
-Pipeline p = TestPipeline.create();
-
 PCollection

[08/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as 
a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dea0992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dea0992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dea0992

Branch: refs/heads/python-sdk
Commit: 6dea0992d9976b39232cf846906831feaa25ec43
Parents: 63331aa
Author: Stas Levin 
Authored: Tue Dec 20 13:26:07 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 25 +---
 .../sdk/io/gcp/bigtable/BigtableIOTest.java | 13 ++
 2 files changed, 25 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6dea0992/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
--
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index dc566d2..4ddfdea 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -643,6 +643,7 @@ public class BigQueryIOTest implements Serializable {
 }
   }
 
+  @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   @Rule public transient ExpectedLogs logged = 
ExpectedLogs.none(BigQueryIO.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
@@ -1370,7 +1371,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testBuildWriteWithoutTable() {
-Pipeline p = TestPipeline.create();
+
 thrown.expect(IllegalStateException.class);
 thrown.expectMessage("must set the table reference");
 p.apply(Create.of().withCoder(TableRowJsonCoder.of()))
@@ -1591,9 +1592,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsCreateNoSchema() {
+p.enableAbandonedNodeEnforcement(false);
+
 thrown.expect(IllegalArgumentException.class);
 thrown.expectMessage("no schema was provided");
-TestPipeline.create()
+p
 .apply(Create.of())
 .apply(BigQueryIO.Write
 .to("dataset.table")
@@ -1602,9 +1605,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsTableAndTableSpec() {
+p.enableAbandonedNodeEnforcement(false);
+
 thrown.expect(IllegalStateException.class);
 thrown.expectMessage("Cannot set both a table reference and a table 
function");
-TestPipeline.create()
+p
 .apply(Create.of())
 .apply(BigQueryIO.Write
 .to("dataset.table")
@@ -1618,9 +1623,11 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWriteValidateFailsNoTableAndNoTableSpec() {
+p.enableAbandonedNodeEnforcement(false);
+
 thrown.expect(IllegalStateException.class);
 thrown.expectMessage("must set the table reference of a BigQueryIO.Write 
transform");
-TestPipeline.create()
+p
 .apply(Create.of())
 .apply("name", BigQueryIO.Write.withoutValidation());
   }
@@ -1950,7 +1957,6 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPassThroughThenCleanup() throws Exception {
-Pipeline p = TestPipeline.create();
 
 PCollection output = p
 .apply(Create.of(1, 2, 3))
@@ -1968,7 +1974,6 @@ public class BigQueryIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testPassThroughThenCleanupExecuted() throws Exception {
-Pipeline p = TestPipeline.create();
 
 p.apply(Create.of())
 .apply(new PassThroughThenCleanup(new CleanupOperation() {
@@ -2025,6 +2030,8 @@ public class BigQueryIOTest implements Serializable {
 
   private void testWritePartition(long numFiles, long fileSize, long 
expectedNumPartitions)
   throws Exception {
+p.enableAbandonedNodeEnforcement(false);
+
 List expectedPartitionIds = Lists.newArrayList();
 for (long i = 1; i <= expectedNumPartitions; ++i) {
   expectedPartitionIds.add(i);
@@ -2044,7 +2051,7 @@ public class BigQueryIOTest implements Serializable {
 new TupleTag>("singlePartitionTag") 

[34/51] [abbrv] incubator-beam git commit: Move ExecutionContext and related classes to runners-core

2016-12-21 Thread kenn
Move ExecutionContext and related classes to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d2b8e09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d2b8e09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d2b8e09

Branch: refs/heads/python-sdk
Commit: 9d2b8e09bcb5e04017b487e1a919d335875dbfc0
Parents: 64336e4
Author: Kenneth Knowles 
Authored: Thu Dec 15 20:20:34 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 10:10:00 2016 -0800

--
 .../operators/ApexParDoOperator.java|   2 +-
 .../apex/translation/utils/NoOpStepContext.java |   3 +-
 .../beam/runners/core/AggregatorFactory.java|   1 -
 .../beam/runners/core/BaseExecutionContext.java | 176 +++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../beam/runners/core/ExecutionContext.java | 102 +++
 .../beam/runners/core/SimpleDoFnRunner.java |   2 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   3 +-
 .../runners/direct/AggregatorContainer.java |   2 +-
 .../runners/direct/DirectExecutionContext.java  |   6 +-
 .../beam/runners/direct/EvaluationContext.java  |   2 +-
 .../runners/direct/AggregatorContainerTest.java |   2 +-
 .../wrappers/streaming/DoFnOperator.java|   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../spark/aggregators/SparkAggregators.java |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../beam/sdk/util/BaseExecutionContext.java | 174 --
 .../apache/beam/sdk/util/ExecutionContext.java  | 100 ---
 20 files changed, 295 insertions(+), 294 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index a3d3a97..c41cd45 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -41,6 +41,7 @@ import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.sdk.coders.Coder;
@@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnAdapters;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index 078f95f..f169ae6 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -19,10 +19,9 @@ package org.apache.beam.runners.apex.translation.utils;
 
 import java.io.IOException;
 import java.io.Serializable;
-
+import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.StateInternals;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
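
The hunks and diffstat above relocate ExecutionContext and BaseExecutionContext from the SDK's org.apache.beam.sdk.util package into runners-core. A minimal hedged sketch of what downstream runner code looks like after the move; only the import changes, and the nested StepContext interface is assumed to be unchanged (the holder class below is illustrative, not from the patch):

// before: import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.runners.core.ExecutionContext;

public class StepContextHolder {
  // ExecutionContext.StepContext is assumed to still be the nested interface
  // that classes such as NoOpStepContext implement.
  private final ExecutionContext.StepContext stepContext;

  public StepContextHolder(ExecutionContext.StepContext stepContext) {
    this.stepContext = stepContext;
  }

  public ExecutionContext.StepContext stepContext() {
    return stepContext;
  }
}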

[27/51] [abbrv] incubator-beam git commit: Update Dataflow worker to beam-master-20161220

2016-12-21 Thread kenn
Update Dataflow worker to beam-master-20161220


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3f68d34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3f68d34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3f68d34

Branch: refs/heads/python-sdk
Commit: a3f68d343d018d99a04ac5c9134a11f1bc74935b
Parents: 2f4b803
Author: Kenneth Knowles 
Authored: Tue Dec 20 14:05:19 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 14:05:35 2016 -0800

--
 .../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3f68d34/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
 
b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 27a518f..bf08e83 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ 
b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161216
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161220
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161216
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161220



[14/51] [abbrv] incubator-beam git commit: This closes #1664: [BEAM-1176] Migrating tests to use TestPipeline as a JUnit rule

2016-12-21 Thread kenn
This closes #1664: [BEAM-1176] Migrating tests to use TestPipeline as a JUnit 
rule

  Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit 
rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.
  Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.
  Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline 
as a JUnit rule.
  Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a 
JUnit rule.
  Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as 
a JUnit rule.
  Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. 
Plus, fixed some checkstyle errors from previous modules' migration.
  Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
  Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
  Migrated the beam-examples-java8 module to TestPipeline as a JUnit rule.
  Migrated the beam-examples-java module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22e25a47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22e25a47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22e25a47

Branch: refs/heads/python-sdk
Commit: 22e25a47e2edc7b7f702eaca12630f6de7195657
Parents: b3de17b 4b23d42
Author: Kenneth Knowles 
Authored: Tue Dec 20 09:55:57 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:57 2016 -0800

--
 .../org/apache/beam/examples/WordCountTest.java |   7 +-
 .../examples/complete/AutoCompleteTest.java |  11 +-
 .../beam/examples/complete/TfIdfTest.java   |   6 +-
 .../complete/TopWikipediaSessionsTest.java  |   7 +-
 .../examples/cookbook/DistinctExampleTest.java  |   9 +-
 .../examples/cookbook/JoinExamplesTest.java |   6 +-
 .../examples/cookbook/TriggerExampleTest.java   |   6 +-
 .../examples/MinimalWordCountJava8Test.java |   6 +-
 .../examples/complete/game/GameStatsTest.java   |   7 +-
 .../complete/game/HourlyTeamScoreTest.java  |   5 +-
 .../examples/complete/game/LeaderBoardTest.java |  11 +-
 .../examples/complete/game/UserScoreTest.java   |  10 +-
 .../core/PushbackSideInputDoFnRunnerTest.java   |   5 +-
 .../beam/runners/core/SplittableParDoTest.java  |  10 +-
 .../UnboundedReadFromBoundedSourceTest.java |   6 +-
 .../direct/BoundedReadEvaluatorFactoryTest.java |  13 +-
 .../direct/CloningBundleFactoryTest.java|   8 +-
 .../runners/direct/CommittedResultTest.java |   6 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   7 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   3 +-
 .../runners/direct/EvaluationContextTest.java   |   7 +-
 .../direct/FlattenEvaluatorFactoryTest.java |   6 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   5 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java |   5 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   4 +-
 .../ImmutabilityEnforcementFactoryTest.java |   3 +-
 .../direct/ImmutableListBundleFactoryTest.java  |  14 ++-
 .../direct/KeyedPValueTrackingVisitorTest.java  |   6 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   5 +-
 .../runners/direct/SideInputContainerTest.java  |   5 +-
 .../StatefulParDoEvaluatorFactoryTest.java  |   7 +-
 .../runners/direct/StepTransformResultTest.java |   5 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |   5 +-
 .../runners/direct/TransformExecutorTest.java   |   4 +-
 .../UnboundedReadEvaluatorFactoryTest.java  |   9 +-
 .../direct/ViewEvaluatorFactoryTest.java|   5 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   5 +-
 .../runners/direct/WatermarkManagerTest.java|   6 +-
 .../direct/WindowEvaluatorFactoryTest.java  |   5 +-
 .../direct/WriteWithShardingFactoryTest.java|  14 +--
 .../java/org/apache/beam/sdk/PipelineTest.java  |  37 +++---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  11 +-
 .../beam/sdk/coders/CoderRegistryTest.java  |   6 +-
 .../beam/sdk/coders/SerializableCoderTest.java  |   7 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  16 +--
 .../io/BoundedReadFromUnboundedSourceTest.java  |   6 +-
 .../beam/sdk/io/CompressedSourceTest.java   |  12 +-
 .../apache/beam/sdk/io/CountingInputTest.java   |  12 +-
 .../apache/beam/sdk/io/CountingSourceTest.java  |  13 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   4 +-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java|  10 +-
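
The migration applies the same mechanical pattern in every module listed above: the TestPipeline that each test method used to construct locally becomes a single JUnit @Rule field on the test class. A minimal sketch of the resulting shape, assuming a test that actually runs the pipeline (class and test names are illustrative, not taken from the patch):

import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class ExampleMigratedTest {

  // After the migration the pipeline is declared once as a JUnit rule.
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void runsWithRulePipeline() {
    // Before the migration each test method began with:
    //   Pipeline pipeline = TestPipeline.create();
    PCollection<Integer> output = pipeline.apply(Create.of(1, 2, 3));
    PAssert.that(output).containsInAnyOrder(1, 2, 3);
    pipeline.run();
  }
}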
 

[39/51] [abbrv] incubator-beam git commit: This closes #1160: Support set and delete of timer by ID in InMemoryTimerInternals

2016-12-21 Thread kenn
This closes #1160: Support set and delete of timer by ID in 
InMemoryTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ee8c86d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ee8c86d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ee8c86d

Branch: refs/heads/python-sdk
Commit: 7ee8c86d3b0553d8cb7de60b0dc1a03103dfbbc5
Parents: a9447a2 df2e540
Author: Kenneth Knowles 
Authored: Wed Dec 21 11:02:02 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 11:02:02 2016 -0800

--
 .../runners/core/InMemoryTimerInternals.java|  65 +++
 .../core/InMemoryTimerInternalsTest.java| 112 +--
 2 files changed, 120 insertions(+), 57 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ee8c86d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --cc 
runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5ddd5a7,292ac23..2c3d78a
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@@ -104,10 -106,9 +106,10 @@@ public class InMemoryTimerInternals imp
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant 
target,
TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting a timer by ID is not yet 
supported.");
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
}
  
 +  @Deprecated
@Override
public void setTimer(TimerData timerData) {
  WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), 
timerData);
@@@ -117,17 -133,13 +134,20 @@@
}
  
@Override
 +  public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
 +throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
 +  }
 +
 +  @Deprecated
 +  @Override
public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
+ TimerData existing = existingTimers.get(namespace, timerId);
+ if (existing != null) {
+   deleteTimer(existing);
+ }
}
  
 +  @Deprecated
@Override
public void deleteTimer(TimerData timer) {
  WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), 
timer);
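
After this change a timer can be registered and cancelled purely by namespace and ID: the four-argument setTimer delegates to TimerData.of(...), and the two-argument deleteTimer looks up the previously registered TimerData and removes it (a no-op when nothing was registered). A small hedged sketch of the call pattern; the no-arg constructor and the location of StateNamespaces.global() are assumptions, not shown in the hunks above:

import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.joda.time.Instant;

public class TimerByIdSketch {
  public static void main(String[] args) {
    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
    StateNamespace ns = StateNamespaces.global();

    // Register an event-time timer under an explicit ID.
    timerInternals.setTimer(ns, "cleanup", new Instant(1000L), TimeDomain.EVENT_TIME);

    // Cancel it by namespace and ID. This overload is deprecated in favor of the
    // TimeDomain-taking variant, but that variant still throws
    // UnsupportedOperationException in this snapshot.
    timerInternals.deleteTimer(ns, "cleanup");
  }
}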



[23/51] [abbrv] incubator-beam git commit: Fixup usage of canonical name with name since canonical name != name for inner classes.

2016-12-21 Thread kenn
Fixup usage of canonical name with name since canonical name != name for inner 
classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96d39314
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96d39314
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96d39314

Branch: refs/heads/python-sdk
Commit: 96d393147c365d0911a091d0b3600fef621709f4
Parents: ef1a858
Author: Luke Cwik 
Authored: Tue Dec 20 11:47:42 2016 -0800
Committer: Luke Cwik 
Committed: Tue Dec 20 13:04:24 2016 -0800

--
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96d39314/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index b29c4cd..21d575a 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -267,7 +267,7 @@ public class DataflowRunnerTest {
 "--runner=DataflowRunner",
 "--tempLocation=/tmp/not/a/gs/path",
 "--project=test-project",
-"--credentialFactoryClass=" + 
NoopCredentialFactory.class.getCanonicalName(),
+"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
 };
 
 try {
@@ -286,7 +286,7 @@ public class DataflowRunnerTest {
 "--runner=DataflowRunner",
 "--tempLocation=gs://does/not/exist",
 "--project=test-project",
-"--credentialFactoryClass=" + 
NoopCredentialFactory.class.getCanonicalName(),
+"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
 };
 
 try {
@@ -306,8 +306,8 @@ public class DataflowRunnerTest {
 "--runner=DataflowRunner",
 "--tempLocation=/tmp/testing",
 "--project=test-project",
-"--credentialFactoryClass=" + 
NoopCredentialFactory.class.getCanonicalName(),
-"--pathValidatorClass=" + NoopPathValidator.class.getCanonicalName(),
+"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(),
+"--pathValidatorClass=" + NoopPathValidator.class.getName(),
 };
 // Should not crash, because gcpTempLocation should get set from 
tempLocation
 TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
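
The reason for the swap is that the two accessors disagree exactly for nested classes, and only Class.getName() yields a string that Class.forName(...) can load, which is presumably what the credentialFactoryClass and pathValidatorClass options require. A small self-contained illustration (not from the patch):

public class NameVsCanonicalName {

  /** A nested class, standing in for NoopCredentialFactory and friends. */
  public static class Inner {}

  public static void main(String[] args) throws Exception {
    // Binary name, e.g. "NameVsCanonicalName$Inner": accepted by Class.forName().
    System.out.println(Inner.class.getName());
    // Canonical (source) name, e.g. "NameVsCanonicalName.Inner": rejected by Class.forName().
    System.out.println(Inner.class.getCanonicalName());

    Class.forName(Inner.class.getName());              // succeeds
    try {
      Class.forName(Inner.class.getCanonicalName());   // throws
    } catch (ClassNotFoundException expected) {
      System.out.println("canonical name is not loadable: " + expected.getMessage());
    }
  }
}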



[05/51] [abbrv] incubator-beam git commit: Migrated the beam-runners-core module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-runners-core module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6710251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6710251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6710251

Branch: refs/heads/python-sdk
Commit: b6710251d8bb5d1968aea2258ce5878b43368dd5
Parents: 7106e88
Author: Stas Levin 
Authored: Sun Dec 18 18:51:31 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../runners/core/PushbackSideInputDoFnRunnerTest.java |  5 -
 .../org/apache/beam/runners/core/SplittableParDoTest.java | 10 --
 .../runners/core/UnboundedReadFromBoundedSourceTest.java  |  5 +++--
 3 files changed, 15 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index a1cdbf6..251c7c2 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -63,10 +64,12 @@ public class PushbackSideInputDoFnRunnerTest {
   private TestDoFnRunner underlying;
   private PCollectionView singletonView;
 
+  @Rule
+  public TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Before
   public void setup() {
 MockitoAnnotations.initMocks(this);
-TestPipeline p = TestPipeline.create();
 PCollection created = p.apply(Create.of(1, 2, 3));
 singletonView =
 created

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index cf96b66..0f0b106 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -132,9 +133,13 @@ public class SplittableParDoTest {
 return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty());
   }
 
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
   @Test
   public void testBoundednessForBoundedFn() {
-Pipeline pipeline = TestPipeline.create();
+pipeline.enableAbandonedNodeEnforcement(false);
+
 DoFn boundedFn = new BoundedFakeFn();
 assertEquals(
 "Applying a bounded SDF to a bounded collection produces a bounded 
collection",
@@ -154,7 +159,8 @@ public class SplittableParDoTest {
 
   @Test
   public void testBoundednessForUnboundedFn() {
-Pipeline pipeline = TestPipeline.create();
+pipeline.enableAbandonedNodeEnforcement(false);
+
 DoFn unboundedFn = new UnboundedFakeFn();
 assertEquals(
 "Applying an unbounded SDF to a bounded collection produces a bounded 
collection",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index 7fd8807..86450f2 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ 
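
Two flavors of the rule appear in this module: tests that execute the pipeline keep the plain rule, while tests that only build a graph and never call run() disable the abandoned-node enforcement, either at rule creation time or inside the test body. A hedged sketch of the graph-only flavor, assuming enableAbandonedNodeEnforcement(false) is what stops TestPipeline from failing a test whose pipeline is intentionally never run:

import static org.junit.Assert.assertNotNull;

import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class GraphOnlyTest {

  // Enforcement is switched off because these tests only inspect the pipeline
  // graph and deliberately never execute it.
  @Rule
  public final transient TestPipeline pipeline =
      TestPipeline.create().enableAbandonedNodeEnforcement(false);

  @Test
  public void buildsGraphWithoutRunning() {
    PCollection<Integer> created = pipeline.apply(Create.of(1, 2, 3));
    assertNotNull(created);
    // pipeline.run() is intentionally not called.
  }
}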

[01/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.

2016-12-21 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3b4fd5c7d -> 3454d691f


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 3bf63fd..1d8b32c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -84,12 +84,14 @@ public class ViewTest implements Serializable {
   // anonymous inner classes inside the non-static test methods.
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(RunnableOnService.class)
   public void testSingletonSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("Create47", 
Create.of(47)).apply(View.asSingleton());
@@ -112,7 +114,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSingletonSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("Create47", Create.timestamped(
@@ -143,7 +144,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptySingletonSideInput() throws Exception {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateEmptyIntegers", 
Create.of().withCoder(VarIntCoder.of()))
@@ -169,7 +169,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testNonSingletonSideInput() throws Exception {
-Pipeline pipeline = TestPipeline.create();
 
 PCollection oneTwoThree = pipeline.apply(Create.of(1, 2, 
3));
 final PCollectionView view = 
oneTwoThree.apply(View.asSingleton());
@@ -194,7 +193,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testListSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 
23)).apply(View.asList());
@@ -221,7 +219,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedListSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", Create.timestamped(
@@ -262,7 +259,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyListSideInput() throws Exception {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateEmptyView", 
Create.of().withCoder(VarIntCoder.of()))
@@ -289,7 +285,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testListSideInputIsImmutable() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", 
Create.of(11)).apply(View.asList());
@@ -335,7 +330,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testIterableSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23))
@@ -361,7 +355,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedIterableSideInput() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", Create.timestamped(
@@ -401,7 +394,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyIterableSideInput() throws Exception {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateEmptyView", 
Create.of().withCoder(VarIntCoder.of()))
@@ -427,7 +419,6 @@ public class ViewTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testIterableSideInputIsImmutable() {
-Pipeline pipeline = TestPipeline.create();
 
 final PCollectionView view =
 pipeline.apply("CreateSideInput", 
Create.of(11)).apply(View.asIterable());
@@ -459,7 +450,6 @@ public class ViewTest implements Serializable {
   @Test
   

[30/51] [abbrv] incubator-beam git commit: Provide local tags in PInput, POutput expansions

2016-12-21 Thread kenn
Provide local tags in PInput, POutput expansions

Output an ordered collection in PInput and POutput expansions.

This provides information that is necessary to reconstruct a PInput
or POutput from its expansion.

Implement PCollectionList.equals, PCollectionTuple.equals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34373c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34373c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34373c21

Branch: refs/heads/python-sdk
Commit: 34373c21ed67696235d88ef40d50e31c77b84c33
Parents: 6a05d7f
Author: Thomas Groh 
Authored: Tue Dec 6 11:03:52 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 20 15:18:55 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java |  18 +--
 .../beam/runners/direct/EvaluationContext.java  |   7 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  16 ++-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++-
 .../beam/sdk/runners/TransformHierarchy.java|  49 
 .../transforms/join/KeyedPCollectionTuple.java  |   9 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   4 +-
 .../apache/beam/sdk/values/PCollectionList.java |  65 +++
 .../beam/sdk/values/PCollectionTuple.java   |  28 -
 .../java/org/apache/beam/sdk/values/PDone.java  |   4 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  10 ++
 .../org/apache/beam/sdk/values/PValueBase.java  |  11 +-
 .../apache/beam/sdk/values/TaggedPValue.java|  42 +++
 .../sdk/runners/TransformHierarchyTest.java |  23 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  34 ++
 .../beam/sdk/values/PCollectionListTest.java| 117 +++
 .../beam/sdk/values/PCollectionTupleTest.java   |  70 +++
 20 files changed, 449 insertions(+), 98 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 0283d03..425bbf1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each 
{@link PValue} in the
@@ -79,14 +80,16 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 
   @Override
   public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-toFinalize.removeAll(node.getInputs());
+for (TaggedPValue consumed : node.getInputs()) {
+  toFinalize.remove(consumed.getValue());
+}
 AppliedPTransform appliedTransform = getAppliedTransform(node);
 stepNames.put(appliedTransform, genStepName());
 if (node.getInputs().isEmpty()) {
   rootTransforms.add(appliedTransform);
 } else {
-  for (PValue value : node.getInputs()) {
-primitiveConsumers.put(value, appliedTransform);
+  for (TaggedPValue value : node.getInputs()) {
+primitiveConsumers.put(value.getValue(), appliedTransform);
   }
 }
   }
@@ -96,15 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
 toFinalize.add(value);
 
 AppliedPTransform appliedTransform = 
getAppliedTransform(producer);
+if (value instanceof PCollectionView) {
+  views.add((PCollectionView) value);
+}
 if (!producers.containsKey(value)) {
   producers.put(value, appliedTransform);
 }
-   if (value instanceof PCollectionView) {
- views.add((PCollectionView) value);
-   }
-   if (!producers.containsKey(value)) {
- producers.put(value, appliedTransform);
-   }
   }
 
   private AppliedPTransform 
getAppliedTransform(TransformHierarchy.Node node) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
--
diff --git 
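
With this change node.getInputs() and node.getOutputs() yield TaggedPValue entries rather than bare PValues, so visitor code that only needs the values unwraps them with getValue(), as the DirectGraphVisitor hunk above does; the accompanying tag is what makes an expansion reconstructible. A hedged sketch of that consumer-side pattern (the helper class is illustrative, and any getTag() accessor is an assumption based on the commit description, not shown here):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;

public class TaggedInputCollector {

  /** Collects the raw PValues consumed by a node, discarding the local tags. */
  public static List<PValue> inputValues(TransformHierarchy.Node node) {
    List<PValue> values = new ArrayList<>();
    for (TaggedPValue input : node.getInputs()) {
      values.add(input.getValue());
    }
    return values;
  }
}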

[31/51] [abbrv] incubator-beam git commit: This closes #1569

2016-12-21 Thread kenn
This closes #1569


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aadcf3a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aadcf3a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aadcf3a1

Branch: refs/heads/python-sdk
Commit: aadcf3a1203b257961a1a474acf74e6bbca1e2ad
Parents: 6a05d7f 34373c2
Author: Thomas Groh 
Authored: Tue Dec 20 15:18:55 2016 -0800
Committer: Thomas Groh 
Committed: Tue Dec 20 15:18:55 2016 -0800

--
 .../beam/runners/direct/DirectGraphVisitor.java |  18 +--
 .../beam/runners/direct/EvaluationContext.java  |   7 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  16 ++-
 .../beam/runners/direct/WatermarkManager.java   |  19 +--
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++-
 .../beam/sdk/runners/TransformHierarchy.java|  49 
 .../transforms/join/KeyedPCollectionTuple.java  |   9 +-
 .../java/org/apache/beam/sdk/values/PBegin.java |   4 +-
 .../apache/beam/sdk/values/PCollectionList.java |  65 +++
 .../beam/sdk/values/PCollectionTuple.java   |  28 -
 .../java/org/apache/beam/sdk/values/PDone.java  |   4 +-
 .../java/org/apache/beam/sdk/values/PInput.java |   4 +-
 .../org/apache/beam/sdk/values/POutput.java |   4 +-
 .../java/org/apache/beam/sdk/values/PValue.java |  10 ++
 .../org/apache/beam/sdk/values/PValueBase.java  |  11 +-
 .../apache/beam/sdk/values/TaggedPValue.java|  42 +++
 .../sdk/runners/TransformHierarchyTest.java |  23 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  34 ++
 .../beam/sdk/values/PCollectionListTest.java| 117 +++
 .../beam/sdk/values/PCollectionTupleTest.java   |  70 +++
 20 files changed, 449 insertions(+), 98 deletions(-)
--




[33/51] [abbrv] incubator-beam git commit: Require TimeDomain to delete a timer

2016-12-21 Thread kenn
Require TimeDomain to delete a timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740

Branch: refs/heads/python-sdk
Commit: 35a02740748182ee52729d8bfb621a3c342b8312
Parents: 0d0a5e2
Author: Kenneth Knowles 
Authored: Tue Dec 20 20:09:25 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 08:20:28 2016 -0800

--
 .../operators/ApexGroupByKeyOperator.java  |  8 
 .../beam/runners/core/InMemoryTimerInternals.java  |  8 
 .../beam/runners/direct/DirectTimerInternals.java  |  8 
 .../wrappers/streaming/WindowDoFnOperator.java |  9 +
 .../org/apache/beam/sdk/util/TimerInternals.java   | 17 +++--
 5 files changed, 48 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
--
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 48ac177..49ec1c8 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator implements 
Operator {
*/
   public class ApexTimerInternals implements TimerInternals {
 
+@Deprecated
 @Override
 public void setTimer(TimerData timerData) {
   registerActiveTimer(context.element().key(), timerData);
 }
 
 @Override
+public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
+  throw new UnsupportedOperationException("Canceling of timer by ID is not 
yet supported.");
+}
+
+@Deprecated
+@Override
 public void deleteTimer(TimerData timerKey) {
   unregisterActiveTimer(context.element().key(), timerKey);
 }
@@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator implements 
Operator {
   throw new UnsupportedOperationException("Setting timer by ID not yet 
supported.");
 }
 
+@Deprecated
 @Override
 public void deleteTimer(StateNamespace namespace, String timerId) {
   throw new UnsupportedOperationException("Canceling of timer by ID is not 
yet supported.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..5ddd5a7 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements 
TimerInternals {
 throw new UnsupportedOperationException("Setting a timer by ID is not yet 
supported.");
   }
 
+  @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
 WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), 
timerData);
@@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements 
TimerInternals {
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
+throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
+  }
+
+  @Deprecated
+  @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
 throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
   }
 
+  @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
 WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), 
timer);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
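
For callers the net effect is that cancelling a timer by ID now has to name the TimeDomain as well; in this snapshot the implementations shown above still reject the new overload with UnsupportedOperationException, so it is a signature change landing ahead of the runners. A small hedged illustration of the caller side (helper and parameter names are illustrative):

import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.state.StateNamespace;

public class TimerCancellation {

  /**
   * Cancels a processing-time timer by ID. Passing the TimeDomain is the new
   * requirement; the older deleteTimer(namespace, timerId) overload is deprecated.
   */
  public static void cancel(TimerInternals timerInternals, StateNamespace ns, String timerId) {
    timerInternals.deleteTimer(ns, timerId, TimeDomain.PROCESSING_TIME);
  }
}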
 

[13/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/950aa7e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/950aa7e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/950aa7e1

Branch: refs/heads/python-sdk
Commit: 950aa7e1d9c50167933eb192a16e15700e483377
Parents: 12be8b1
Author: Stas Levin 
Authored: Tue Dec 20 17:44:15 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:46 2016 -0800

--
 .../org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java   | 7 +--
 .../java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java  | 6 --
 2 files changed, 9 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
--
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index f0ab46c..075805e 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -22,19 +22,23 @@ import static com.google.common.collect.Lists.newArrayList;
 import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.DateTime;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Tests {@link AmazonKinesisMock}.
  */
 public class KinesisMockReadTest {
+
+@Rule
+public final transient TestPipeline p = TestPipeline.create();
+
 @Test
 public void readsDataFromMockKinesis() {
 int noOfShards = 3;
@@ -42,7 +46,6 @@ public class KinesisMockReadTest {
 List testData =
 provideTestData(noOfShards, noOfEventsPerShard);
 
-final Pipeline p = TestPipeline.create();
 PCollection result = p.
 apply(
 KinesisIO.Read.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
--
diff --git 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 73a2455..690cc11 100644
--- 
a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ 
b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -43,6 +42,7 @@ import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -53,6 +53,8 @@ public class KinesisReaderIT {
 private static final long PIPELINE_STARTUP_TIME = 
TimeUnit.SECONDS.toMillis(10);
 private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
 
+@Rule
+public final transient TestPipeline p = TestPipeline.create();
 
 @Ignore
 @Test
@@ -76,7 +78,7 @@ public class KinesisReaderIT {
 
 private Future startTestPipeline(List testData, 
KinesisTestOptions options)
 throws InterruptedException {
-final Pipeline p = TestPipeline.create();
+
 PCollection result = p.
 apply(KinesisIO.Read.
 from(options.getAwsKinesisStream(), Instant.now()).



[24/51] [abbrv] incubator-beam git commit: Provide a better error message for non-existing gcpTempLocation

2016-12-21 Thread kenn
Provide a better error message for non-existing gcpTempLocation

gcpTempLocation will default to using the value of tempLocation, as long
as that value is a valid GCP path. Invalid GCP paths are silently
discarded.

This change removes existence validation from the default value logic
such that downstream validation can provide a better error message.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1a8583
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1a8583
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1a8583

Branch: refs/heads/python-sdk
Commit: ef1a858347e475cd15f7dcd8873464f506527b2a
Parents: 2f4b803
Author: Scott Wegner 
Authored: Tue Dec 6 14:19:12 2016 -0800
Committer: Luke Cwik 
Committed: Tue Dec 20 13:04:24 2016 -0800

--
 .../beam/runners/dataflow/DataflowRunner.java   | 25 
 .../options/DataflowPipelineOptions.java| 19 -
 .../runners/dataflow/DataflowRunnerTest.java| 42 +++-
 .../options/DataflowPipelineOptionsTest.java| 20 ++
 .../org/apache/beam/sdk/options/GcpOptions.java | 19 +
 .../apache/beam/sdk/util/GcsPathValidator.java  |  3 +-
 .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +--
 .../beam/sdk/util/GcsPathValidatorTest.java | 15 +--
 8 files changed, 114 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 711b1b0..1a15eaf 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -244,14 +244,23 @@ public class DataflowRunner extends 
PipelineRunner {
 }
 
 PathValidator validator = dataflowOptions.getPathValidator();
-checkArgument(
-!isNullOrEmpty(dataflowOptions.getGcpTempLocation()),
-"DataflowRunner requires gcpTempLocation, and it is missing in 
PipelineOptions.");
-
validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation());
-checkArgument(
-!isNullOrEmpty(dataflowOptions.getStagingLocation()),
-"DataflowRunner requires stagingLocation, and it is missing in 
PipelineOptions.");
-
validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation());
+String gcpTempLocation;
+try {
+  gcpTempLocation = dataflowOptions.getGcpTempLocation();
+} catch (Exception e) {
+  throw new IllegalArgumentException("DataflowRunner requires 
gcpTempLocation, "
+  + "but failed to retrieve a value from PipelineOptions", e);
+}
+validator.validateOutputFilePrefixSupported(gcpTempLocation);
+
+String stagingLocation;
+try {
+  stagingLocation = dataflowOptions.getStagingLocation();
+} catch (Exception e) {
+  throw new IllegalArgumentException("DataflowRunner requires 
stagingLocation, "
+  + "but failed to retrieve a value from PipelineOptions", e);
+}
+validator.validateOutputFilePrefixSupported(stagingLocation);
 
 if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) {
   
validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
--
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
index 66632ad..5ddc5d0 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.options;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Strings.isNullOrEmpty;
-
 import java.io.IOException;
 import 
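
The shape of the fix is worth calling out: rather than silently discarding an unusable default, the runner now lets the option getter throw and wraps the failure in a message that names the option it actually needs. A condensed hedged sketch of that retrieve-then-wrap pattern; SomeOptions and getRequiredPath are hypothetical stand-ins, not part of the patch:

public class RequiredOptionRetrieval {

  /** Hypothetical options interface standing in for DataflowPipelineOptions. */
  public interface SomeOptions {
    String getRequiredPath();
  }

  /**
   * Mirrors the pattern in the hunk above: let the getter fail, then rethrow
   * with a message naming the option the runner requires.
   */
  public static String requirePath(SomeOptions options) {
    String requiredPath;
    try {
      requiredPath = options.getRequiredPath();
    } catch (Exception e) {
      throw new IllegalArgumentException(
          "Runner requires requiredPath, but failed to retrieve a value from PipelineOptions", e);
    }
    return requiredPath;
  }
}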

[07/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ccbe679
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ccbe679
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ccbe679

Branch: refs/heads/python-sdk
Commit: 5ccbe6791af7e75c166ed877391e8c86bba5fe56
Parents: 6dea099
Author: Stas Levin 
Authored: Tue Dec 20 17:26:51 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java| 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ccbe679/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
--
diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index aa93a22..eec7cb8 100644
--- 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ 
b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -48,6 +48,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -64,6 +65,9 @@ public class JdbcIOTest implements Serializable {
 
   private static int port;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @BeforeClass
   public static void startDatabase() throws Exception {
 ServerSocket socket = new ServerSocket(0);
@@ -207,7 +211,6 @@ public class JdbcIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testRead() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 PCollection> output = pipeline.apply(
 JdbcIO.>read()
@@ -245,7 +248,6 @@ public class JdbcIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void testReadWithSingleStringParameter() throws Exception {
- TestPipeline pipeline = TestPipeline.create();
 
  PCollection> output = pipeline.apply(
  JdbcIO.>read()
@@ -278,7 +280,6 @@ public class JdbcIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWrite() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 ArrayList> data = new ArrayList<>();
 for (int i = 0; i < 1000; i++) {
@@ -316,7 +317,6 @@ public class JdbcIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 pipeline.apply(Create.of(new ArrayList>()))
 .apply(JdbcIO.>write()



[03/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule.
Plus, fixed some checkstyle errors from previous modules' migration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75a4c918
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75a4c918
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75a4c918

Branch: refs/heads/python-sdk
Commit: 75a4c918346b5a04213a54bf7d1bf6507655342a
Parents: 09c404a
Author: Stas Levin 
Authored: Mon Dec 19 23:54:47 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../UnboundedReadFromBoundedSourceTest.java |   1 -
 .../direct/CloningBundleFactoryTest.java|   2 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   6 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   2 +-
 .../direct/ImmutableListBundleFactoryTest.java  |   2 +-
 .../direct/WriteWithShardingFactoryTest.java|   2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  37 +++---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  11 +-
 .../beam/sdk/coders/CoderRegistryTest.java  |   6 +-
 .../beam/sdk/coders/SerializableCoderTest.java  |   7 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  16 +--
 .../io/BoundedReadFromUnboundedSourceTest.java  |   6 +-
 .../beam/sdk/io/CompressedSourceTest.java   |  12 +-
 .../apache/beam/sdk/io/CountingInputTest.java   |  12 +-
 .../apache/beam/sdk/io/CountingSourceTest.java  |  13 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   4 +-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java|  10 +-
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  12 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  29 +++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   2 +-
 .../org/apache/beam/sdk/io/XmlSourceTest.java   |  10 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |   5 +-
 .../sdk/runners/TransformHierarchyTest.java |   6 +-
 .../beam/sdk/runners/TransformTreeTest.java |  11 +-
 .../beam/sdk/testing/GatherAllPanesTest.java|   7 +-
 .../apache/beam/sdk/testing/PAssertTest.java|  32 ++---
 .../apache/beam/sdk/testing/TestStreamTest.java |   7 +-
 .../transforms/ApproximateQuantilesTest.java|  12 +-
 .../sdk/transforms/ApproximateUniqueTest.java   |   6 +-
 .../beam/sdk/transforms/CombineFnsTest.java |   5 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  25 +---
 .../apache/beam/sdk/transforms/CountTest.java   |  13 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |  27 +
 .../beam/sdk/transforms/DistinctTest.java   |  12 +-
 .../apache/beam/sdk/transforms/DoFnTest.java|   4 +-
 .../beam/sdk/transforms/DoFnTesterTest.java |   6 +-
 .../apache/beam/sdk/transforms/FilterTest.java  |  18 +--
 .../sdk/transforms/FlatMapElementsTest.java |  10 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |  35 +-
 .../beam/sdk/transforms/GroupByKeyTest.java |  30 ++---
 .../apache/beam/sdk/transforms/KeysTest.java|   9 +-
 .../apache/beam/sdk/transforms/KvSwapTest.java  |   9 +-
 .../apache/beam/sdk/transforms/LatestTest.java  |  12 +-
 .../beam/sdk/transforms/MapElementsTest.java|  14 +--
 .../beam/sdk/transforms/ParDoLifecycleTest.java |  17 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 118 +++
 .../beam/sdk/transforms/PartitionTest.java  |   8 +-
 .../apache/beam/sdk/transforms/RegexTest.java   |  25 +---
 .../apache/beam/sdk/transforms/SampleTest.java  |  34 +++---
 .../beam/sdk/transforms/SplittableDoFnTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/TopTest.java |  15 ++-
 .../apache/beam/sdk/transforms/ValuesTest.java  |   7 +-
 .../apache/beam/sdk/transforms/ViewTest.java|  84 -
 .../beam/sdk/transforms/WithKeysTest.java   |   8 +-
 .../beam/sdk/transforms/WithTimestampsTest.java |   9 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  11 +-
 .../sdk/transforms/windowing/WindowTest.java|  22 ++--
 .../sdk/transforms/windowing/WindowingTest.java |  11 +-
 .../org/apache/beam/sdk/util/ReshuffleTest.java |  11 +-
 .../beam/sdk/values/PCollectionTupleTest.java   |  12 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |   9 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  10 +-
 62 files changed, 353 insertions(+), 587 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
 

[22/51] [abbrv] incubator-beam git commit: This closes #1581: [BEAM-1117] Port direct runner StatefulParDo to KeyedWorkItem

2016-12-21 Thread kenn
This closes #1581: [BEAM-1117] Port direct runner StatefulParDo to KeyedWorkItem

  Port direct runner StatefulParDo to KeyedWorkItem
  Propagate key through ParDo if DoFn is key-preserving
  Move responsibility for knowing about keyedness into EvaluationContext
  Add some key-preserving to KeyedPValueTrackingVisitor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4b8031
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4b8031
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4b8031

Branch: refs/heads/python-sdk
Commit: 2f4b80312c69da00df82aaa37d17cc2f6a742648
Parents: a526adb 1f018ab
Author: Kenneth Knowles 
Authored: Tue Dec 20 12:39:34 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 12:39:34 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   |  13 +-
 .../beam/runners/direct/EvaluationContext.java  |  26 +++-
 .../direct/ExecutorServiceParallelExecutor.java |   8 +-
 .../direct/KeyedPValueTrackingVisitor.java  |  44 --
 .../beam/runners/direct/ParDoEvaluator.java |  13 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   3 +
 .../direct/ParDoMultiOverrideFactory.java   |  94 +++--
 ...littableProcessElementsEvaluatorFactory.java |   1 +
 .../direct/StatefulParDoEvaluatorFactory.java   |  36 ++---
 .../runners/direct/EvaluationContextTest.java   |   9 +-
 .../direct/KeyedPValueTrackingVisitorTest.java  | 135 +--
 .../beam/runners/direct/ParDoEvaluatorTest.java |   1 +
 .../StatefulParDoEvaluatorFactoryTest.java  |  51 ---
 13 files changed, 281 insertions(+), 153 deletions(-)
--




[06/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d478c0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d478c0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d478c0f

Branch: refs/heads/python-sdk
Commit: 8d478c0f38c656d3533d590a65c6ed95da229f81
Parents: 5ccbe67
Author: Stas Levin 
Authored: Tue Dec 20 17:31:23 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:45 2016 -0800

--
 .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d478c0f/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
--
diff --git 
a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java 
b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
index 4c3be6d..7259ce8 100644
--- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
+++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java
@@ -28,7 +28,6 @@ import javax.jms.TextMessage;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -38,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -55,6 +55,9 @@ public class JmsIOTest {
   private BrokerService broker;
   private ConnectionFactory connectionFactory;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Before
   public void startBroker() throws Exception {
 broker = new BrokerService();
@@ -92,8 +95,6 @@ public class JmsIOTest {
 session.close();
 connection.close();
 
-Pipeline pipeline = TestPipeline.create();
-
 // read from the queue
 PCollection output = pipeline.apply(
 JmsIO.read()
@@ -117,8 +118,6 @@ public class JmsIOTest {
   @Category(NeedsRunner.class)
   public void testWriteMessage() throws Exception {
 
-Pipeline pipeline = TestPipeline.create();
-
 ArrayList data = new ArrayList<>();
 for (int i = 0; i < 100; i++) {
   data.add("Message " + i);



[11/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit 
rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b23d42c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b23d42c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b23d42c

Branch: refs/heads/python-sdk
Commit: 4b23d42c31c95bed0d64bfc393fa193311e93498
Parents: fce4f65
Author: Stas Levin 
Authored: Tue Dec 20 18:57:57 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:46 2016 -0800

--
 .../apache/beam/sdk/transforms/CombineJava8Test.java  |  8 +++-
 .../apache/beam/sdk/transforms/DistinctJava8Test.java |  5 +++--
 .../apache/beam/sdk/transforms/FilterJava8Test.java   |  9 +++--
 .../beam/sdk/transforms/FlatMapElementsJava8Test.java |  7 ---
 .../beam/sdk/transforms/MapElementsJava8Test.java |  9 ++---
 .../beam/sdk/transforms/PartitionJava8Test.java   |  7 ---
 .../apache/beam/sdk/transforms/WithKeysJava8Test.java |  6 --
 .../beam/sdk/transforms/WithTimestampsJava8Test.java  | 14 ++
 8 files changed, 37 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java
--
diff --git 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java
 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java
index 98d99ce..a0f7ce6 100644
--- 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java
+++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java
@@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.not;
 
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -45,6 +44,9 @@ import org.junit.runners.JUnit4;
 public class CombineJava8Test implements Serializable {
 
   @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
   /**
@@ -65,7 +67,6 @@ public class CombineJava8Test implements Serializable {
*/
   @Test
   public void testCombineGloballyLambda() {
-Pipeline pipeline = TestPipeline.create();
 
 PCollection output = pipeline
 .apply(Create.of(1, 2, 3, 4))
@@ -86,7 +87,6 @@ public class CombineJava8Test implements Serializable {
*/
   @Test
   public void testCombineGloballyInstanceMethodReference() {
-Pipeline pipeline = TestPipeline.create();
 
 PCollection output = pipeline
 .apply(Create.of(1, 2, 3, 4))
@@ -101,7 +101,6 @@ public class CombineJava8Test implements Serializable {
*/
   @Test
   public void testCombinePerKeyLambda() {
-Pipeline pipeline = TestPipeline.create();
 
 PCollection> output = pipeline
 .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), 
KV.of("c", 4)))
@@ -125,7 +124,6 @@ public class CombineJava8Test implements Serializable {
*/
   @Test
   public void testCombinePerKeyInstanceMethodReference() {
-Pipeline pipeline = TestPipeline.create();
 
 PCollection> output = pipeline
 .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), 
KV.of("c", 4)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
--
diff --git 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
index 99ef232..790f51e 100644
--- 
a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
+++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java
@@ -44,11 +44,13 @@ import org.junit.runners.JUnit4;
 public class DistinctJava8Test {
 
   @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() 
{
-TestPipeline p = TestPipeline.create();
 
 Multimap 

[18/51] [abbrv] incubator-beam git commit: Port direct runner StatefulParDo to KeyedWorkItem

2016-12-21 Thread kenn
Port direct runner StatefulParDo to KeyedWorkItem
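
For context, a rough sketch of what converting a KV element into a KeyedWorkItem looks
like (simplified and hypothetical: the real ToKeyedWorkItem in this patch must also
preserve windows, timestamps and pane info):

    import java.util.Collections;
    import org.apache.beam.runners.core.KeyedWorkItem;
    import org.apache.beam.runners.core.KeyedWorkItems;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.KV;

    class ToKeyedWorkItemSketch<K, V> extends DoFn<KV<K, V>, KeyedWorkItem<K, V>> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        KV<K, V> kv = c.element();
        // Wrap the single value as a one-element work item for its key, so the
        // stateful ParDo evaluator can run it through the keyed-work-item machinery.
        c.output(
            KeyedWorkItems.elementsWorkItem(
                kv.getKey(),
                Collections.singletonList(WindowedValue.valueInGlobalWindow(kv.getValue()))));
      }
    }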


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f018ab6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f018ab6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f018ab6

Branch: refs/heads/python-sdk
Commit: 1f018ab69fdcc720a10e2aeb8ec1eea1c06e1cbc
Parents: d040b7f
Author: Kenneth Knowles 
Authored: Mon Dec 12 19:49:58 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:19:07 2016 -0800

--
 .../direct/KeyedPValueTrackingVisitor.java  | 13 ++-
 .../direct/ParDoMultiOverrideFactory.java   | 94 +---
 .../direct/StatefulParDoEvaluatorFactory.java   | 36 
 .../direct/KeyedPValueTrackingVisitorTest.java  | 69 --
 .../StatefulParDoEvaluatorFactoryTest.java  | 51 +++
 5 files changed, 205 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f018ab6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index e91a768..65c41e0 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PValue;
 
 /**
@@ -105,7 +106,15 @@ class KeyedPValueTrackingVisitor implements 
PipelineVisitor {
   }
 
   private static boolean isKeyPreserving(PTransform transform) {
-// There are currently no key-preserving transforms; this lays the 
infrastructure for them
-return false;
+// This is a hacky check for what is considered key-preserving to the 
direct runner.
+// The most obvious alternative would be a package-private marker 
interface, but
+// better to make this obviously hacky so it is less likely to 
proliferate. Meanwhile
+// we intend to allow explicit expression of key-preserving DoFn in the 
model.
+if (transform instanceof ParDo.BoundMulti) {
+  ParDo.BoundMulti parDo = (ParDo.BoundMulti) transform;
+  return parDo.getFn() instanceof 
ParDoMultiOverrideFactory.ToKeyedWorkItem;
+} else {
+  return false;
+}
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f018ab6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index c5bc069..2cea999 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -17,9 +17,15 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
+import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.SplittableParDo;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -28,6 +34,8 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.BoundMulti;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -84,16 +92,41 @@ class ParDoMultiOverrideFactory
 @Override
 public PCollectionTuple 

[17/51] [abbrv] incubator-beam git commit: Move responsibility for knowing about keyedness into EvaluationContext

2016-12-21 Thread kenn
Move responsibility for knowing about keyedness into EvaluationContext

This will allow transform evaluators to inquire about whether
various collections are keyed.
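
A hypothetical illustration of how an evaluator can now ask the context instead of
carrying its own keyed-PValue set (helper name and message are illustrative; isKeyed()
is the method added in the diff below, and this would need to live in the same package
as the package-private EvaluationContext):

    import static com.google.common.base.Preconditions.checkState;
    import org.apache.beam.sdk.values.PCollection;

    class KeyednessCheck {
      static void requireKeyed(EvaluationContext context, PCollection<?> input) {
        checkState(
            context.isKeyed(input),
            "Expected %s to be keyed before evaluating a stateful step", input);
      }
    }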


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b26ceaa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b26ceaa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b26ceaa3

Branch: refs/heads/python-sdk
Commit: b26ceaa347c4bc50abfb4c3c138167a25a99cf57
Parents: 81702e6
Author: Kenneth Knowles 
Authored: Thu Dec 8 13:28:44 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:18:04 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   |  4 +--
 .../beam/runners/direct/EvaluationContext.java  | 26 +---
 .../direct/ExecutorServiceParallelExecutor.java |  8 +-
 .../runners/direct/EvaluationContextTest.java   |  9 ++-
 4 files changed, 34 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b26ceaa3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index afa43ff..7e6ea15 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -315,14 +315,14 @@ public class DirectRunner extends 
PipelineRunner {
 getPipelineOptions(),
 clockSupplier.get(),
 Enforcement.bundleFactoryFor(enabledEnforcements, graph),
-graph);
+graph,
+keyedPValueVisitor.getKeyedPValues());
 
 RootProviderRegistry rootInputProvider = 
RootProviderRegistry.defaultRegistry(context);
 TransformEvaluatorRegistry registry = 
TransformEvaluatorRegistry.defaultRegistry(context);
 PipelineExecutor executor =
 ExecutorServiceParallelExecutor.create(
 options.getTargetParallelism(), graph,
-keyedPValueVisitor.getKeyedPValues(),
 rootInputProvider,
 registry,
 Enforcement.defaultModelEnforcements(enabledEnforcements),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b26ceaa3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 230d91b..cb9ddd8 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import javax.annotation.Nullable;
@@ -99,17 +100,28 @@ class EvaluationContext {
 
   private final DirectMetrics metrics;
 
+  private final Set keyedPValues;
+
   public static EvaluationContext create(
-  DirectOptions options, Clock clock, BundleFactory bundleFactory, 
DirectGraph graph) {
-return new EvaluationContext(options, clock, bundleFactory, graph);
+  DirectOptions options,
+  Clock clock,
+  BundleFactory bundleFactory,
+  DirectGraph graph,
+  Set keyedPValues) {
+return new EvaluationContext(options, clock, bundleFactory, graph, 
keyedPValues);
   }
 
   private EvaluationContext(
-  DirectOptions options, Clock clock, BundleFactory bundleFactory, 
DirectGraph graph) {
+  DirectOptions options,
+  Clock clock,
+  BundleFactory bundleFactory,
+  DirectGraph graph,
+  Set keyedPValues) {
 this.options = checkNotNull(options);
 this.clock = clock;
 this.bundleFactory = checkNotNull(bundleFactory);
 this.graph = checkNotNull(graph);
+this.keyedPValues = keyedPValues;
 
 this.watermarkManager = WatermarkManager.create(clock, graph);
 this.sideInputContainer = SideInputContainer.create(this, 
graph.getViews());
@@ -244,6 +256,14 @@ class EvaluationContext {
   }
 
   /**
+   * Indicate whether or not this {@link PCollection} has been determined to be
+   * keyed.
+   */
+  public  boolean isKeyed(PValue pValue) {
+return keyedPValues.contains(pValue);
+  }
+
+  /**
* Create a {@link 

[26/51] [abbrv] incubator-beam git commit: Remove deprecated AggregatorFactory from SDK

2016-12-21 Thread kenn
Remove deprecated AggregatorFactory from SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aab46a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aab46a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aab46a0e

Branch: refs/heads/python-sdk
Commit: aab46a0ec6e0e45208f64de7aabb9af643acd0ec
Parents: a3f68d3
Author: Kenneth Knowles 
Authored: Thu Dec 15 20:13:25 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 14:05:35 2016 -0800

--
 .../apache/beam/sdk/transforms/Aggregator.java   | 19 ---
 1 file changed, 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aab46a0e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 43f53a8..4119c53 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 
 /**
  * An {@code Aggregator} enables monitoring of values of type {@code 
InputT},
@@ -68,22 +67,4 @@ public interface Aggregator {
* aggregator.
*/
   CombineFn getCombineFn();
-
-  /**
-   * @deprecated this is for use only by runners and exists only for a 
migration period. Please
-   * use the identical interface in org.apache.beam.runners.core
-   */
-  @Deprecated
-  interface AggregatorFactory {
-/**
- * Create an aggregator with the given {@code name} and {@link CombineFn}.
- *
- *  This method is called to create an aggregator for a {@link DoFn}. 
It receives the
- *  class of the {@link DoFn} being executed and the context of the step 
it is being
- *  executed in.
- */
- Aggregator 
createAggregatorForDoFn(
-Class fnClass, ExecutionContext.StepContext stepContext,
-String aggregatorName, CombineFn combine);
-  }
 }



[32/51] [abbrv] incubator-beam git commit: This closes #1668: Remove deprecated InMemoryTimerInternals from SDK

2016-12-21 Thread kenn
This closes #1668: Remove deprecated InMemoryTimerInternals from SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d0a5e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d0a5e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d0a5e28

Branch: refs/heads/python-sdk
Commit: 0d0a5e2872aeba7a1069927408b3a9607709cf11
Parents: aadcf3a 9f1d3d1
Author: Kenneth Knowles 
Authored: Wed Dec 21 08:16:00 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 08:16:00 2016 -0800

--
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 ---
 1 file changed, 275 deletions(-)
--




[15/51] [abbrv] incubator-beam git commit: Add some key-preserving to KeyedPValueTrackingVisitor

2016-12-21 Thread kenn
Add some key-preserving to KeyedPValueTrackingVisitor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81702e67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81702e67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81702e67

Branch: refs/heads/python-sdk
Commit: 81702e67b92a23849cbc8f4a16b2a619e4b477a1
Parents: 22e25a4
Author: Kenneth Knowles 
Authored: Thu Dec 8 11:49:15 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:18:02 2016 -0800

--
 .../beam/runners/direct/DirectRunner.java   |  9 +--
 .../direct/KeyedPValueTrackingVisitor.java  | 35 +---
 .../direct/KeyedPValueTrackingVisitorTest.java  | 84 +++-
 3 files changed, 37 insertions(+), 91 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 78163c0..afa43ff 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,8 +31,6 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import 
org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory;
@@ -306,12 +304,7 @@ public class DirectRunner extends 
PipelineRunner {
 graphVisitor.finishSpecifyingRemainder();
 
 @SuppressWarnings("rawtypes")
-KeyedPValueTrackingVisitor keyedPValueVisitor =
-KeyedPValueTrackingVisitor.create(
-ImmutableSet.of(
-SplittableParDo.GBKIntoKeyedWorkItems.class,
-DirectGroupByKeyOnly.class,
-DirectGroupAlsoByWindow.class));
+KeyedPValueTrackingVisitor keyedPValueVisitor = 
KeyedPValueTrackingVisitor.create();
 pipeline.traverseTopologically(keyedPValueVisitor);
 
 DisplayDataValidator.validatePipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 7f85169..e91a768 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -18,9 +18,15 @@
 package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.in;
+import static com.google.common.collect.Iterables.all;
 
+import com.google.common.collect.ImmutableSet;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -38,19 +44,21 @@ import org.apache.beam.sdk.values.PValue;
 // TODO: Handle Key-preserving transforms when appropriate and more 
aggressively make PTransforms
 // unkeyed
 class KeyedPValueTrackingVisitor implements PipelineVisitor {
-  @SuppressWarnings("rawtypes")
-  private final Set producesKeyedOutputs;
+
+  private static final Set PRODUCES_KEYED_OUTPUTS 
=
+  ImmutableSet.of(
+  SplittableParDo.GBKIntoKeyedWorkItems.class,
+  DirectGroupByKeyOnly.class,
+  DirectGroupAlsoByWindow.class);
+
   private final Set keyedValues;
   private boolean finalized;
 
-  public static KeyedPValueTrackingVisitor create(
-  @SuppressWarnings("rawtypes") Set 
producesKeyedOutputs) {
-return new 

[20/51] [abbrv] incubator-beam git commit: Move InMemoryTimerInternals to runners-core

2016-12-21 Thread kenn
Move InMemoryTimerInternals to runners-core
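
Behaviour is unchanged by the move; only the package changes from
org.apache.beam.sdk.util.state to org.apache.beam.runners.core. A small usage sketch,
assuming the TimerInternals.setTimer(TimerData) method from the interface plus the
methods visible in the new file below:

    import org.apache.beam.runners.core.InMemoryTimerInternals;
    import org.apache.beam.sdk.util.TimeDomain;
    import org.apache.beam.sdk.util.TimerInternals.TimerData;
    import org.apache.beam.sdk.util.state.StateNamespaces;
    import org.joda.time.Instant;

    public class TimerSketch {
      public static void main(String[] args) {
        InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
        // Register an event-time timer at t=1000 in the global namespace.
        timerInternals.setTimer(
            TimerData.of(StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME));
        // getNextTimer() reports when the earliest pending timer in a domain will fire.
        Instant next = timerInternals.getNextTimer(TimeDomain.EVENT_TIME);
        System.out.println(next);  // 1970-01-01T00:00:01.000Z
      }
    }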


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205

Branch: refs/heads/python-sdk
Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb
Parents: 22e25a4
Author: Kenneth Knowles 
Authored: Thu Dec 15 20:45:56 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:21:52 2016 -0800

--
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 -
 .../runners/core/InMemoryTimerInternals.java| 273 ++
 .../core/InMemoryTimerInternalsTest.java| 155 +++
 .../beam/runners/core/ReduceFnTester.java   |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 ---
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 ---
 .../util/state/InMemoryTimerInternalsTest.java  | 153 ---
 10 files changed, 443 insertions(+), 471 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 9189191..efcd771 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
new file mode 100644
index 000..5fcd088
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.joda.time.Instant;
+
+/** {@link TimerInternals} with all watermarks and processing clock simulated 
in-memory. */
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue 

[25/51] [abbrv] incubator-beam git commit: [BEAM-1097] Provide a better error message for non-existing gcpTempLocation

2016-12-21 Thread kenn
[BEAM-1097] Provide a better error message for non-existing gcpTempLocation

This closes #1522
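
A minimal sketch of the option this change validates (the bucket path is a placeholder;
the improved error is about gcpTempLocation pointing at a location that does not exist
or is not accessible):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class OptionsSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Must be an existing, accessible GCS location.
        options.setGcpTempLocation("gs://my-existing-bucket/tmp");
      }
    }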


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/acd2196c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/acd2196c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/acd2196c

Branch: refs/heads/python-sdk
Commit: acd2196cf54e30e18e69c4dd30b57e6179909ecf
Parents: 2f4b803 96d3931
Author: Luke Cwik 
Authored: Tue Dec 20 13:04:31 2016 -0800
Committer: Luke Cwik 
Committed: Tue Dec 20 13:04:31 2016 -0800

--
 .../beam/runners/dataflow/DataflowRunner.java   | 25 ++
 .../options/DataflowPipelineOptions.java| 19 
 .../runners/dataflow/DataflowRunnerTest.java| 48 ++--
 .../options/DataflowPipelineOptionsTest.java| 20 +---
 .../org/apache/beam/sdk/options/GcpOptions.java | 19 
 .../apache/beam/sdk/util/GcsPathValidator.java  |  3 +-
 .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +++--
 .../beam/sdk/util/GcsPathValidatorTest.java | 15 +-
 8 files changed, 117 insertions(+), 64 deletions(-)
--




[19/51] [abbrv] incubator-beam git commit: Restore SDK's InMemoryTimerInternals, deprecated

2016-12-21 Thread kenn
Restore SDK's InMemoryTimerInternals, deprecated


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69d2c47b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69d2c47b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69d2c47b

Branch: refs/heads/python-sdk
Commit: 69d2c47b6a476099535e9cefe62d4cce5ccafbc1
Parents: 445c120
Author: Kenneth Knowles 
Authored: Fri Dec 16 20:22:59 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:21:52 2016 -0800

--
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 +++
 1 file changed, 275 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69d2c47b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
new file mode 100644
index 000..a910d64
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.joda.time.Instant;
+
+/**
+ * @deprecated use {@code org.apache.beam.runners.core.InMemoryTimerInternals}.
+ */
+@Deprecated
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue watermarkTimers = new PriorityQueue<>(11);
+
+  /** Pending processing time timers, in timestamp order. */
+  private PriorityQueue processingTimers = new PriorityQueue<>(11);
+
+  /** Pending synchronized processing time timers, in timestamp order. */
+  private PriorityQueue synchronizedProcessingTimers = new 
PriorityQueue<>(11);
+
+  /** Current input watermark. */
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  /** Current processing time. */
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current synchronized processing time. */
+  private Instant synchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+return outputWatermarkTime;
+  }
+
+  /**
+   * Returns when the next timer in the given time domain will fire, or {@code 
null}
+   * if there are no timers scheduled in that time domain.
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+final TimerData data;
+switch (domain) {
+  case EVENT_TIME:
+data = watermarkTimers.peek();
+break;
+  case PROCESSING_TIME:
+data = processingTimers.peek();
+break;
+  case SYNCHRONIZED_PROCESSING_TIME:
+data = synchronizedProcessingTimers.peek();
+break;
+  default:
+throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
+}
+return (data == null) ? null : data.getTimestamp();
+  }
+
+  private PriorityQueue queue(TimeDomain domain) {
+switch (domain) {
+  case EVENT_TIME:
+return watermarkTimers;
+  case 

[21/51] [abbrv] incubator-beam git commit: This closes #1652: Move InMemoryTimerInternals to runners-core

2016-12-21 Thread kenn
This closes #1652: Move InMemoryTimerInternals to runners-core

* github/pr/1652:
  Restore SDK's InMemoryTimerInternals, deprecated
  Move InMemoryTimerInternals to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a526adb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a526adb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a526adb3

Branch: refs/heads/python-sdk
Commit: a526adb33c1eeece866f464e3dfd2cdbc3be6dea
Parents: 22e25a4 69d2c47
Author: Kenneth Knowles 
Authored: Tue Dec 20 11:22:24 2016 -0800
Committer: Kenneth Knowles 
Committed: Tue Dec 20 11:22:24 2016 -0800

--
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 -
 .../runners/core/InMemoryTimerInternals.java| 273 +++
 .../core/InMemoryTimerInternalsTest.java| 155 +++
 .../beam/runners/core/ReduceFnTester.java   |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 ---
 .../sdk/util/state/InMemoryTimerInternals.java  |   4 +-
 .../util/state/InMemoryTimerInternalsTest.java  | 153 ---
 10 files changed, 445 insertions(+), 198 deletions(-)
--




[12/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.

2016-12-21 Thread kenn
Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fce4f658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fce4f658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fce4f658

Branch: refs/heads/python-sdk
Commit: fce4f6584ca2fd3c2c258405b9f3014be9da3514
Parents: 950aa7e
Author: Stas Levin 
Authored: Tue Dec 20 18:09:30 2016 +0200
Committer: Kenneth Knowles 
Committed: Tue Dec 20 09:55:46 2016 -0800

--
 .../org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 9 -
 .../java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java  | 7 ---
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fce4f658/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
--
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index df05c93..994be87 100644
--- 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -55,7 +55,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.Scanner;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -79,6 +78,7 @@ import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -100,6 +100,9 @@ public class MongoDBGridFSIOTest implements Serializable {
 
   private static int port;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @BeforeClass
   public static void setup() throws Exception {
 try (ServerSocket serverSocket = new ServerSocket(0)) {
@@ -182,7 +185,6 @@ public class MongoDBGridFSIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFullRead() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 PCollection output = pipeline.apply(
 MongoDbGridFSIO.read()
@@ -212,7 +214,6 @@ public class MongoDBGridFSIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testReadWithParser() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 PCollection> output = pipeline.apply(
 MongoDbGridFSIO.>read()
@@ -297,8 +298,6 @@ public class MongoDBGridFSIOTest implements Serializable {
   @Category(NeedsRunner.class)
   public void testWriteMessage() throws Exception {
 
-Pipeline pipeline = TestPipeline.create();
-
 ArrayList data = new ArrayList<>(100);
 ArrayList intData = new ArrayList<>(100);
 for (int i = 0; i < 1000; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fce4f658/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
--
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
index 5faa618..e7ff712 100644
--- 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java
@@ -57,6 +57,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
@@ -80,6 +81,9 @@ public class MongoDbIOTest implements Serializable {
 
   private static int port;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   /**
* Looking for an available network port.
*/
@@ -143,7 +147,6 @@ public class MongoDbIOTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testFullRead() throws Exception {
-TestPipeline pipeline = TestPipeline.create();
 
 PCollection output = pipeline.apply(
 MongoDbIO.read()
@@ -177,7 +180,6 @@ public class MongoDbIOTest implements Serializable {
   @Test
   

[jira] [Commented] (BEAM-1198) ViewFn: explicitly decouple runner materialization of side inputs from SDK-specific mapping

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768387#comment-15768387
 ] 

ASF GitHub Bot commented on BEAM-1198:
--

GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/1678

[BEAM-1198, BEAM-846, BEAM-260] Refactor Dataflow translator to decouple 
input and output graphs more easily

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This is preparatory work to make it possible for the translator to have a 
more loosely coupled relationship between its input and output.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam Dataflow-Translator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1678.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1678


commit 8ed4bb68660c537e4a12c1077ecfa104f9a82eaa
Author: Kenneth Knowles 
Date:   2016-12-21T22:21:50Z

Inline needless interface DataflowTranslator.TranslationContext

The only implementation was DataflowTranslator.Translator. This class
needs some updating and the extra layer of the interface simply
obscures that work.

commit 272d06d7507ad7162616dd1b613efa7c8f5f4069
Author: Kenneth Knowles 
Date:   2016-12-21T22:34:27Z

Explicitly pass Step to mutate in Dataflow translator

Previously, there was always a "current" step that was the most recent
step created. This makes it cumbersome or impossible to do things like
translate one primitive transform into a small subgraph of steps. Thus
we added hacks like CreatePCollectionView which are not actually part
of the model at all - in fact, we should be able to add the needed
CollectionToSingleton steps simply by looking at the side inputs of a
ParDo node.




> ViewFn: explicitly decouple runner materialization of side inputs from 
> SDK-specific mapping
> ---
>
> Key: BEAM-1198
> URL: https://issues.apache.org/jira/browse/BEAM-1198
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model-fn-api, beam-model-runner-api
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> For side inputs, the field {{PCollectionView.fromIterableInternal}} implies 
> an "iterable" materialization of the contents of a PCollection, which is 
> adapted to the desired user-facing type within a UDF (today the 
> PCollectionView "is" the UDF)
> In practice, runners get adequate performance by special casing just a couple 
> of types of PCollectionView in a rather cumbersome manner.
> The design to improve this is https://s.apache.org/beam-side-inputs-1-pager 
> and this makes the de facto standard approach the actual model.
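
A rough sketch of the decoupling described above (the interface shape is illustrative,
not the final Beam API): the runner produces a generic materialization, and an
SDK-supplied function adapts it to the user-facing view type.

    // Hypothetical adapter interface and one concrete mapping.
    interface ViewAdapter<MaterializationT, ViewT> {
      ViewT adapt(MaterializationT contents);
    }

    // An "iterable" materialization adapted to a singleton side input.
    class SingletonAdapter<T> implements ViewAdapter<Iterable<T>, T> {
      @Override
      public T adapt(Iterable<T> contents) {
        java.util.Iterator<T> it = contents.iterator();
        if (!it.hasNext()) {
          throw new java.util.NoSuchElementException("Empty PCollection for singleton view");
        }
        T value = it.next();
        if (it.hasNext()) {
          throw new IllegalArgumentException("Multiple values for singleton view");
        }
        return value;
      }
    }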





[GitHub] incubator-beam pull request #1678: [BEAM-1198, BEAM-846, BEAM-260] Refactor ...

2016-12-21 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/incubator-beam/pull/1678

[BEAM-1198, BEAM-846, BEAM-260] Refactor Dataflow translator to decouple 
input and output graphs more easily

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This is preparatory work to make it possible for the translator to have a 
more loosely coupled relationship between its input and output.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/incubator-beam Dataflow-Translator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1678.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1678


commit 8ed4bb68660c537e4a12c1077ecfa104f9a82eaa
Author: Kenneth Knowles 
Date:   2016-12-21T22:21:50Z

Inline needless interface DataflowTranslator.TranslationContext

The only implementation was DataflowTranslator.Translator. This class
needs some updating and the extra layer of the interface simply
obscures that work.

commit 272d06d7507ad7162616dd1b613efa7c8f5f4069
Author: Kenneth Knowles 
Date:   2016-12-21T22:34:27Z

Explicitly pass Step to mutate in Dataflow translator

Previously, there was always a "current" step that was the most recent
step created. This makes it cumbersome or impossible to do things like
translate one primitive transform into a small subgraph of steps. Thus
we added hacks like CreatePCollectionView which are not actually part
of the model at all - in fact, we should be able to add the needed
CollectionToSingleton steps simply by looking at the side inputs of a
ParDo node.






[jira] [Assigned] (BEAM-1198) ViewFn: explicitly decouple runner materialization of side inputs from SDK-specific mapping

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles reassigned BEAM-1198:
-

Assignee: Kenneth Knowles

> ViewFn: explicitly decouple runner materialization of side inputs from 
> SDK-specific mapping
> ---
>
> Key: BEAM-1198
> URL: https://issues.apache.org/jira/browse/BEAM-1198
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model-fn-api, beam-model-runner-api
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>
> For side inputs, the field {{PCollectionView.fromIterableInternal}} implies 
> an "iterable" materialization of the contents of a PCollection, which is 
> adapted to the desired user-facing type within a UDF (today the 
> PCollectionView "is" the UDF)
> In practice, runners get adequate performance by special casing just a couple 
> of types of PCollectionView in a rather cumbersome manner.
> The design to improve this is https://s.apache.org/beam-side-inputs-1-pager 
> and this makes the de facto standard approach the actual model.





[GitHub] incubator-beam pull request #1677: Add a Test for Flatten with Heterogeneous...

2016-12-21 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/incubator-beam/pull/1677

Add a Test for Flatten with Heterogeneous Coders

Add a category, and suppress in the Flink and Apex runners
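
A hypothetical sketch of the shape of pipeline such a test exercises (names and coders
are illustrative, not taken from the PR): two PCollections of the same element type but
with different coders, flattened into one output.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    class FlattenHeterogeneousSketch {
      static PCollection<String> flattenHeterogeneous(Pipeline pipeline) {
        // Same element type, deliberately different coders on each branch.
        PCollection<String> one =
            pipeline.apply("CreateOne", Create.of("a", "b")).setCoder(StringUtf8Coder.of());
        PCollection<String> two =
            pipeline.apply("CreateTwo", Create.of("c", "d"))
                .setCoder(SerializableCoder.of(String.class));
        return PCollectionList.of(one).and(two).apply(Flatten.<String>pCollections());
      }
    }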

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/incubator-beam flatten_multiple_coders

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1677.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1677


commit 13a8da998c9c03a8ccf914e1dcc16c2aed038930
Author: Thomas Groh 
Date:   2016-12-21T21:53:48Z

Add a Test for Flatten with Heterogeneous Coders

Add a category, and suppress in the Flink and Apex runners






[GitHub] incubator-beam pull request #1676: Merge remote-tracking branch 'origin/mast...

2016-12-21 Thread aaltay
GitHub user aaltay opened a pull request:

https://github.com/apache/incubator-beam/pull/1676

Merge remote-tracking branch 'origin/master' into python-sdk

There are frequent Apex runner failures on the python-sdk branch; merging master brings 
in the pom.xml changes that address them.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aaltay/incubator-beam python-sdk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1676


commit c22e2a435113c9653b58f1040a4e9266059767f4
Author: Kenneth Knowles 
Date:   2016-12-08T04:04:51Z

No longer reject timers in ParDo

commit 274f17f0c0df08785a78d9a60c22d5556e46584a
Author: Kenneth Knowles 
Date:   2016-12-08T04:37:33Z

Reject timers for ParDo in DirectRunner

commit 96f9fce782d7ccc5257eff8993b4f9b8261651a6
Author: Kenneth Knowles 
Date:   2016-12-08T17:53:09Z

This closes #1550

commit 95e2c53db535952aaf0c335e0d3d27a721c6b55d
Author: Sela 
Date:   2016-12-08T18:29:35Z

[BEAM-] Reject timers for ParDo in SparkRunner streaming evaluators

commit 0bfa02dd26a7fb80753da1ed130acff1265d093a
Author: Kenneth Knowles 
Date:   2016-12-08T19:54:33Z

This closes #1553

commit a11eb637f6f1c7bdc41b24154cb3893bb194b096
Author: Kenneth Knowles 
Date:   2016-12-08T20:42:03Z

Fix exclusion of UsesTimersInParDo for FlinkRunner streaming tests

commit e292032cbe3defc5c61fb171a00dd8391a27afd8
Author: Kenneth Knowles 
Date:   2016-12-08T20:46:37Z

This closes #1555

commit 4a12cd303df6d4c49425d38a6290f496349cc312
Author: Pei He 
Date:   2016-11-11T05:53:57Z

[BEAM-978] Support bulk get file size in GcsUtil.

commit 409b5dfcf6e9a9699eff43b041e9726bbb979b89
Author: Kenneth Knowles 
Date:   2016-12-08T23:47:48Z

This closes #1359

commit bf1fba450e6b5fd6c98d006b381472eee8db7b72
Author: Eugene Kirpichov 
Date:   2016-12-07T02:00:03Z

Fix a bug in SplittableDoFn Checkpointing

Call checkpoint() only once if the SDF emits output several times per
claim call.

Calling checkpoint multiple times would clobber an existing checkpoint,
and the second call would only ever return an empty residual, losing all
of the initial residual.

commit 40bd27602ebe2269ccaba2685addb8e5e3ba533e
Author: Thomas Groh 
Date:   2016-12-09T01:17:56Z

This closes #1525

commit 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2
Author: Luke Cwik 
Date:   2016-12-08T23:22:35Z

Add support for having an empty CountingInput/CountingSource

commit ddb59125aeacb809b7695c203fe8b1a40e36aed2
Author: Luke Cwik 
Date:   2016-12-09T02:41:42Z

Add support for having an empty CountingInput/CountingSource

This closes #1557

commit 5169e492578a3759e20b50e36ace61bc86636ad2
Author: Luke Cwik 
Date:   2016-12-09T03:17:21Z

fixup! Fix CountingInput naming

commit 9bab78b55fe4661e9a221969441d8a2a4716f7e1
Author: Luke Cwik 
Date:   2016-12-09T03:20:22Z

fixup! Fix extraneous brace

commit 9bcba398c7516437c00517e03d75e27544b01166
Author: Dan Halperin 
Date:   2016-12-09T07:15:19Z

DataflowRunner: bump environment major version

commit 63d197cd0cff332b62a5f4398b1693b6839a348b
Author: Dan Halperin 
Date:   2016-12-09T09:43:16Z

Closes #1560

commit e48b0e6bc20d8eba2968decf7ac2b4ee7503a4df
Author: Kenneth Knowles 
Date:   2016-12-09T07:33:40Z

Remove misc uses of OldDoFn

commit d9a6311734064b1c7171b943eeb511c4d648187a
Author: Sela 
Date:   2016-12-09T16:01:12Z

This closes #1561

commit f83370d69d193818e5be9ff613498f9f3296a658
Author: Thomas Groh 
Date:   2016-12-09T18:52:46Z

Enable the DirectRunner by default in Examples Archetype

This ensures a runner will be on the classpath if no profile is
specified. This matches the generated examples with the quickstart.

commit 49215106d818b6f0585fef5f37851fad5a2d4102
Author: Davor Bonaci 
Date:   2016-12-09T19:41:48Z

This closes #1563

commit 9875b52cc3b6e1af2cbde850c6d3f5b1e75e313e
Author: manuzhang 
Date:   2016-12-09T01:49:49Z

[BEAM-1093] Change Dataflow to Beam in StateInternals' Javadocs

commit 080dbaa3e2ea688fe04c3cad9b2612c39daba477
Author: Davor Bonaci 
Date:   2016-12-09T20:15:48Z

This closes #1559

commit 1e44cb12c2663b1353717bf9237618df74684102
Author: bchambers 
Date:   2016-12-08T18:40:17Z

Change Dataflow profiling option to 

[jira] [Created] (BEAM-1200) PubsubIO should allow for a user to supply the function which computes the watermark that is reported

2016-12-21 Thread Luke Cwik (JIRA)
Luke Cwik created BEAM-1200:
---

 Summary: PubsubIO should allow for a user to supply the function 
which computes the watermark that is reported
 Key: BEAM-1200
 URL: https://issues.apache.org/jira/browse/BEAM-1200
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-gcp
Reporter: Luke Cwik
Assignee: Daniel Halperin
Priority: Minor


A user wanted to build a watermark function that tracks the data's watermark but 
never falls behind the current time by more than Y minutes. PubsubIO does not 
currently support specifying the function that computes and reports the watermark.
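
A sketch of the kind of policy described (this is not an existing PubsubIO API; the
function shape and names are illustrative):

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    class WatermarkPolicySketch {
      // Report the data watermark, clamped to at most maxLag behind wall-clock time.
      static Instant reportedWatermark(Instant dataWatermark, Duration maxLag) {
        Instant floor = Instant.now().minus(maxLag);
        return dataWatermark.isBefore(floor) ? floor : dataWatermark;
      }
    }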





[jira] [Commented] (BEAM-1117) Support for new Timer API in Direct runner

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768294#comment-15768294
 ] 

ASF GitHub Bot commented on BEAM-1117:
--

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1669


> Support for new Timer API in Direct runner
> --
>
> Key: BEAM-1117
> URL: https://issues.apache.org/jira/browse/BEAM-1117
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-direct
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>






[GitHub] incubator-beam pull request #1669: [BEAM-1117] Preliminaries for timers in t...

2016-12-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/1669




[5/7] incubator-beam git commit: Add static Window.withOutputTimeFn to match build method

2016-12-21 Thread kenn
Add static Window.withOutputTimeFn to match build method
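
A small usage sketch of the new static entry point (assumes an upstream
PCollection<String>; OutputTimeFns.outputAtEndOfWindow() is one of the stock
OutputTimeFn implementations):

    import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;

    class WindowSketch {
      // Override only the OutputTimeFn of the input's existing windowing strategy,
      // starting the builder chain from the new static method.
      static PCollection<String> outputAtEndOfWindow(PCollection<String> input) {
        return input.apply(
            Window.<String>withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()));
      }
    }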


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8188040d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8188040d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8188040d

Branch: refs/heads/master
Commit: 8188040d930b1fa49efd4ed7d5f821d05d6f28ef
Parents: fa4958a
Author: Kenneth Knowles 
Authored: Tue Dec 20 13:57:55 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../org/apache/beam/sdk/transforms/windowing/Window.java| 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8188040d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 0c430d0..1241abe 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -223,6 +223,15 @@ public class Window {
   }
 
   /**
+   * (Experimental) Override the default {@link OutputTimeFn}, 
to control
+   * the output timestamp of values output from a {@link GroupByKey} operation.
+   */
+  @Experimental(Kind.OUTPUT_TIME)
+  public static  Bound withOutputTimeFn(OutputTimeFn outputTimeFn) {
+return new Bound(null).withOutputTimeFn(outputTimeFn);
+  }
+
+  /**
* A {@code PTransform} that windows the elements of a {@code 
PCollection},
* into finite windows according to a user-specified {@code WindowFn}.
*



[6/7] incubator-beam git commit: Add UsesTestStream for use with JUnit @Category

2016-12-21 Thread kenn
Add UsesTestStream for use with JUnit @Category


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d71924c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d71924c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d71924c

Branch: refs/heads/master
Commit: 4d71924ccda9dae97c7cc9535a9780df9457cc3f
Parents: 8188040
Author: Kenneth Knowles 
Authored: Tue Dec 20 14:20:07 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../apache/beam/sdk/testing/UsesTestStream.java | 24 
 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +-
 2 files changed, 30 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
new file mode 100644
index 000..8debb46
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+/**
+ * Category tag for tests that use {@link TestStream}, which is not a part of 
the Beam model
+ * but a special feature currently only implemented by the direct runner.
+ */
+public interface UsesTestStream {}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
--
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 64aeca3..c12e9f3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testLateDataAccumulating() {
 Instant instant = new Instant(0);
 TestStream source = TestStream.create(VarIntCoder.of())
@@ -136,7 +136,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testProcessingTimeTrigger() {
 TestStream source = TestStream.create(VarLongCoder.of())
 .addElements(TimestampedValue.of(1L, new Instant(1000L)),
@@ -159,7 +159,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testDiscardingMode() {
 TestStream stream =
 TestStream.create(StringUtf8Coder.of())
@@ -208,7 +208,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testFirstElementLate() {
 Instant lateElementTimestamp = new Instant(-1_000_000);
 TestStream stream =
@@ -238,7 +238,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
   public void testElementsAtAlmostPositiveInfinity() {
 Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp();
 TestStream stream = TestStream.create(StringUtf8Coder.of())
@@ -261,7 +261,7 @@ public class TestStreamTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category({NeedsRunner.class, UsesTestStream.class})
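
For reference, a hedged sketch of how a runner that does not implement TestStream
could skip every test tagged with this category; the suite class below is
illustrative only (in the Beam build this kind of exclusion is typically wired
through the Maven Surefire/Failsafe excludedGroups setting rather than an
explicit suite).

    import org.apache.beam.sdk.testing.TestStreamTest;
    import org.apache.beam.sdk.testing.UsesTestStream;
    import org.junit.experimental.categories.Categories;
    import org.junit.experimental.categories.Categories.ExcludeCategory;
    import org.junit.runner.RunWith;
    import org.junit.runners.Suite.SuiteClasses;

    // Runs TestStreamTest but skips the tests annotated @Category(UsesTestStream.class).
    @RunWith(Categories.class)
    @ExcludeCategory(UsesTestStream.class)
    @SuiteClasses(TestStreamTest.class)
    public class NoTestStreamSuite {}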

[2/7] incubator-beam git commit: Hold output watermark according to pending timers

2016-12-21 Thread kenn
Hold output watermark according to pending timers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d

Branch: refs/heads/master
Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16
Parents: 7f14c46
Author: Kenneth Knowles 
Authored: Tue Dec 20 13:37:40 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../beam/runners/direct/WatermarkManager.java   | 59 
 1 file changed, 48 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f7bafd1..248fafd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -211,12 +211,18 @@ public class WatermarkManager {
   private static class AppliedPTransformInputWatermark implements Watermark {
 private final Collection inputWatermarks;
 private final SortedMultiset pendingElements;
-private final Map objectTimers;
+
+// This tracks only the quantity of timers at each timestamp, for quickly 
getting the cross-key
+// minimum
+private final SortedMultiset pendingTimers;
 
 // Entries in this table represent the authoritative timestamp for which
 // a per-key-and-StateNamespace timer is set.
 private final Map> existingTimers;
 
+// This per-key sorted set allows quick retrieval of timers that should 
fire for a key
+private final Map objectTimers;
+
 private AtomicReference currentWatermark;
 
 public AppliedPTransformInputWatermark(Collection 
inputWatermarks) {
@@ -224,10 +230,13 @@ public class WatermarkManager {
   // The ordering must order elements by timestamp, and must not compare 
two distinct elements
   // as equal. This is built on the assumption that any element added as a 
pending element will
   // be consumed without modifications.
+  //
+  // The same logic is applied for pending timers
   Ordering pendingBundleComparator =
   new 
BundleByElementTimestampComparator().compound(Ordering.arbitrary());
   this.pendingElements =
   TreeMultiset.create(pendingBundleComparator);
+  this.pendingTimers = TreeMultiset.create();
   this.objectTimers = new HashMap<>();
   this.existingTimers = new HashMap<>();
   currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -278,6 +287,14 @@ public class WatermarkManager {
   pendingElements.remove(completed);
 }
 
+private synchronized Instant getEarliestTimerTimestamp() {
+  if (pendingTimers.isEmpty()) {
+return BoundedWindow.TIMESTAMP_MAX_VALUE;
+  } else {
+return pendingTimers.firstEntry().getElement();
+  }
+}
+
 private synchronized void updateTimers(TimerUpdate update) {
   NavigableSet keyTimers = objectTimers.get(update.key);
   if (keyTimers == null) {
@@ -291,27 +308,43 @@ public class WatermarkManager {
 existingTimers.put(update.key, existingTimersForKey);
   }
 
-  for (TimerData timer : update.setTimers) {
+  for (TimerData timer : update.getSetTimers()) {
+if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+  @Nullable
+  TimerData existingTimer =
+  existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
+
+  if (existingTimer == null) {
+pendingTimers.add(timer.getTimestamp());
+keyTimers.add(timer);
+  } else if (!existingTimer.equals(timer)) {
+keyTimers.remove(existingTimer);
+keyTimers.add(timer);
+  } // else the timer is already set identically, so noop
+
+  existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), 
timer);
+}
+  }
+
+  for (TimerData timer : update.getDeletedTimers()) {
 if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
   @Nullable
   TimerData existingTimer =
   existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
 
   

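To make the new bookkeeping concrete, a self-contained sketch of the cross-key
minimum that pendingTimers provides; TimerBook and its method names are
illustrative, not Beam APIs, and the Long.MAX_VALUE instant below is a stand-in
for BoundedWindow.TIMESTAMP_MAX_VALUE.

    import com.google.common.collect.SortedMultiset;
    import com.google.common.collect.TreeMultiset;
    import org.joda.time.Instant;

    // Tracks only the multiset of pending timer firing times so that the earliest
    // one (the value the output watermark must hold to) is an O(1) lookup.
    class TimerBook {
      private final SortedMultiset<Instant> pendingTimers = TreeMultiset.create();

      synchronized void timerSet(Instant firingTime) {
        pendingTimers.add(firingTime);
      }

      synchronized void timerFiredOrDeleted(Instant firingTime) {
        pendingTimers.remove(firingTime);
      }

      // Mirrors getEarliestTimerTimestamp() in the patch: no pending timers, no hold.
      synchronized Instant earliestPendingTimer() {
        return pendingTimers.isEmpty()
            ? new Instant(Long.MAX_VALUE)  // stand-in for TIMESTAMP_MAX_VALUE
            : pendingTimers.firstEntry().getElement();
      }
    }
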
[3/7] incubator-beam git commit: Use informative Instant formatter in WatermarkHold

2016-12-21 Thread kenn
Use informative Instant formatter in WatermarkHold


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6

Branch: refs/heads/master
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles 
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../apache/beam/runners/core/WatermarkHold.java  |  4 +++-
 .../sdk/transforms/windowing/BoundedWindow.java  | 19 +++
 2 files changed, 22 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
--
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold implements 
Serializable {
 Instant shifted = 
windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
 checkState(!shifted.isBefore(timestamp),
 "OutputTimeFn moved element from %s to earlier time %s for window %s",
-timestamp, shifted, window);
+BoundedWindow.formatTimestamp(timestamp),
+BoundedWindow.formatTimestamp(shifted),
+window);
 checkState(timestamp.isAfter(window.maxTimestamp())
 || !shifted.isAfter(window.maxTimestamp()),
 "OutputTimeFn moved element from %s to %s which is beyond end of "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as 
"-infinity".
+   *
+   * This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as 
"+infinity".
+   *
+   * This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  /**
+   * Formats an {@link Instant} timestamp with additional Beam-specific metadata, such as
+   * indicating whether the timestamp is the end of the global window or one of the
+   * distinguished values {@link #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
 if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
   return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";



[7/7] incubator-beam git commit: This closes #1669: Preliminaries for timers in the direct runner

2016-12-21 Thread kenn
This closes #1669: Preliminaries for timers in the direct runner

  Hold output watermark according to pending timers
  Allow setting timer by ID in DirectTimerInternals
  Add UsesTestStream for use with JUnit @Category
  Add static Window.withOutputTimeFn to match build method
  Use informative Instant formatter in WatermarkHold
  Add informative Instant formatter to BoundedWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57d9bbd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57d9bbd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57d9bbd7

Branch: refs/heads/master
Commit: 57d9bbd797edfcf32fdd9284b802fc4f9694e8d2
Parents: ff39516 dfe2e62
Author: Kenneth Knowles 
Authored: Wed Dec 21 13:46:34 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:46:34 2016 -0800

--
 .../apache/beam/runners/core/WatermarkHold.java |  4 +-
 .../runners/direct/DirectTimerInternals.java|  2 +-
 .../beam/runners/direct/WatermarkManager.java   | 78 ++--
 .../apache/beam/sdk/testing/UsesTestStream.java | 24 ++
 .../sdk/transforms/windowing/BoundedWindow.java | 31 
 .../beam/sdk/transforms/windowing/Window.java   |  9 +++
 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +--
 7 files changed, 144 insertions(+), 16 deletions(-)
--




[4/7] incubator-beam git commit: Allow setting timer by ID in DirectTimerInternals

2016-12-21 Thread kenn
Allow setting timer by ID in DirectTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463

Branch: refs/heads/master
Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f
Parents: 4d71924
Author: Kenneth Knowles 
Authored: Wed Dec 7 20:18:44 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:37 2016 -0800

--
 .../runners/direct/DirectTimerInternals.java|  2 +-
 .../beam/runners/direct/WatermarkManager.java   | 25 
 2 files changed, 26 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5ca276d..80e0721 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant 
target,
   TimeDomain timeDomain) {
-throw new UnsupportedOperationException("Setting timer by ID not yet 
supported.");
+timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, 
timeDomain));
   }
 
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
--
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 7bed751..f7bafd1 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.Table;
 import com.google.common.collect.TreeMultiset;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.joda.time.Instant;
@@ -210,6 +213,10 @@ public class WatermarkManager {
 private final SortedMultiset pendingElements;
 private final Map objectTimers;
 
+// Entries in this table represent the authoritative timestamp for which
+// a per-key-and-StateNamespace timer is set.
+private final Map> existingTimers;
+
 private AtomicReference currentWatermark;
 
 public AppliedPTransformInputWatermark(Collection 
inputWatermarks) {
@@ -222,6 +229,7 @@ public class WatermarkManager {
   this.pendingElements =
   TreeMultiset.create(pendingBundleComparator);
   this.objectTimers = new HashMap<>();
+  this.existingTimers = new HashMap<>();
   currentWatermark = new 
AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
 }
 
@@ -276,14 +284,31 @@ public class WatermarkManager {
 keyTimers = new TreeSet<>();
 objectTimers.put(update.key, keyTimers);
   }
+  Table existingTimersForKey =
+  existingTimers.get(update.key);
+  if (existingTimersForKey == null) {
+existingTimersForKey = HashBasedTable.create();
+existingTimers.put(update.key, existingTimersForKey);
+  }
+
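
A hedged sketch of the set-by-ID semantics this enables: setting a timer that
already exists for the same (namespace, timerId) pair replaces its firing time
instead of registering a second timer. The String keys below are illustrative
stand-ins for Beam's StateNamespace and TimerData types.

    import com.google.common.collect.HashBasedTable;
    import com.google.common.collect.Table;
    import org.joda.time.Instant;

    // Per-key timer index keyed by (namespace, timerId).
    class TimersForKey {
      private final Table<String, String, Instant> byId = HashBasedTable.create();

      // Returns the firing time that was replaced, or null if this is a new timer;
      // the caller can use that to drop the stale entry from any sorted view.
      Instant setTimer(String namespace, String timerId, Instant target) {
        return byId.put(namespace, timerId, target);
      }

      Instant deleteTimer(String namespace, String timerId) {
        return byId.remove(namespace, timerId);
      }
    }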

[1/7] incubator-beam git commit: Add informative Instant formatter to BoundedWindow

2016-12-21 Thread kenn
Repository: incubator-beam
Updated Branches:
  refs/heads/master ff3951699 -> 57d9bbd79


Add informative Instant formatter to BoundedWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/280a6a8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/280a6a8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/280a6a8f

Branch: refs/heads/master
Commit: 280a6a8f729cb382616ad65f71860b61277cbd6f
Parents: ff39516
Author: Kenneth Knowles 
Authored: Mon Dec 19 20:40:11 2016 -0800
Committer: Kenneth Knowles 
Committed: Wed Dec 21 13:45:36 2016 -0800

--
 .../beam/sdk/transforms/windowing/BoundedWindow.java| 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/280a6a8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 3654074..6da2495 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -39,6 +39,18 @@ public abstract class BoundedWindow {
   public static final Instant TIMESTAMP_MAX_VALUE =
   new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  public static String formatTimestamp(Instant timestamp) {
+if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
+  return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
+} else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) {
+  return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)";
+} else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
+  return timestamp.toString() + " (end of global window)";
+} else {
+  return timestamp.toString();
+}
+  }
+
   /**
* Returns the inclusive upper bound of timestamps for values in this window.
*/
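
A usage sketch of the formatter, relying only on the suffixes visible in the
method body above; the Hamcrest matchers are assumptions of the example, not
part of the commit.

    import static org.hamcrest.MatcherAssert.assertThat;
    import static org.hamcrest.Matchers.containsString;
    import static org.hamcrest.Matchers.endsWith;
    import static org.hamcrest.Matchers.not;

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.joda.time.Instant;

    class FormatTimestampExample {
      static void demo() {
        assertThat(BoundedWindow.formatTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE),
            endsWith("(TIMESTAMP_MIN_VALUE)"));
        assertThat(BoundedWindow.formatTimestamp(BoundedWindow.TIMESTAMP_MAX_VALUE),
            endsWith("(TIMESTAMP_MAX_VALUE)"));
        assertThat(BoundedWindow.formatTimestamp(GlobalWindow.INSTANCE.maxTimestamp()),
            endsWith("(end of global window)"));
        // Ordinary timestamps pass through as plain Instant.toString() output.
        assertThat(BoundedWindow.formatTimestamp(new Instant(0)),
            not(containsString("(")));
      }
    }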



[jira] [Commented] (BEAM-1194) DataflowRunner should test a variety of valid tempLocation/stagingLocation/etc formats.

2016-12-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768284#comment-15768284
 ] 

ASF GitHub Bot commented on BEAM-1194:
--

Github user dhalperi closed the pull request at:

https://github.com/apache/incubator-beam/pull/1671


> DataflowRunner should test a variety of valid 
> tempLocation/stagingLocation/etc formats.
> ---
>
> Key: BEAM-1194
> URL: https://issues.apache.org/jira/browse/BEAM-1194
> Project: Beam
>  Issue Type: Test
>  Components: runner-dataflow
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
>Priority: Minor
>
> Cloud Dataflow has a minor history of small bugs related to various code 
> paths expecting there to be or not be a trailing forward-slash in these 
> location fields. The way that Beam's integration tests are set up, we are 
> likely to only have one of these two cases tested (there is a single set of 
> integration test pipeline options).
> We should add a dedicated DataflowRunner integration test to handle this case.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
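
A hedged sketch of how such a test could cover both spellings; the class name,
bucket, and parameter values below are placeholders, not the actual
DataflowLocationIT from the pull request.

    import java.util.Arrays;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    // Runs the same check once per location spelling so both the "with trailing
    // slash" and "without trailing slash" code paths are exercised.
    @RunWith(Parameterized.class)
    public class LocationFormatIT {
      @Parameters(name = "tempLocation={0}")
      public static Iterable<Object[]> locations() {
        return Arrays.asList(
            new Object[] {"gs://my-bucket/temp"},
            new Object[] {"gs://my-bucket/temp/"});
      }

      private final String tempLocation;

      public LocationFormatIT(String tempLocation) {
        this.tempLocation = tempLocation;
      }

      @Test
      public void pipelineRunsWithThisTempLocation() {
        // Illustrative only: a real integration test would set this value on the
        // pipeline options, run a trivial pipeline, and assert it reaches DONE.
      }
    }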


[GitHub] incubator-beam pull request #1671: [BEAM-1194] Add DataflowLocationIT

2016-12-21 Thread dhalperi
Github user dhalperi closed the pull request at:

https://github.com/apache/incubator-beam/pull/1671


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1671: [BEAM-1194] Add DataflowLocationIT

2016-12-21 Thread dhalperi
GitHub user dhalperi reopened a pull request:

https://github.com/apache/incubator-beam/pull/1671

[BEAM-1194] Add DataflowLocationIT

Opening PR to test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dhalperi/incubator-beam trailing-slash

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1671.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1671


commit 7ad2ebd8e96a469e38c156de2d2701e500c3d955
Author: Dan Halperin 
Date:   2016-12-21T01:56:00Z

Add DataflowLocationIT




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1675: [Beam-1186] Migrating the remaining tests...

2016-12-21 Thread staslev
GitHub user staslev opened a pull request:

https://github.com/apache/incubator-beam/pull/1675

[Beam-1186] Migrating the remaining tests to use TestPipeline as a JUnit 
rule.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/staslev/incubator-beam 
BEAM-1186-migrating-remaining-tests-to-TestPipeline-rule

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1675


commit e6fd0b308583222b37e055476f34d020563893f5
Author: Stas Levin 
Date:   2016-12-21T15:58:35Z

[BEAM-1186] Broke AvroIOGeneratedClassTest into 2 parametrised test classes 
that support TestPipeline as a JUnit rule.

commit 52ca16451f807c026276dba9448d4c5db12c866e
Author: Stas Levin 
Date:   2016-12-21T21:00:39Z

[BEAM-1186] Broke ApproximateUniqueTest into 3 test classes that support 
TestPipeline as a JUnit rule.

commit d46f66d8c5f0f63f783561ba684dc003b3327b3a
Author: Stas Levin 
Date:   2016-12-21T21:21:11Z

[BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline 
as a JUnit rule.

commit 96da02a1c9014d186ce264c745e4fd48883dd4c3
Author: Stas Levin 
Date:   2016-12-21T21:32:27Z

[BEAM-1186] Migrated BigtableIOTest to use TestPipeline as a JUnit rule by 
stabilizing runReadTest's (read) transform name.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
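
For readers following the migration, a minimal sketch of the target pattern
(TestPipeline exposed as a JUnit rule instead of being created inside each test
body); the transform and assertion shown are illustrative.

    import java.io.Serializable;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    public class CountGloballyTest implements Serializable {
      // The rule supplies a fresh pipeline per test; transient because the
      // enclosing test class is Serializable.
      @Rule public final transient TestPipeline p = TestPipeline.create();

      @Test
      public void countsElements() {
        PCollection<Long> count =
            p.apply(Create.of("a", "b", "c")).apply(Count.<String>globally());
        PAssert.thatSingleton(count).isEqualTo(3L);
        p.run();
      }
    }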


[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...

2016-12-21 Thread markflyhigh
Github user markflyhigh closed the pull request at:

https://github.com/apache/incubator-beam/pull/1639


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (BEAM-1199) Condense recordAsOutput, finishSpecifyingOutput from POutput

2016-12-21 Thread Thomas Groh (JIRA)
Thomas Groh created BEAM-1199:
-

 Summary: Condense recordAsOutput, finishSpecifyingOutput from 
POutput
 Key: BEAM-1199
 URL: https://issues.apache.org/jira/browse/BEAM-1199
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Reporter: Thomas Groh
Assignee: Davor Bonaci
Priority: Minor


{{recordAsOutput}} and {{finishSpecifyingOutput}} are both methods that are 
called after an output has been attached to a PTransform application. They can 
be combined into a single method that does any after-production work (such as 
the initial run of Coder inference).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...

2016-12-21 Thread markflyhigh
GitHub user markflyhigh reopened a pull request:

https://github.com/apache/incubator-beam/pull/1639

[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

 - E2E test framework that supports starting a pipeline with a TestRunner and 
verifying the resulting pipeline job.
   - add `TestOptions`, which defines `on_success_matcher`, used to 
verify the state/output of the pipeline job.
   - validate `on_success_matcher` before pipeline execution to make sure 
it can be unpickled to a subclass of BaseMatcher.
   - create a `TestDataflowRunner`, which provides the functionality of 
`DataflowRunner` plus result verification.
   - provide a test verifier, `PipelineStateMatcher`, that can verify 
whether the pipeline job finished in the DONE state.
 - Add wordcount_it (it = integration test), which builds an e2e test based on 
the existing wordcount pipeline.
   - include wordcount_it in the nose collector, so that wordcount_it can be 
collected and run by nose.
   - skip ITs when running unit tests from tox in precommit and postcommit.

These changes do not alter the behavior of the existing pre/postcommit runs.
Testing was done by running `tox -e py27 -c sdks/python/tox.ini` for the unit tests 
and by running wordcount_it with `TestDataflowRunner` against the service 
([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)).

TODO:
 - Output data verifier that verifies pipeline output stored in the 
filesystem.
 - Add wordcount_it to precommit and replace the existing wordcount execution 
command in postcommit with a better-structured nose command.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1639


commit e1e1fa3a60e1fe234829432d144d6689e240b6f0
Author: Mark Liu 
Date:   2016-12-16T01:41:20Z

[BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

commit 0e7007879ee082e3afe5db36107f51c03274f3f5
Author: Mark Liu 
Date:   2016-12-16T02:55:53Z

fixup! Fix Code Style

commit d6d71a717e8ed7ab32ffa02621c837c797f66cd7
Author: Mark Liu 
Date:   2016-12-20T19:15:59Z

fixup! Address Ahmet comments

commit 669077fff2032cf5d48e891a097f3e33bef75679
Author: Mark Liu 
Date:   2016-12-20T22:18:52Z

Add Hamcrest To Tox For autocomplete_test Execution

commit 601956fba1bee486e1aadc4fd1193848867c50d6
Author: Mark Liu 
Date:   2016-12-21T19:16:22Z

fixup! Fix Unit Test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (BEAM-846) Decouple side input window mapping from WindowFn

2016-12-21 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613384#comment-15613384
 ] 

Kenneth Knowles edited comment on BEAM-846 at 12/21/16 9:19 PM:


Design 1-pager is https://s.apache.org/beam-windowmappingfn-1-pager


was (Author: kenn):
Design 1-pager is https://s.apache.org/beam-side-inputs-1-pager and a couple 
PRs have been authored 
([#520|https://github.com/apache/incubator-beam/pull/520] and 
[#1076|https://github.com/apache/incubator-beam/pull/1076]) attributed to 
BEAM-115 (the "all of the Runner API" ticket)


> Decouple side input window mapping from WindowFn
> 
>
> Key: BEAM-846
> URL: https://issues.apache.org/jira/browse/BEAM-846
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model-runner-api, sdk-java-core
>Reporter: Robert Bradshaw
>Assignee: Kenneth Knowles
>  Labels: backward-incompatible
>
> Currently the main WindowFn provides a getSideInputWindow method. Instead, 
> this mapping should be specified per side input (though the default mapping 
> would remain the same). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
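
A heavily hedged sketch of the decoupling the 1-pager proposes; the
WindowMappingFn class below is illustrative of the idea (a per-side-input
mapping carried on the side input itself) and was not an existing Beam API at
the time of this comment.

    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

    // Illustrative only: maps a main-input window to the side-input window whose
    // contents should be consulted, independently of the main WindowFn.
    abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> {
      public abstract TargetWindowT getSideInputWindow(BoundedWindow mainWindow);
    }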


[jira] [Updated] (BEAM-210) Allow control of empty ON_TIME panes analogous to final panes

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-210:
-
Component/s: (was: beam-model)

> Allow control of empty ON_TIME panes analogous to final panes
> -
>
> Key: BEAM-210
> URL: https://issues.apache.org/jira/browse/BEAM-210
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model-runner-api, sdk-java-core
>Reporter: Mark Shields
>Assignee: Thomas Groh
>
> Today, ON_TIME panes are emitted whether or not they are empty. We had 
> decided that for final panes the user would want to control this behavior in 
> order to limit data volume. But for ON_TIME panes no such control exists. The 
> rationale is perhaps that the ON_TIME pane is a fundamental result that 
> should not be elided. To be considered: whether this is what we want.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
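
For comparison, a hedged sketch of the existing control for final panes that
the issue alludes to; whether Window.ClosingBehavior is precisely the knob
meant here is an assumption, and no analogous switch exists for ON_TIME panes.

    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class FinalPaneControlExample {
      // The user can ask that the final pane fire only if it is non-empty; there is
      // currently no way to make the same request for the ON_TIME pane.
      static PCollection<String> window(PCollection<String> input) {
        return input.apply(
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(
                    Duration.standardMinutes(10), ClosingBehavior.FIRE_IF_NON_EMPTY)
                .discardingFiredPanes());
      }
    }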


[jira] [Updated] (BEAM-260) WindowMappingFn: Know the getSideInputWindow upper bound to release side input resources

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-260:
-
Component/s: (was: beam-model)
 beam-model-fn-api

> WindowMappingFn: Know the getSideInputWindow upper bound to release side 
> input resources
> 
>
> Key: BEAM-260
> URL: https://issues.apache.org/jira/browse/BEAM-260
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model-fn-api, beam-model-runner-api
>Reporter: Mark Shields
>Assignee: Kenneth Knowles
>
> We currently have no static knowledge about the getSideInputWindow function, 
> and runners are thus forced to hold on to all side input state / elements in 
> case a future element reaches back into an earlier side input element.
> Maybe we need an upper bound on the lag from the current window to the result of 
> getSideInputWindow so we can have a progressing GC horizon, as we do for GBK 
> window state. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-846) Decouple side input window mapping from WindowFn

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-846:
-
Component/s: (was: beam-model)

> Decouple side input window mapping from WindowFn
> 
>
> Key: BEAM-846
> URL: https://issues.apache.org/jira/browse/BEAM-846
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model-runner-api, sdk-java-core
>Reporter: Robert Bradshaw
>Assignee: Kenneth Knowles
>  Labels: backward-incompatible
>
> Currently the main WindowFn provides a getSideInputWindow method. Instead, 
> this mapping should be specified per side input (though the default mapping 
> would remain the same). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-1193) Give Coders URNs and document their binary formats outside the Java code base

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-1193:
--
Component/s: (was: beam-model)

> Give Coders URNs and document their binary formats outside the Java code base
> -
>
> Key: BEAM-1193
> URL: https://issues.apache.org/jira/browse/BEAM-1193
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model-runner-api
>Reporter: Kenneth Knowles
>Assignee: Frances Perry
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (BEAM-653) Refine specification for WindowFn.isCompatible()

2016-12-21 Thread Kenneth Knowles (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kenneth Knowles updated BEAM-653:
-
Component/s: beam-model-runner-api

> Refine specification for WindowFn.isCompatible() 
> -
>
> Key: BEAM-653
> URL: https://issues.apache.org/jira/browse/BEAM-653
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, beam-model-runner-api
>Reporter: Kenneth Knowles
>
> {{WindowFn#isCompatible}} doesn't really have a spec. In practice, it is used 
> primarily when flattening together multiple PCollections. All of the 
> WindowFns must be compatible, and then just a single WindowFn is selected 
> arbitrarily for the output PCollection.
> In consequence, downstream of the Flatten, the merging behavior will be taken 
> from this WindowFn.
> Currently, there are some mismatches:
>  - Sessions with different gap durations _are_ compatible today, but probably 
> shouldn't be since merging makes little sense. (The use of tiny proto-windows 
> is an implementation detail anyhow)
>  - SlidingWindows and FixedWindows _could_ reasonably be compatible if they 
> had the same duration, though it might be odd.
> Either way, we should just nail down what we actually mean so we can arrive 
> at a verdict in these cases.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
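
To ground the cases listed above, a hedged sketch of what probing the current
behavior directly looks like; the comments simply restate the description's
claims rather than a formal spec.

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Sessions;
    import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
    import org.joda.time.Duration;

    class IsCompatibleExamples {
      public static void main(String[] args) {
        // Per the description: sessions with different gap durations currently
        // report as compatible, even though merging across them makes little sense.
        System.out.println(
            Sessions.withGapDuration(Duration.standardMinutes(1))
                .isCompatible(Sessions.withGapDuration(Duration.standardMinutes(10))));

        // FixedWindows and SlidingWindows of the same duration are not compatible
        // today, although the description notes they arguably could be.
        System.out.println(
            FixedWindows.of(Duration.standardMinutes(1))
                .isCompatible(SlidingWindows.of(Duration.standardMinutes(1))));
      }
    }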

