[jira] [Commented] (BEAM-469) NullableCoder optimized encoding via passthrough context
[ https://issues.apache.org/jira/browse/BEAM-469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768525#comment-15768525 ]

ASF GitHub Bot commented on BEAM-469:
-------------------------------------

GitHub user dhalperi opened a pull request:

    https://github.com/apache/incubator-beam/pull/1680

    [BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

    See [BEAM-469] for more information about why this is correct.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhalperi/incubator-beam efficient-nested-coders

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1680.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1680

commit 621e8250c9535d773c4f4440a34ea0833912b51f
Author: Dan Halperin
Date:   2016-12-21T23:37:49Z

    [BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

    See [BEAM-469] for more information about why this is correct.

> NullableCoder optimized encoding via passthrough context
> --------------------------------------------------------
>
>         Key: BEAM-469
>         URL: https://issues.apache.org/jira/browse/BEAM-469
>     Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>    Reporter: Luke Cwik
>    Assignee: Thomas Groh
>    Priority: Trivial
>      Labels: backward-incompatible
>     Fix For: 0.3.0-incubating
>
> NullableCoder should encode using the context given and not always use the
> nested context. For coders which can efficiently encode in the outer context,
> such as StringUtf8Coder or ByteArrayCoder, we are forcing them to prefix
> themselves with their length.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] incubator-beam pull request #1680: [BEAM-XXX] Make KVCoder more efficient by...
GitHub user dhalperi opened a pull request:

    https://github.com/apache/incubator-beam/pull/1680

    [BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

    See [BEAM-469] for more information about why this is correct.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhalperi/incubator-beam efficient-nested-coders

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1680.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1680

commit 621e8250c9535d773c4f4440a34ea0833912b51f
Author: Dan Halperin
Date:   2016-12-21T23:37:49Z

    [BEAM-XXX] Make KVCoder more efficient by removing unnecessary nesting

    See [BEAM-469] for more information about why this is correct.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
[jira] [Commented] (BEAM-1201) Remove producesSortedKeys from Source
[ https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768483#comment-15768483 ]

ASF GitHub Bot commented on BEAM-1201:
--------------------------------------

GitHub user dhalperi opened a pull request:

    https://github.com/apache/incubator-beam/pull/1679

    [BEAM-1201] Remove BoundedSource.producesSortedKeys

    R: @jkff

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhalperi/incubator-beam remove-produces-sorted-keys

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1679

commit ee15138543f8b9926466cf4e4dc6857b3173345e
Author: Dan Halperin
Date:   2016-12-21T23:32:38Z

    [BEAM-1201] Remove BoundedSource.producesSortedKeys

    Unused and unclear; for more information see the linked JIRA.

> Remove producesSortedKeys from Source
> -------------------------------------
>
>         Key: BEAM-1201
>         URL: https://issues.apache.org/jira/browse/BEAM-1201
>     Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>    Reporter: Daniel Halperin
>    Assignee: Daniel Halperin
>    Priority: Minor
>      Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just
> failed to delete before releasing Dataflow 1.0, but we can delete it before
> the first stable release of Beam.
> This function has never been used by any runner. It does not mean anything
> obvious to implementors, as many sources produce {{T}}, not {{KV<K, V>}} --
> what does it mean in the former case? (And how do you get the latter case
> correct?)
[jira] [Updated] (BEAM-1201) Remove producesSortedKeys from Source
[ https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Halperin updated BEAM-1201:
----------------------------------
    Labels: backward-incompatible  (was: )

> Remove producesSortedKeys from Source
> -------------------------------------
>
>         Key: BEAM-1201
>         URL: https://issues.apache.org/jira/browse/BEAM-1201
>     Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>    Reporter: Daniel Halperin
>    Assignee: Daniel Halperin
>    Priority: Minor
>      Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just
> failed to delete before releasing Dataflow 1.0, but we can delete it before
> the first stable release of Beam.
> This function has never been used by any runner. It does not mean anything
> obvious to implementors, as many sources produce {{T}}, not {{KV<K, V>}} --
> what does it mean in the former case? (And how do you get the latter case
> correct?)
[jira] [Updated] (BEAM-1201) Remove producesSortedKeys from BoundedSource
[ https://issues.apache.org/jira/browse/BEAM-1201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Halperin updated BEAM-1201:
----------------------------------
    Summary: Remove producesSortedKeys from BoundedSource  (was: Remove producesSortedKeys from Source)

> Remove producesSortedKeys from BoundedSource
> --------------------------------------------
>
>         Key: BEAM-1201
>         URL: https://issues.apache.org/jira/browse/BEAM-1201
>     Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>    Reporter: Daniel Halperin
>    Assignee: Daniel Halperin
>    Priority: Minor
>      Labels: backward-incompatible
>
> This is a holdover from a precursor of the old Dataflow SDK that we just
> failed to delete before releasing Dataflow 1.0, but we can delete it before
> the first stable release of Beam.
> This function has never been used by any runner. It does not mean anything
> obvious to implementors, as many sources produce {{T}}, not {{KV<K, V>}} --
> what does it mean in the former case? (And how do you get the latter case
> correct?)
[GitHub] incubator-beam pull request #1679: [BEAM-1201] Remove BoundedSource.produces...
GitHub user dhalperi opened a pull request:

    https://github.com/apache/incubator-beam/pull/1679

    [BEAM-1201] Remove BoundedSource.producesSortedKeys

    R: @jkff

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dhalperi/incubator-beam remove-produces-sorted-keys

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1679.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1679

commit ee15138543f8b9926466cf4e4dc6857b3173345e
Author: Dan Halperin
Date:   2016-12-21T23:32:38Z

    [BEAM-1201] Remove BoundedSource.producesSortedKeys

    Unused and unclear; for more information see the linked JIRA.
[jira] [Commented] (BEAM-469) NullableCoder optimized encoding via passthrough context
[ https://issues.apache.org/jira/browse/BEAM-469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768476#comment-15768476 ]

Daniel Halperin commented on BEAM-469:
--------------------------------------

Sorry I missed this JIRA comment, [~mariusz89016]! A bit late, but...

Say a coder C encodes in the outer (non-nested) context. Then we have the
guarantee that no further elements will be written after C's encoding. So if
{{NullableCoder}} encodes in the outer context, nothing will be written after
whatever the {{NullableCoder}} writes. If it writes {{0}}, that's it -- the
element is null. But if it writes {{1}}, then we know that all remaining bytes
in the encoded stream belong to the inner coder. That effectively means the
inner coder can also use the outer context, so it does not need to write its
own length.

In your example, the {{NullableCoder}} is used in the nested context. Then the
inner coder must also use the nested context, because more encoded elements
may follow.

In either case, the context of the nullable coder can be the same as the
context of the inner coder. This is why, in the patch here, we simply pass the
{{NullableCoder}}'s context down into the inner coder. All we have removed is
the _additional_ nesting that was used before:

https://patch-diff.githubusercontent.com/raw/apache/incubator-beam/pull/992.patch

> NullableCoder optimized encoding via passthrough context
> --------------------------------------------------------
>
>         Key: BEAM-469
>         URL: https://issues.apache.org/jira/browse/BEAM-469
>     Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>    Reporter: Luke Cwik
>    Assignee: Thomas Groh
>    Priority: Trivial
>      Labels: backward-incompatible
>     Fix For: 0.3.0-incubating
>
> NullableCoder should encode using the context given and not always use the
> nested context. For coders which can efficiently encode in the outer context,
> such as StringUtf8Coder or ByteArrayCoder, we are forcing them to prefix
> themselves with their length.
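The context-passthrough argument above can be sketched in a few lines of toy Python. This is NOT the real implementation (Beam's `NullableCoder` is Java and works on streams); the class names, the 4-byte length prefix, and the `nested` flag below are simplifying assumptions. Only the 0/1 null tag and the "pass the wrapper's context to the inner coder" idea come from the discussion:

```python
# Toy model of Beam's coder contexts. In the OUTER context a value owns the
# rest of the stream; in the NESTED context more elements may follow, so
# variable-length encodings must length-prefix themselves.
import struct

class Utf8Coder:
    def encode(self, value, nested):
        data = value.encode('utf-8')
        # A length prefix is only required when something may follow us.
        return (struct.pack('>I', len(data)) + data) if nested else data

    def decode(self, buf, nested):
        if nested:
            (n,) = struct.unpack('>I', buf[:4])
            return buf[4:4 + n].decode('utf-8'), buf[4 + n:]
        return buf.decode('utf-8'), b''

class NullableCoder:
    def __init__(self, inner):
        self.inner = inner

    def encode(self, value, nested):
        if value is None:
            return b'\x00'
        # Pass our own context through: if nothing follows the nullable
        # encoding, nothing follows the inner encoding either, so the inner
        # coder never needs an *extra* level of nesting.
        return b'\x01' + self.inner.encode(value, nested)

    def decode(self, buf, nested):
        if buf[:1] == b'\x00':
            return None, buf[1:]
        return self.inner.decode(buf[1:], nested)

coder = NullableCoder(Utf8Coder())
# Outer context: one tag byte plus the raw UTF-8 bytes, no length prefix.
assert coder.encode('hi', nested=False) == b'\x01hi'
# Nested context: the inner coder length-prefixes itself, exactly once.
assert coder.encode('hi', nested=True) == b'\x01\x00\x00\x00\x02hi'
assert coder.decode(coder.encode(None, False), False)[0] is None
```

The pre-patch behavior corresponds to always calling `self.inner.encode(value, True)`, which forces the length prefix even when the nullable value is the last thing in the stream.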
[jira] [Commented] (BEAM-646) Get runners out of the apply()
[ https://issues.apache.org/jira/browse/BEAM-646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768470#comment-15768470 ]

ASF GitHub Bot commented on BEAM-646:
-------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1582

> Get runners out of the apply()
> ------------------------------
>
>         Key: BEAM-646
>         URL: https://issues.apache.org/jira/browse/BEAM-646
>     Project: Beam
>  Issue Type: Improvement
>  Components: beam-model-runner-api, sdk-java-core
>    Reporter: Kenneth Knowles
>    Assignee: Thomas Groh
>
> Right now, the runner intercepts calls to apply() and replaces transforms as
> we go. This means that there is no "original" user graph. For portability and
> misc architectural benefits, we would like to build the original graph first,
> and have the runner override later.
> Some runners already work in this manner, but we could integrate it more
> smoothly, with more validation, via some handy APIs on e.g. the Pipeline
> object.
[GitHub] incubator-beam pull request #1582: [BEAM-646] Add Parameters to finishSpecif...
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1582
[1/3] incubator-beam git commit: Add Parameters to finishSpecifying
Repository: incubator-beam
Updated Branches:
  refs/heads/master 57d9bbd79 -> 7984fe3fc

Add Parameters to finishSpecifying

Remove the need to use getProducingTransformInternal in TypedPValue. Ensure
that all nodes are finished specifying before a call to
TransformHierarchy#visit. This ensures that all nodes are fully specified
without requiring the Pipeline or Runner to do so explicitly.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/038950df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/038950df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/038950df

Branch: refs/heads/master
Commit: 038950df02fa553cbb91f57978efe125a9ebc80f
Parents: b053be4
Author: Thomas Groh
Authored: Thu Dec 8 14:33:36 2016 -0800
Committer: Thomas Groh
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java       | 1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java     | 2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  | 2 -
 .../main/java/org/apache/beam/sdk/Pipeline.java | 3 +
 .../beam/sdk/runners/TransformHierarchy.java    | 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java | 5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java       | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java | 9 ---
 .../org/apache/beam/sdk/values/POutput.java     | 20 ++---
 .../beam/sdk/values/POutputValueBase.java       | 4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  | 3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java     | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   | 7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java | 7 +-
 22 files changed, 185 insertions(+), 210 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/038950df/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
--
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
index fa94b2a..f88a94d 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslatorTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
@@ -286,21 +287,22 @@ public class ParDoBoundTranslatorTest {
         Arrays.asList(sideInput1, sideInput2), Arrays.asList(;
-    outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
-    ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
-
-    HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
-        "processing: -42: [11, 222]", "processing: 666: [11, 222]");
-    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
-    while (System.currentTimeMillis() < timeout) {
-      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
-        break;
-      }
-      LOG.info("Waiting for expected results.");
-      Thread.sleep(SLEEP_MILLIS);
-    }
-    result.cancel();
-    Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS);
+    outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector()));
+    outputs.get(sideOutputTag).setCoder(VoidCoder.of());
+    ApexRunnerResult result = (ApexRunnerResult) pipeline.run();
+
+    HashSet expected = Sets.newHashSet("processing: 3: [11, 222]",
+        "processing: -42: [11, 222]", "processing: 666: [11, 222]");
+    long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS;
+    while (System.currentTimeMillis() < timeout) {
+      if (EmbeddedCollector.RESULTS.containsAll(expected)) {
+        break;
+      }
+      LOG.info("Waiting for expected
[2/3] incubator-beam git commit: Use CountingSource in ForceStreamingTest
Use CountingSource in ForceStreamingTest

Removes the requirement to have a FakeUnboundedSource, plus the read is fully
specified.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b053be46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b053be46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b053be46

Branch: refs/heads/master
Commit: b053be460c2e6ff486faed1b1a0996af63f93db2
Parents: 57d9bbd
Author: Thomas Groh
Authored: Tue Dec 20 14:23:21 2016 -0800
Committer: Thomas Groh
Committed: Wed Dec 21 15:26:17 2016 -0800

--
 .../beam/runners/spark/ForceStreamingTest.java | 39 +---
 1 file changed, 2 insertions(+), 37 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b053be46/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
--
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
index eb17eea..1b2ff08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ForceStreamingTest.java
@@ -23,10 +23,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.IOException;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
+import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -59,7 +58,7 @@ public class ForceStreamingTest {
     // apply the BoundedReadFromUnboundedSource.
     @SuppressWarnings("unchecked")
     BoundedReadFromUnboundedSource boundedRead =
-        Read.from(new FakeUnboundedSource()).withMaxNumRecords(-1);
+        Read.from(CountingSource.unbounded()).withMaxNumRecords(-1);
     //noinspection unchecked
     pipeline.apply(boundedRead);
@@ -86,38 +85,4 @@
     }
   }
-
-  /**
-   * A fake {@link UnboundedSource} to satisfy the compiler.
-   */
-  private static class FakeUnboundedSource extends UnboundedSource {
-
-    @Override
-    public List generateInitialSplits(
-        int desiredNumSplits,
-        PipelineOptions options) throws Exception {
-      return null;
-    }
-
-    @Override
-    public UnboundedReader createReader(
-        PipelineOptions options,
-        CheckpointMark checkpointMark) throws IOException {
-      return null;
-    }
-
-    @Override
-    public Coder getCheckpointMarkCoder() {
-      return null;
-    }
-
-    @Override
-    public void validate() { }
-
-    @Override
-    public Coder getDefaultOutputCoder() {
-      return null;
-    }
-  }
-
 }
[3/3] incubator-beam git commit: This closes #1582
This closes #1582

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7984fe3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7984fe3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7984fe3f

Branch: refs/heads/master
Commit: 7984fe3fc20160d2286433434190f35658aef158
Parents: 57d9bbd 038950d
Author: Thomas Groh
Authored: Wed Dec 21 15:26:18 2016 -0800
Committer: Thomas Groh
Committed: Wed Dec 21 15:26:18 2016 -0800

--
 .../translation/ParDoBoundTranslatorTest.java   | 32 
 .../beam/runners/direct/DirectGraphVisitor.java | 21 --
 .../beam/runners/direct/DirectRunner.java       | 1 -
 .../runners/direct/DirectGraphVisitorTest.java  | 32 +---
 .../direct/FlattenEvaluatorFactoryTest.java     | 2 +
 .../direct/KeyedPValueTrackingVisitorTest.java  | 17 -
 .../beam/runners/spark/ForceStreamingTest.java  | 41 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java | 3 +
 .../beam/sdk/runners/TransformHierarchy.java    | 45 ++-
 .../transforms/join/KeyedPCollectionTuple.java  | 32 
 .../java/org/apache/beam/sdk/values/PBegin.java | 5 --
 .../apache/beam/sdk/values/PCollectionList.java | 13 +---
 .../beam/sdk/values/PCollectionTuple.java       | 13 +---
 .../java/org/apache/beam/sdk/values/PInput.java | 9 ---
 .../org/apache/beam/sdk/values/POutput.java     | 20 ++---
 .../beam/sdk/values/POutputValueBase.java       | 4 +-
 .../java/org/apache/beam/sdk/values/PValue.java | 15 
 .../org/apache/beam/sdk/values/PValueBase.java  | 3 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 78 +++-
 .../sdk/runners/TransformHierarchyTest.java     | 34 +
 .../apache/beam/sdk/transforms/ParDoTest.java   | 7 +-
 .../apache/beam/sdk/values/TypedPValueTest.java | 7 +-
 22 files changed, 187 insertions(+), 247 deletions(-)
--
[jira] [Commented] (BEAM-1112) Python E2E Integration Test Framework - Batch Only
[ https://issues.apache.org/jira/browse/BEAM-1112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768461#comment-15768461 ]

ASF GitHub Bot commented on BEAM-1112:
--------------------------------------

Github user markflyhigh closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1639

> Python E2E Integration Test Framework - Batch Only
> --------------------------------------------------
>
>         Key: BEAM-1112
>         URL: https://issues.apache.org/jira/browse/BEAM-1112
>     Project: Beam
>  Issue Type: Task
>  Components: sdk-py, testing
>    Reporter: Mark Liu
>    Assignee: Mark Liu
>
> Parity with Java.
> Build an e2e integration test framework that can configure and run a batch
> pipeline with a specified test runner, wait for pipeline execution, and
> verify results with the given verifiers at the end.
[jira] [Commented] (BEAM-1112) Python E2E Integration Test Framework - Batch Only
[ https://issues.apache.org/jira/browse/BEAM-1112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15768462#comment-15768462 ]

ASF GitHub Bot commented on BEAM-1112:
--------------------------------------

GitHub user markflyhigh reopened a pull request:

    https://github.com/apache/incubator-beam/pull/1639

    [BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

    Be sure to do all of the following to help us incorporate your
    contribution quickly and easily:

    - [x] Make sure the PR title is formatted like:
          `[BEAM-<Jira issue #>] Description of pull request`
    - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
          Travis-CI on your fork and ensure the whole test matrix passes).
    - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
          number, if there is one.
    - [ ] If this contribution is large, please file an Apache
          [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

    - E2E test framework that supports a TestRunner starting and verifying a
      pipeline job:
      - Add `TestOptions`, which defines `on_success_matcher`, used to verify
        the state/output of the pipeline job.
      - Validate `on_success_matcher` before pipeline execution to make sure
        it can be unpickled to a subclass of BaseMatcher.
      - Create a `TestDataflowRunner`, which provides the functionality of
        `DataflowRunner` plus result verification.
      - Provide a test verifier, `PipelineStateMatcher`, that can verify
        whether a pipeline job finished in the DONE state.
    - Add wordcount_it (it = integration test), an e2e test built on the
      existing wordcount pipeline:
      - Include wordcount_it in the nose collector, so that wordcount_it can
        be collected and run by nose.
      - Skip ITs when running unit tests from tox in precommit and postcommit.

    The current changes will not change the behavior of the existing
    pre/postcommit.

    Testing was done by running `tox -e py27 -c sdks/python/tox.ini` for unit
    tests, and by running wordcount_it with `TestDataflowRunner` on the
    service ([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)).

    TODO:
    - Output data verifier that verifies pipeline output stored in a
      filesystem.
    - Add wordcount_it to precommit, and replace the existing wordcount
      execution command in postcommit with a better structured nose command.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1639

> Python E2E Integration Test Framework - Batch Only
> --------------------------------------------------
>
>         Key: BEAM-1112
>         URL: https://issues.apache.org/jira/browse/BEAM-1112
>     Project: Beam
>  Issue Type: Task
>  Components: sdk-py, testing
>    Reporter: Mark Liu
>    Assignee: Mark Liu
>
> Parity with Java.
> Build an e2e integration test framework that can configure and run a batch
> pipeline with a specified test runner, wait for pipeline execution, and
> verify results with the given verifiers at the end.
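The verifier mechanism described above (a pickled matcher validated before the run, then applied to the pipeline result) can be sketched in plain Python. Everything below is illustrative: `BaseMatcher` is a stand-in for PyHamcrest's class, and `FakeResult`, `validate_matcher`, and the method names are invented for the sketch, not the actual SDK API:

```python
# Sketch of the on_success_matcher pattern: the matcher is pickled into the
# test options, validated up front, and applied after the job finishes.
import pickle

class BaseMatcher(object):
    """Stand-in for hamcrest.core.base_matcher.BaseMatcher (illustrative)."""
    def matches(self, item):
        raise NotImplementedError

class PipelineStateMatcher(BaseMatcher):
    """Succeeds when the terminal pipeline state equals the expected one."""
    def __init__(self, expected_state='DONE'):
        self.expected_state = expected_state

    def matches(self, pipeline_result):
        return pipeline_result.current_state == self.expected_state

class FakeResult(object):
    """Hypothetical pipeline result carrying only a terminal state."""
    def __init__(self, state):
        self.current_state = state

def validate_matcher(serialized):
    # Mirrors the validation step: the option value must unpickle to a
    # BaseMatcher subclass before the pipeline is allowed to run.
    matcher = pickle.loads(serialized)
    if not isinstance(matcher, BaseMatcher):
        raise ValueError('on_success_matcher is not a BaseMatcher')
    return matcher

on_success_matcher = pickle.dumps(PipelineStateMatcher('DONE'))
matcher = validate_matcher(on_success_matcher)
assert matcher.matches(FakeResult('DONE'))
assert not matcher.matches(FakeResult('FAILED'))
```

Validating the pickled matcher before submission means a malformed verifier fails fast, instead of after an expensive pipeline run.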
[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...
GitHub user markflyhigh reopened a pull request:

    https://github.com/apache/incubator-beam/pull/1639

    [BEAM-1112] Python E2E Test Framework And Wordcount E2E Test

    Be sure to do all of the following to help us incorporate your
    contribution quickly and easily:

    - [x] Make sure the PR title is formatted like:
          `[BEAM-<Jira issue #>] Description of pull request`
    - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
          Travis-CI on your fork and ensure the whole test matrix passes).
    - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
          number, if there is one.
    - [ ] If this contribution is large, please file an Apache
          [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

    - E2E test framework that supports a TestRunner starting and verifying a
      pipeline job:
      - Add `TestOptions`, which defines `on_success_matcher`, used to verify
        the state/output of the pipeline job.
      - Validate `on_success_matcher` before pipeline execution to make sure
        it can be unpickled to a subclass of BaseMatcher.
      - Create a `TestDataflowRunner`, which provides the functionality of
        `DataflowRunner` plus result verification.
      - Provide a test verifier, `PipelineStateMatcher`, that can verify
        whether a pipeline job finished in the DONE state.
    - Add wordcount_it (it = integration test), an e2e test built on the
      existing wordcount pipeline:
      - Include wordcount_it in the nose collector, so that wordcount_it can
        be collected and run by nose.
      - Skip ITs when running unit tests from tox in precommit and postcommit.

    The current changes will not change the behavior of the existing
    pre/postcommit.

    Testing was done by running `tox -e py27 -c sdks/python/tox.ini` for unit
    tests, and by running wordcount_it with `TestDataflowRunner` on the
    service ([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)).

    TODO:
    - Output data verifier that verifies pipeline output stored in a
      filesystem.
    - Add wordcount_it to precommit, and replace the existing wordcount
      execution command in postcommit with a better structured nose command.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/1639.patch

To close this pull request, make a commit to your master/trunk branch with (at
least) the following in the commit message:

    This closes #1639
[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...
Github user markflyhigh closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1639
[1/2] incubator-beam git commit: Add support for date partitioned table names
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk bb09c07b6 -> 409d067b3

Add support for date partitioned table names

These names have the format "tablename$mmdd". Previously the dollar sign
caused this to be deemed invalid.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1af871a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1af871a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1af871a

Branch: refs/heads/python-sdk
Commit: a1af871a0c8c92a6d84f2e9950615f7737118d7e
Parents: bb09c07
Author: Kevin Graney
Authored: Tue Dec 6 15:09:42 2016 -0500
Committer: Robert Bradshaw
Committed: Wed Dec 21 15:16:45 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py      | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index ce75e10..2059de4 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -275,7 +275,9 @@ def _parse_table_reference(table, dataset=None, project=None):
       then the table argument must contain the entire table reference:
       'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a
       bigquery.TableReference instance in which case dataset and project are
-      ignored and the reference is returned as a result.
+      ignored and the reference is returned as a result. Additionally, for date
+      partitioned tables, appending '$mmdd' to the table name is supported,
+      e.g. 'DATASET.TABLE$mmdd'.
     dataset: The ID of the dataset containing this table or null if the table
       reference is specified entirely by the table argument.
     project: The ID of the project containing this table or null if the table
@@ -300,7 +302,7 @@ def _parse_table_reference(table, dataset=None, project=None):
   # table name.
   if dataset is None:
     match = re.match(
-        r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$', table)
+        r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table)
     if not match:
       raise ValueError(
           'Expected a table reference (PROJECT:DATASET.TABLE or '

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index a2cf947..f6f9363 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -208,6 +208,14 @@ class TestBigQuerySource(unittest.TestCase):
     self.assertEqual(source.query, 'my_query')
     self.assertIsNone(source.table_reference)
 
+  def test_date_partitioned_table_name(self):
+    source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+    dd = DisplayData.create_from(source)
+    expected_items = [
+        DisplayDataItemMatcher('validation', True),
+        DisplayDataItemMatcher('table', 'dataset.table$20030102')]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
 
 class TestBigQuerySink(unittest.TestCase):
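The table-reference pattern in this patch can be exercised standalone. Note the archive stripped the regex's angle-bracketed group names, so `project`, `dataset`, and `table` below are reconstructed from the surrounding error message; the test strings come from the patch itself:

```python
# Table-reference pattern with '$' allowed in the table component, so that
# date-partitioned names like 'dataset.table$20030102' are accepted.
import re

TABLE_REF = re.compile(
    r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$')

m = TABLE_REF.match('dataset.table$20030102')
assert m and m.group('dataset') == 'dataset'
assert m.group('table') == 'table$20030102'
assert m.group('project') is None  # project prefix is optional

m = TABLE_REF.match('my-project:dataset.table')
assert m and m.group('project') == 'my-project'

# Before the fix the table group was \w+, which rejects the '$' partition
# decorator entirely:
OLD_REF = re.compile(r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$')
assert OLD_REF.match('dataset.table$20030102') is None
```

This is exactly the behavior the new `test_date_partitioned_table_name` unit test pins down at the BigQuerySource level.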
[2/2] incubator-beam git commit: Closes #1534
Closes #1534

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/409d067b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/409d067b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/409d067b

Branch: refs/heads/python-sdk
Commit: 409d067b36036981e330a055b652bb74a93f4ca2
Parents: bb09c07 a1af871
Author: Robert Bradshaw
Authored: Wed Dec 21 15:16:46 2016 -0800
Committer: Robert Bradshaw
Committed: Wed Dec 21 15:16:46 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py      | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--
[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3454d691f -> bb09c07b6

Fixing inconsistencies in PipelineOptions

The following options have changed:
* job_name - Default is 'beamapp-username-date-microseconds'. Test was added.
* staging_location and temp_location - temp_location previously defaulted to staging_location; now staging_location defaults to temp_location, and the tests reflect that.
* machine_type alias of worker_machine_type has been removed.
* disk_type alias of worker_disk_type has been removed.
* disk_source_image option has been removed.
* no_save_main_session option has been removed.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7 Branch: refs/heads/python-sdk Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1 Parents: 3454d69 Author: Pablo Authored: Tue Dec 6 18:01:54 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:14:52 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index f1341a7..3a9ba46 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -18,6 +18,8 @@ """Dataflow client utility functions.""" import codecs +from datetime import datetime +import getpass import json import logging import os @@ -46,10 +48,6 @@ from apache_beam.utils.options
import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow -BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' -COMPUTE_API_SERVICE = 'compute.googleapis.com' -STORAGE_API_SERVICE = 'storage.googleapis.com' - class Step(object): """Wrapper for a dataflow Step protobuf.""" @@ -121,11 +119,13 @@ class Environment(object): self.worker_options = options.view_as(WorkerOptions) self.debug_options = options.view_as(DebugOptions) self.proto = dataflow.Environment() -self.proto.clusterManagerApiService = COMPUTE_API_SERVICE -self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE +self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE +self.proto.dataset = '{}/cloud_dataflow'.format( +GoogleCloudOptions.BIGQUERY_API_SERVICE) self.proto.tempStoragePrefix = ( -self.google_cloud_options.temp_location.replace('gs:/', -STORAGE_API_SERVICE)) +self.google_cloud_options.temp_location.replace( +'gs:/', +GoogleCloudOptions.STORAGE_API_SERVICE)) # User agent information. self.proto.userAgent = dataflow.Environment.UserAgentValue() self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint @@ -165,7 +165,7 @@ class Environment(object): dataflow.Package( location='%s/%s' % ( self.google_cloud_options.staging_location.replace( - 'gs:/', STORAGE_API_SERVICE), + 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), package), name=package)) @@ -174,7 +174,7 @@ class Environment(object): packages=package_descriptors, taskrunnerSettings=dataflow.TaskRunnerSettings( parallelWorkerSettings=dataflow.WorkerSettings( -baseUrl='https://dataflow.googleapis.com', +baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, servicePath=self.google_cloud_options.dataflow_endpoint))) pool.autoscalingSettings = dataflow.AutoscalingSettings() # Set worker pool options received through command line. 
@@ -195,8 +195,6 @@ class Environment(object): pool.diskSizeGb = self.worker_options.disk_size_gb if self.worker_options.disk_type: pool.diskType = self.worker_options.disk_type -if self.worker_options.disk_source_image: - pool.diskSourceImage = self.worker_options.disk_source_image if self.worker_options.zone: pool.zone = self.worker_options.zone if
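The commit above imports `datetime` and `getpass` to build the new default job name described in the commit message ('beamapp-username-date-microseconds'). A sketch of that construction; the exact separators and timestamp format used by `apiclient.py` are assumptions here, so the authoritative logic remains the one in the diff:

```python
from datetime import datetime


def default_job_name(username, now):
    """Build a default job name of the documented shape
    beamapp-username-date-microseconds.

    The field formatting below is illustrative, not the exact
    apache_beam/internal/apiclient.py implementation.
    """
    # Dataflow job names must be lowercase, so normalize the username.
    return 'beamapp-{}-{}'.format(
        username.lower(), now.strftime('%m%d%H%M%S-%f'))
```

For example, `default_job_name('pablo', datetime(2016, 12, 21, 23, 37, 49, 123456))` yields `'beamapp-pablo-1221233749-123456'` under this hypothetical format.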
[2/2] incubator-beam git commit: Closes #1526
Closes #1526 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b Branch: refs/heads/python-sdk Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba Parents: 3454d69 35e2fdc Author: Robert Bradshaw Authored: Wed Dec 21 15:15:20 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:15:20 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) --
[jira] [Created] (BEAM-1202) Coders should have meaningful equals methods
Thomas Groh created BEAM-1202: - Summary: Coders should have meaningful equals methods Key: BEAM-1202 URL: https://issues.apache.org/jira/browse/BEAM-1202 Project: Beam Issue Type: Bug Components: sdk-java-core Reporter: Thomas Groh Assignee: Davor Bonaci {{StandardCoder}} implements an equality check based on the component coders and equal classes. Any coder that is configured, or that does not extend {{StandardCoder}}, should have meaningful implementations of {{equals}} and {{hashCode}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (BEAM-1202) Coders should have meaningful equals methods
[ https://issues.apache.org/jira/browse/BEAM-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Groh updated BEAM-1202: -- Assignee: (was: Davor Bonaci) > Coders should have meaningful equals methods > > > Key: BEAM-1202 > URL: https://issues.apache.org/jira/browse/BEAM-1202 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Thomas Groh > > {{StandardCoder}} implements an equality check based on the component coders > and equal classes. Any coder that is configured, or that does not extend > {{StandardCoder}}, should have meaningful implementations of {{equals}} and > {{hashCode}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
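The behavior BEAM-1202 asks for -- equality derived from the coder's class plus its component coders, so that two independently constructed but identical coders compare and hash equal -- can be illustrated with a small sketch. This is Python for brevity; the actual work concerns the Java SDK's {{StandardCoder}}, and the class names here are hypothetical:

```python
class Coder:
    """Minimal coder base for the sketch."""

    def _components(self):
        # Component coders; leaf coders have none.
        return ()

    def __eq__(self, other):
        # Equal classes with equal component coders compare equal --
        # the structural equality BEAM-1202 asks configured coders
        # to implement meaningfully.
        return (type(self) is type(other)
                and self._components() == other._components())

    def __hash__(self):
        return hash((type(self), self._components()))


class VarIntCoder(Coder):
    pass


class KVCoder(Coder):
    def __init__(self, key_coder, value_coder):
        self.key_coder = key_coder
        self.value_coder = value_coder

    def _components(self):
        return (self.key_coder, self.value_coder)
```

Under this scheme `KVCoder(VarIntCoder(), VarIntCoder())` equals any other `KVCoder` built from equal components; a coder that carries extra configuration (or does not extend the base) must fold that configuration into `__eq__`/`__hash__` itself, which is exactly the gap the issue describes.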
[jira] [Created] (BEAM-1201) Remove producesSortedKeys from Source
Daniel Halperin created BEAM-1201: - Summary: Remove producesSortedKeys from Source Key: BEAM-1201 URL: https://issues.apache.org/jira/browse/BEAM-1201 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Daniel Halperin Assignee: Daniel Halperin Priority: Minor This is a holdover from a precursor of the old Dataflow SDK that we just failed to delete before releasing Dataflow 1.0, but we can delete before the first stable release of Beam. This function has never been used by any runner. It does not mean anything obvious to implementors, as many sources produce {{T}}, not {{KV}} -- what does it mean in the former case? (And how do you get the latter case correct?) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[46/51] [abbrv] incubator-beam git commit: Allow setting timer by ID in DirectTimerInternals
Allow setting timer by ID in DirectTimerInternals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463 Branch: refs/heads/python-sdk Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f Parents: 4d71924 Author: Kenneth KnowlesAuthored: Wed Dec 7 20:18:44 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../runners/direct/DirectTimerInternals.java| 2 +- .../beam/runners/direct/WatermarkManager.java | 25 2 files changed, 26 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5ca276d..80e0721 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals { @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { -throw new UnsupportedOperationException("Setting timer by ID not yet supported."); +timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } @Deprecated http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 7bed751..f7bafd1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; +import com.google.common.collect.Table; import com.google.common.collect.TreeMultiset; import java.io.Serializable; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Instant; @@ -210,6 +213,10 @@ public class WatermarkManager { private final SortedMultiset pendingElements; private final Map objectTimers; +// Entries in this table represent the authoritative timestamp for which +// a per-key-and-StateNamespace timer is set. 
+private final Map > existingTimers; + private AtomicReference currentWatermark; public AppliedPTransformInputWatermark(Collection inputWatermarks) { @@ -222,6 +229,7 @@ public class WatermarkManager { this.pendingElements = TreeMultiset.create(pendingBundleComparator); this.objectTimers = new HashMap<>(); + this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } @@ -276,14 +284,31 @@ public class WatermarkManager { keyTimers = new TreeSet<>(); objectTimers.put(update.key, keyTimers); } + Table existingTimersForKey = + existingTimers.get(update.key); + if (existingTimersForKey == null) { +existingTimersForKey = HashBasedTable.create(); +existingTimers.put(update.key, existingTimersForKey); + } +
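The `HashBasedTable` added above makes the (StateNamespace, timer ID) pair the authoritative key for each per-key timer: re-setting a timer by ID replaces the previously scheduled firing instead of adding a second one. A language-neutral sketch of that bookkeeping (Python dict/set instead of the Java/Guava original; all names hypothetical):

```python
class TimerSketch:
    """Sketch of set-timer-by-ID semantics: at most one firing time
    per (key, namespace, timer_id); re-setting replaces it."""

    def __init__(self):
        self._existing = {}    # (key, namespace, timer_id) -> timestamp
        self._pending = set()  # (timestamp, key, namespace, timer_id)

    def set_timer(self, key, namespace, timer_id, timestamp):
        # Cancel any previously scheduled firing for this ID first.
        prior = self._existing.pop((key, namespace, timer_id), None)
        if prior is not None:
            self._pending.discard((prior, key, namespace, timer_id))
        self._existing[(key, namespace, timer_id)] = timestamp
        self._pending.add((timestamp, key, namespace, timer_id))

    def delete_timer(self, key, namespace, timer_id):
        prior = self._existing.pop((key, namespace, timer_id), None)
        if prior is not None:
            self._pending.discard((prior, key, namespace, timer_id))

    def earliest(self):
        """Earliest pending firing time, or None -- the quantity a
        watermark hold would be computed from."""
        return min(self._pending)[0] if self._pending else None
```

Without the authoritative map, setting the same timer ID twice would leave two entries in the pending set and the earlier, stale one could still fire; with it, only the latest timestamp survives.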
[49/51] [abbrv] incubator-beam git commit: This closes #1669: Preliminaries for timers in the direct runner
This closes #1669: Preliminaries for timers in the direct runner
Hold output watermark according to pending timers
Allow setting timer by ID in DirectTimerInternals
Add UsesTestStream for use with JUnit @Category
Add static Window.withOutputTimeFn to match build method
Use informative Instant formatter in WatermarkHold
Add informative Instant formatter to BoundedWindow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57d9bbd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57d9bbd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57d9bbd7 Branch: refs/heads/python-sdk Commit: 57d9bbd797edfcf32fdd9284b802fc4f9694e8d2 Parents: ff39516 dfe2e62 Author: Kenneth Knowles Authored: Wed Dec 21 13:46:34 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:46:34 2016 -0800 -- .../apache/beam/runners/core/WatermarkHold.java | 4 +- .../runners/direct/DirectTimerInternals.java| 2 +- .../beam/runners/direct/WatermarkManager.java | 78 ++-- .../apache/beam/sdk/testing/UsesTestStream.java | 24 ++ .../sdk/transforms/windowing/BoundedWindow.java | 31 .../beam/sdk/transforms/windowing/Window.java | 9 +++ .../apache/beam/sdk/testing/TestStreamTest.java | 12 +-- 7 files changed, 144 insertions(+), 16 deletions(-) --
[50/51] [abbrv] incubator-beam git commit: Merge remote-tracking branch 'origin/master' into python-sdk
Merge remote-tracking branch 'origin/master' into python-sdk Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77742979 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77742979 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77742979 Branch: refs/heads/python-sdk Commit: 77742979463d0590969b186cee2af017a2bb1037 Parents: 3b4fd5c 57d9bbd Author: Ahmet AltayAuthored: Wed Dec 21 14:22:06 2016 -0800 Committer: Ahmet Altay Committed: Wed Dec 21 14:22:06 2016 -0800 -- .gitignore | 4 + .jenkins/common_job_properties.groovy | 173 ++ ...job_beam_PostCommit_Java_MavenInstall.groovy | 42 ++ ...ostCommit_Java_RunnableOnService_Apex.groovy | 41 ++ ...ommit_Java_RunnableOnService_Dataflow.groovy | 39 ++ ...stCommit_Java_RunnableOnService_Flink.groovy | 38 ++ ...ommit_Java_RunnableOnService_Gearpump.groovy | 41 ++ ...stCommit_Java_RunnableOnService_Spark.groovy | 38 ++ .../job_beam_PostCommit_Python_Verify.groovy| 37 ++ .../job_beam_PreCommit_Java_MavenInstall.groovy | 42 ++ .../job_beam_Release_NightlySnapshot.groovy | 46 ++ .jenkins/job_seed.groovy| 47 ++ .travis.yml | 2 +- .travis/README.md | 2 +- README.md | 2 +- examples/java/pom.xml | 20 +- .../apache/beam/examples/WindowedWordCount.java | 177 +++--- .../org/apache/beam/examples/WordCount.java | 2 +- .../examples/common/WriteWindowedFilesDoFn.java | 77 +++ .../beam/examples/complete/AutoComplete.java| 6 +- .../org/apache/beam/examples/complete/README.md | 14 +- .../apache/beam/examples/complete/TfIdf.java| 6 +- .../examples/complete/TopWikipediaSessions.java | 15 +- .../examples/complete/TrafficMaxLaneFlow.java | 4 +- .../beam/examples/complete/TrafficRoutes.java | 4 +- .../examples/cookbook/BigQueryTornadoes.java| 2 +- .../cookbook/CombinePerKeyExamples.java | 2 +- .../beam/examples/cookbook/DeDupExample.java| 96 --- .../beam/examples/cookbook/DistinctExample.java | 96 +++ 
.../beam/examples/cookbook/FilterExamples.java | 2 +- .../examples/cookbook/MaxPerKeyExamples.java| 2 +- .../org/apache/beam/examples/cookbook/README.md | 16 +- .../beam/examples/cookbook/TriggerExample.java | 4 +- .../beam/examples/WindowedWordCountIT.java | 182 +- .../org/apache/beam/examples/WordCountIT.java | 2 +- .../org/apache/beam/examples/WordCountTest.java | 7 +- .../examples/complete/AutoCompleteTest.java | 13 +- .../beam/examples/complete/TfIdfTest.java | 6 +- .../complete/TopWikipediaSessionsTest.java | 7 +- .../examples/cookbook/DistinctExampleTest.java | 9 +- .../examples/cookbook/JoinExamplesTest.java | 6 +- .../examples/cookbook/TriggerExampleTest.java | 6 +- examples/java8/pom.xml | 2 +- .../beam/examples/complete/game/GameStats.java | 2 +- .../examples/complete/game/LeaderBoard.java | 4 +- .../beam/examples/complete/game/UserScore.java | 2 +- .../complete/game/utils/WriteToBigQuery.java| 2 +- .../game/utils/WriteWindowedToBigQuery.java | 7 +- .../examples/MinimalWordCountJava8Test.java | 6 +- .../examples/complete/game/GameStatsTest.java | 7 +- .../complete/game/HourlyTeamScoreTest.java | 5 +- .../examples/complete/game/LeaderBoardTest.java | 11 +- .../examples/complete/game/UserScoreTest.java | 10 +- examples/pom.xml| 2 +- pom.xml | 15 +- runners/apex/pom.xml| 69 ++- .../apache/beam/runners/apex/ApexRunner.java| 111 ++-- .../beam/runners/apex/ApexRunnerResult.java | 50 +- .../beam/runners/apex/ApexYarnLauncher.java | 395 .../translation/ApexPipelineTranslator.java | 14 +- .../translation/ParDoBoundMultiTranslator.java | 22 +- .../apex/translation/ParDoBoundTranslator.java | 22 +- .../apex/translation/TranslationContext.java| 10 +- .../apex/translation/WindowBoundTranslator.java | 78 +++ .../operators/ApexGroupByKeyOperator.java | 26 +- .../operators/ApexParDoOperator.java| 29 +- .../apex/translation/utils/NoOpStepContext.java | 3 +- .../beam/runners/apex/ApexYarnLauncherTest.java | 138 + .../FlattenPCollectionTranslatorTest.java | 15 +- 
.../translation/GroupByKeyTranslatorTest.java | 21 +- .../translation/ParDoBoundTranslatorTest.java | 38 +- .../translation/ReadUnboundTranslatorTest.java | 15 +- runners/core-java/pom.xml | 6 +-
[41/51] [abbrv] incubator-beam git commit: Fix windowing in direct runner Stateful ParDo
Fix windowing in direct runner Stateful ParDo Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4fb16e8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4fb16e8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4fb16e8f Branch: refs/heads/python-sdk Commit: 4fb16e8fb9bb087c0975f38c54665634868cfed7 Parents: 7ee8c86 Author: Kenneth KnowlesAuthored: Tue Dec 20 13:58:29 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:11:22 2016 -0800 -- .../direct/ParDoMultiOverrideFactory.java | 34 ++-- 1 file changed, 31 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4fb16e8f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 2cea999..b35df87 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -34,8 +34,13 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -92,9 +97,12 @@ class ParDoMultiOverrideFactory @Override public PCollectionTuple expand(PCollection > input) { + WindowingStrategy inputWindowingStrategy = input.getWindowingStrategy(); + // A KvCoder is required since this goes through GBK. Further, WindowedValueCoder // is not registered by default, so we explicitly set the relevant coders. - checkState(input.getCoder() instanceof KvCoder, + checkState( + input.getCoder() instanceof KvCoder, "Input to a %s using state requires a %s, but the coder was %s", ParDo.class.getSimpleName(), KvCoder.class.getSimpleName(), @@ -102,14 +110,27 @@ class ParDoMultiOverrideFactory KvCoder kvCoder = (KvCoder ) input.getCoder(); Coder keyCoder = kvCoder.getKeyCoder(); Coder windowCoder = - input.getWindowingStrategy().getWindowFn().windowCoder(); + inputWindowingStrategy.getWindowFn().windowCoder(); - PCollectionTuple outputs = + PCollection >> adjustedInput = input // Stash the original timestamps, etc, for when it is fed to the user's DoFn .apply("Reify timestamps", ParDo.of(new ReifyWindowedValueFn ())) .setCoder(KvCoder.of(keyCoder, WindowedValue.getFullCoder(kvCoder, windowCoder))) + // We are going to GBK to gather keys and windows but otherwise do not want + // to alter the flow of data. This entails: + // - trigger as fast as possible + // - maintain the full timestamps of elements + // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn) + // - discard past panes as it is "just a stream" of elements + .apply( + Window. >>create()) @@ -117,6 +138,13 @@ class ParDoMultiOverrideFactory .apply("To KeyedWorkItem", ParDo.of(new ToKeyedWorkItem ()))
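The comment block in this diff states the requirements on the intermediate GroupByKey that routes elements into the stateful ParDo: trigger as fast as possible, keep the full element timestamps, hold the GBK's output to the minimum of those timestamps, and discard past panes. The timestamp-hold requirement can be sketched as follows (plain Python, not the Beam API; the function name is hypothetical):

```python
def group_holding_min_timestamp(timestamped_kvs):
    """Group (key, value, timestamp) triples per key, tracking the
    minimum element timestamp -- the 'hold' the diff's comment asks
    the intermediate GBK to maintain so downstream watermarks cannot
    advance past undelivered elements."""
    out = {}
    for key, value, ts in timestamped_kvs:
        values, hold = out.get(key, ([], float('inf')))
        values.append(value)
        out[key] = (values, min(hold, ts))
    return out
```

In the real pipeline this role is played by an `OutputTimeFn` that takes the minimum, combined with a `Repeatedly`/`AfterPane` trigger so each element is emitted as soon as it arrives.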
[44/51] [abbrv] incubator-beam git commit: Add static Window.withOutputTimeFn to match build method
Add static Window.withOutputTimeFn to match build method Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8188040d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8188040d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8188040d Branch: refs/heads/python-sdk Commit: 8188040d930b1fa49efd4ed7d5f821d05d6f28ef Parents: fa4958a Author: Kenneth KnowlesAuthored: Tue Dec 20 13:57:55 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../org/apache/beam/sdk/transforms/windowing/Window.java| 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8188040d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 0c430d0..1241abe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -223,6 +223,15 @@ public class Window { } /** + * (Experimental) Override the default {@link OutputTimeFn}, to control + * the output timestamp of values output from a {@link GroupByKey} operation. + */ + @Experimental(Kind.OUTPUT_TIME) + public static Bound withOutputTimeFn(OutputTimeFn outputTimeFn) { +return new Bound(null).withOutputTimeFn(outputTimeFn); + } + + /** * A {@code PTransform} that windows the elements of a {@code PCollection}, * into finite windows according to a user-specified {@code WindowFn}. *
[45/51] [abbrv] incubator-beam git commit: Add UsesTestStream for use with JUnit @Category
Add UsesTestStream for use with JUnit @Category Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d71924c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d71924c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d71924c Branch: refs/heads/python-sdk Commit: 4d71924ccda9dae97c7cc9535a9780df9457cc3f Parents: 8188040 Author: Kenneth KnowlesAuthored: Tue Dec 20 14:20:07 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../apache/beam/sdk/testing/UsesTestStream.java | 24 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +- 2 files changed, 30 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java new file mode 100644 index 000..8debb46 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model + * but a special feature currently only implemented by the direct runner. + */ +public interface UsesTestStream {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 64aeca3..c12e9f3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testLateDataAccumulating() { Instant instant = new Instant(0); TestStream source = TestStream.create(VarIntCoder.of()) @@ -136,7 +136,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testProcessingTimeTrigger() { TestStream source = TestStream.create(VarLongCoder.of()) .addElements(TimestampedValue.of(1L, new Instant(1000L)), @@ -159,7 +159,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testDiscardingMode() { TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -208,7 +208,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, 
UsesTestStream.class}) public void testFirstElementLate() { Instant lateElementTimestamp = new Instant(-1_000_000); TestStream stream = @@ -238,7 +238,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testElementsAtAlmostPositiveInfinity() { Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -261,7 +261,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) +
[36/51] [abbrv] incubator-beam git commit: This closes #1673: Require TimeDomain to delete a timer
This closes #1673: Require TimeDomain to delete a timer Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4843dc59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4843dc59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4843dc59 Branch: refs/heads/python-sdk Commit: 4843dc59c6e87ea0be75f7abd1e312bf5bc5a529 Parents: 0d0a5e2 35a0274 Author: Kenneth Knowles Authored: Wed Dec 21 10:15:56 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 10:15:56 2016 -0800 -- .../operators/ApexGroupByKeyOperator.java | 8 .../beam/runners/core/InMemoryTimerInternals.java | 8 .../beam/runners/direct/DirectTimerInternals.java | 8 .../wrappers/streaming/WindowDoFnOperator.java | 9 + .../org/apache/beam/sdk/util/TimerInternals.java | 17 +++-- 5 files changed, 48 insertions(+), 2 deletions(-) --
[51/51] [abbrv] incubator-beam git commit: This closes #1676: Merge master (57d9bbd) into python-sdk
This closes #1676: Merge master (57d9bbd) into python-sdk

No fixups. Commit from master:
Hold output watermark according to pending timers
Allow setting timer by ID in DirectTimerInternals
Add UsesTestStream for use with JUnit @Category
Add static Window.withOutputTimeFn to match build method
Use informative Instant formatter in WatermarkHold
Add informative Instant formatter to BoundedWindow
Actually propagate and commit state in direct runner
Fix windowing in direct runner Stateful ParDo
Support set and delete of timer by ID in InMemoryTimerInternals
Move ExecutionContext and related classes to runners-core
Update Dataflow worker to beam-master-20161221
Require TimeDomain to delete a timer
Provide local tags in PInput, POutput expansions
Remove deprecated InMemoryTimerInternals from SDK
Remove deprecated AggregatorFactory from SDK
Update Dataflow worker to beam-master-20161220
Fixup usage of canonical name with name since canonical name != name for inner classes.
Provide a better error message for non-existing gcpTempLocation
Restore SDK's InMemoryTimerInternals, deprecated
Move InMemoryTimerInternals to runners-core
Port direct runner StatefulParDo to KeyedWorkItem
Propagate key through ParDo if DoFn is key-preserving
Move responsibility for knowing about keyedness into EvaluationContext
Add some key-preserving to KeyedPValueTrackingVisitor
Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.
Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
Migrated the beam-examples-java8 module to TestPipeline as a JUnit rule.
Migrated the beam-examples-java module to TestPipeline as a JUnit rule.
Disable automatic archiving of Maven builds
[BEAM-59] initial interfaces and classes of Beam FileSystem.
Change counter name in TestDataflowRunner
More escaping in Jenkins timestamp spec
Add RunnableOnService test for Metrics
Fix seed job fetch spec
Show timestamps on log lines in Jenkins
[BEAM-1165] Fix unexpected file creation when checking dependencies
[BEAM-1178] Make naming of logger objects consistent
[BEAM-716] Fix javadoc on with* methods
[BEAM-959] Improve check preconditions in JmsIO
[BEAM-716] Use AutoValue in JmsIO
Fix grammar error (repeated for)
Empty TestPipeline need not be run
[BEAM-85, BEAM-298] Make TestPipeline a JUnit Rule checking proper usage
Change counter name in TestDataflowRunner
BigQueryIO: fix streaming write, typo in API
[BEAM-853] Force streaming execution on batch pipelines for testing. Expose the adapted source.
Use empty SideInputReader, fixes NPE in SimpleDoFnRunnerTest
Test that SimpleDoFnRunner wraps exceptions in startBundle and finishBundle
Add timer support to DoFnRunner(s)
Make TimerSpec and StateSpec fields accessible
View.asMap: minor javadoc fixes
Revert "Move InMemoryTimerInternals to runners-core"
Revert "Moves DoFnAdapters to runners-core"
Revert "Removes ArgumentProvider.windowingInternals"
Revert "Removes code for wrapping DoFn as an OldDoFn"
checkstyle: missed newline in DistributionCell
Make {Metric,Counter,Distribution}Cell public
Add PTransformOverrideFactory to the Core SDK
Move ActiveWindowSet and implementations to runners-core
Update Dataflow worker to beam-master-20161216
[BEAM-1108] Remove outdated language about experimental autoscaling
[BEAM-450] Shade modules to separate paths
[BEAM-362] Port runners to runners-core AggregatoryFactory
Move InMemoryTimerInternals to runners-core
Delete deprecated TimerCallback
Remove deprecated methods of InMemoryTimerInternals
Don't incorrectly log error in MetricsEnvironment
Renames ParDo.getNewFn to getFn
Moves DoFnAdapters to runners-core
Removes unused code from NoOpOldDoFn
Removes ArgumentProvider.windowingInternals
Removes code for wrapping DoFn as an OldDoFn
Removes OldDoFn from ParDo
Pushes uses of OldDoFn deeper inside Flin
[38/51] [abbrv] incubator-beam git commit: Support set and delete of timer by ID in InMemoryTimerInternals
Support set and delete of timer by ID in InMemoryTimerInternals

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df2e540d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df2e540d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df2e540d
Branch: refs/heads/python-sdk
Commit: df2e540d7a7b8444b9ff3b404740d5a3394b7691
Parents: acd2196
Author: Kenneth Knowles
Authored: Mon Dec 19 14:01:36 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 11:01:36 2016 -0800
--
 .../runners/core/InMemoryTimerInternals.java |  65 +++
 .../core/InMemoryTimerInternalsTest.java     | 112 +-
 2 files changed, 120 insertions(+), 57 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..292ac23 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.NavigableSet;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
@@ -35,17 +37,17 @@ import org.joda.time.Instant;
 
 /** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */
 public class InMemoryTimerInternals implements TimerInternals {
 
-  /** At most one timer per timestamp is kept. */
-  private Set<TimerData> existingTimers = new HashSet<>();
+  /** The current set timers by namespace and ID. */
+  Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -74,13 +76,13 @@ public class InMemoryTimerInternals implements TimerInternals {
     final TimerData data;
     switch (domain) {
       case EVENT_TIME:
-        data = watermarkTimers.peek();
+        data = watermarkTimers.first();
         break;
       case PROCESSING_TIME:
-        data = processingTimers.peek();
+        data = processingTimers.first();
         break;
       case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.peek();
+        data = synchronizedProcessingTimers.first();
         break;
       default:
         throw new IllegalArgumentException("Unexpected time domain: " + domain);
@@ -88,7 +90,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     return (data == null) ? null : data.getTimestamp();
   }
 
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+  private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {
     switch (domain) {
       case EVENT_TIME:
         return watermarkTimers;
@@ -104,27 +106,45 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant target,
       TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
   }
 
   @Override
   public void setTimer(TimerData timerData) {
     WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
-
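The commit above swaps PriorityQueue for a TreeSet plus a Guava Table so a timer can be located, replaced, or deleted by its (namespace, ID) key while the earliest pending timer stays cheap to read. A JDK-only sketch of that data-structure change; the Timer record, the class name, and the nested Map standing in for Guava's HashBasedTable are illustrative, not Beam's API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative stand-in for the pattern in InMemoryTimerInternals:
// a keyed index (namespace, id) -> timer, plus a sorted set of the
// same timers ordered by timestamp.
public class TimerTableSketch {

    // Ordering by timestamp, then namespace, then id means TreeSet never
    // treats two distinct timers as equal, so both can coexist in the set.
    public record Timer(String namespace, String id, long timestamp)
            implements Comparable<Timer> {
        public int compareTo(Timer o) {
            int c = Long.compare(timestamp, o.timestamp);
            if (c != 0) return c;
            c = namespace.compareTo(o.namespace);
            return c != 0 ? c : id.compareTo(o.id);
        }
    }

    private final Map<String, Map<String, Timer>> existing = new HashMap<>();
    private final NavigableSet<Timer> pending = new TreeSet<>();

    // Setting by ID replaces any timer with the same (namespace, id).
    // A PriorityQueue cannot support this well: it has no keyed lookup
    // and only O(n) removal of an arbitrary element.
    public void setTimer(Timer t) {
        Timer old = existing.computeIfAbsent(t.namespace(), k -> new HashMap<>())
                .put(t.id(), t);
        if (old != null) {
            pending.remove(old);
        }
        pending.add(t);
    }

    public void deleteTimer(String namespace, String id) {
        Map<String, Timer> byId = existing.get(namespace);
        Timer old = (byId == null) ? null : byId.remove(id);
        if (old != null) {
            pending.remove(old);
        }
    }

    // first() plays the role peek() played on the old PriorityQueue.
    public Long nextTimestamp() {
        return pending.isEmpty() ? null : pending.first().timestamp();
    }
}
```

Resetting an existing ID moves the single logical timer to its new firing time instead of leaving a stale duplicate behind, which is the behavior the commit needs for set-by-ID semantics.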
[35/51] [abbrv] incubator-beam git commit: Update Dataflow worker to beam-master-20161221
Update Dataflow worker to beam-master-20161221 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64336e40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64336e40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64336e40 Branch: refs/heads/python-sdk Commit: 64336e40dd6a48b3b6b48634bb9204db0aa0c7ca Parents: 0d0a5e2 Author: Kenneth Knowles <k...@google.com> Authored: Wed Dec 21 10:09:49 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 10:10:00 2016 -0800 -- .../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64336e40/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties -- diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties index bf08e83..2912f61 100644 --- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties +++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties @@ -18,6 +18,6 @@ environment.major.version=6 -worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161220 +worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161221 -worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161220 +worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161221
[47/51] [abbrv] incubator-beam git commit: Hold output watermark according to pending timers
Hold output watermark according to pending timers

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d
Branch: refs/heads/python-sdk
Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16
Parents: 7f14c46
Author: Kenneth Knowles
Authored: Tue Dec 20 13:37:40 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:45:37 2016 -0800
--
 .../beam/runners/direct/WatermarkManager.java | 59
 1 file changed, 48 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index f7bafd1..248fafd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -211,12 +211,18 @@ public class WatermarkManager {
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
     private final SortedMultiset<CommittedBundle<?>> pendingElements;
-    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
+    // This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key
+    // minimum
+    private final SortedMultiset<Instant> pendingTimers;
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
     private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
 
+    // This per-key sorted set allows quick retrieval of timers that should fire for a key
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
+
     private AtomicReference<Instant> currentWatermark;
 
     public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
@@ -224,10 +230,13 @@ public class WatermarkManager {
       // The ordering must order elements by timestamp, and must not compare two distinct elements
       // as equal. This is built on the assumption that any element added as a pending element will
       // be consumed without modifications.
+      //
+      // The same logic is applied for pending timers
       Ordering<CommittedBundle<?>> pendingBundleComparator =
           new BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements = TreeMultiset.create(pendingBundleComparator);
+      this.pendingTimers = TreeMultiset.create();
       this.objectTimers = new HashMap<>();
       this.existingTimers = new HashMap<>();
       currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -278,6 +287,14 @@ public class WatermarkManager {
       pendingElements.remove(completed);
     }
 
+    private synchronized Instant getEarliestTimerTimestamp() {
+      if (pendingTimers.isEmpty()) {
+        return BoundedWindow.TIMESTAMP_MAX_VALUE;
+      } else {
+        return pendingTimers.firstEntry().getElement();
+      }
+    }
+
     private synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
       if (keyTimers == null) {
@@ -291,27 +308,43 @@ public class WatermarkManager {
         existingTimers.put(update.key, existingTimersForKey);
       }
 
-      for (TimerData timer : update.setTimers) {
+      for (TimerData timer : update.getSetTimers()) {
+        if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
+          @Nullable
+          TimerData existingTimer =
+              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
+
+          if (existingTimer == null) {
+            pendingTimers.add(timer.getTimestamp());
+            keyTimers.add(timer);
+          } else if (!existingTimer.equals(timer)) {
+            keyTimers.remove(existingTimer);
+            keyTimers.add(timer);
+          } // else the timer is already set identically, so noop
+
+          existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer);
+        }
+      }
+
+      for (TimerData timer : update.getDeletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
           @Nullable
           TimerData existingTimer =
              existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
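The new pendingTimers multiset exists only to answer one question cheaply: what is the earliest pending event-time timer across all keys, so the output watermark can be held there. A minimal JDK-only sketch of that bookkeeping, with a TreeMap of counts standing in for Guava's TreeMultiset; class and method names and the raw-millis timestamps are illustrative, not Beam's API:

```java
import java.util.TreeMap;

// JDK-only stand-in for the pendingTimers TreeMultiset: a count of
// pending event-time timers per timestamp, across all keys, so the
// cross-key minimum is just the first key of the sorted map.
public class PendingTimerHold {
    // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE ("+infinity").
    static final long TIMESTAMP_MAX_VALUE = Long.MAX_VALUE;

    private final TreeMap<Long, Integer> pendingTimers = new TreeMap<>();

    public void timerSet(long timestamp) {
        // multiset add: bump the count at this timestamp
        pendingTimers.merge(timestamp, 1, Integer::sum);
    }

    public void timerFiredOrDeleted(long timestamp) {
        // multiset remove: drop the entry once the count reaches zero
        pendingTimers.computeIfPresent(timestamp, (t, n) -> n == 1 ? null : n - 1);
    }

    // Mirrors getEarliestTimerTimestamp(): with no pending timers there
    // is nothing to hold, so report +infinity (no hold).
    public long earliestTimerTimestamp() {
        return pendingTimers.isEmpty() ? TIMESTAMP_MAX_VALUE : pendingTimers.firstKey();
    }
}
```

Counting occurrences matters because several keys may have timers at the same timestamp: firing one of them must not release the hold while another is still pending.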
[43/51] [abbrv] incubator-beam git commit: Add informative Instant formatter to BoundedWindow
Add informative Instant formatter to BoundedWindow

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/280a6a8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/280a6a8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/280a6a8f
Branch: refs/heads/python-sdk
Commit: 280a6a8f729cb382616ad65f71860b61277cbd6f
Parents: ff39516
Author: Kenneth Knowles
Authored: Mon Dec 19 20:40:11 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:45:36 2016 -0800
--
 .../beam/sdk/transforms/windowing/BoundedWindow.java | 12
 1 file changed, 12 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/280a6a8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 3654074..6da2495 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -39,6 +39,18 @@ public abstract class BoundedWindow {
   public static final Instant TIMESTAMP_MAX_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  public static String formatTimestamp(Instant timestamp) {
+    if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
+      return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
+    } else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) {
+      return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)";
+    } else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) {
+      return timestamp.toString() + " (end of global window)";
+    } else {
+      return timestamp.toString();
+    }
+  }
+
   /**
    * Returns the inclusive upper bound of timestamps for values in this window.
    */
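The formatter above is easy to restate without Beam or Joda-Time on the classpath. A sketch using raw epoch millis in place of Instant; the end-of-global-window constant here (one day before TIMESTAMP_MAX_VALUE) is an illustrative stand-in rather than a value taken from GlobalWindow:

```java
import java.util.concurrent.TimeUnit;

// Trimmed restatement of BoundedWindow.formatTimestamp using longs
// (epoch millis) instead of Joda Instant, so it runs standalone.
public class TimestampFormat {
    // Chosen, as in Beam, so micros-since-epoch fits in a long.
    static final long TIMESTAMP_MIN_VALUE = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
    static final long TIMESTAMP_MAX_VALUE = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
    // Illustrative stand-in for GlobalWindow.INSTANCE.maxTimestamp().
    static final long END_OF_GLOBAL_WINDOW = TIMESTAMP_MAX_VALUE - TimeUnit.DAYS.toMillis(1);

    public static String formatTimestamp(long ts) {
        if (ts == TIMESTAMP_MIN_VALUE) {
            return ts + " (TIMESTAMP_MIN_VALUE)";
        } else if (ts == TIMESTAMP_MAX_VALUE) {
            return ts + " (TIMESTAMP_MAX_VALUE)";
        } else if (ts == END_OF_GLOBAL_WINDOW) {
            return ts + " (end of global window)";
        } else {
            return Long.toString(ts);
        }
    }
}
```

The point of the annotation is that the distinguished "infinity" values are otherwise opaque millisecond counts in log lines and error messages.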
[28/51] [abbrv] incubator-beam git commit: This closes #1665: Remove deprecated AggregatorFactory from SDK
This closes #1665: Remove deprecated AggregatorFactory from SDK

  Update Dataflow worker to beam-master-20161220
  Remove deprecated AggregatorFactory from SDK

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6a05d7f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6a05d7f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6a05d7f1
Branch: refs/heads/python-sdk
Commit: 6a05d7f17aab5cab202cdbf50b766b4fc86180b4
Parents: acd2196 aab46a0
Author: Kenneth Knowles
Authored: Tue Dec 20 15:07:12 2016 -0800
Committer: Kenneth Knowles
Committed: Tue Dec 20 15:07:12 2016 -0800
--
 .../beam/runners/dataflow/dataflow.properties |  4 ++--
 .../apache/beam/sdk/transforms/Aggregator.java | 19 ---
 2 files changed, 2 insertions(+), 21 deletions(-)
--
[GitHub] incubator-beam pull request #1676: Merge remote-tracking branch 'origin/mast...
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-beam/pull/1676
[37/51] [abbrv] incubator-beam git commit: This closes #1666: Move ExecutionContext and related classes to runners-core
This closes #1666: Move ExecutionContext and related classes to runners-core

  Move ExecutionContext and related classes to runners-core
  Update Dataflow worker to beam-master-20161221

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a9447a22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a9447a22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a9447a22
Branch: refs/heads/python-sdk
Commit: a9447a2251f46496b7a773c8b07b3281dbc7a6fb
Parents: 4843dc5 9d2b8e0
Author: Kenneth Knowles <k...@google.com>
Authored: Wed Dec 21 10:24:11 2016 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed Dec 21 10:24:11 2016 -0800
--
 .../operators/ApexParDoOperator.java            |   2 +-
 .../apex/translation/utils/NoOpStepContext.java |   3 +-
 .../beam/runners/core/AggregatorFactory.java    |   1 -
 .../beam/runners/core/BaseExecutionContext.java | 176 +++
 .../apache/beam/runners/core/DoFnRunners.java   |   2 +-
 .../beam/runners/core/ExecutionContext.java     | 102 +++
 .../beam/runners/core/SimpleDoFnRunner.java     |   2 +-
 .../beam/runners/core/SimpleOldDoFnRunner.java  |   2 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../runners/core/SimpleOldDoFnRunnerTest.java   |   3 +-
 .../runners/direct/AggregatorContainer.java     |   2 +-
 .../runners/direct/DirectExecutionContext.java  |   6 +-
 .../beam/runners/direct/EvaluationContext.java  |   2 +-
 .../runners/direct/AggregatorContainerTest.java |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |   2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |   2 +-
 .../beam/runners/dataflow/dataflow.properties   |   4 +-
 .../spark/aggregators/SparkAggregators.java     |   2 +-
 .../spark/translation/SparkProcessContext.java  |   2 +-
 .../beam/sdk/util/BaseExecutionContext.java     | 174 --
 .../apache/beam/sdk/util/ExecutionContext.java  | 100 ---
 21 files changed, 297 insertions(+), 296 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a9447a22/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java --
[48/51] [abbrv] incubator-beam git commit: Use informative Instant formatter in WatermarkHold
Use informative Instant formatter in WatermarkHold

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6
Branch: refs/heads/python-sdk
Commit: fa4958a6140eb00ceee08b2468f7d88f17538794
Parents: 280a6a8
Author: Kenneth Knowles
Authored: Mon Dec 19 20:40:47 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:45:37 2016 -0800
--
 .../apache/beam/runners/core/WatermarkHold.java |  4 +++-
 .../sdk/transforms/windowing/BoundedWindow.java | 19 +++
 2 files changed, 22 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
--
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 7f1afcc..5e5f44d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -207,7 +207,9 @@ class WatermarkHold implements Serializable {
     Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
     checkState(!shifted.isBefore(timestamp),
         "OutputTimeFn moved element from %s to earlier time %s for window %s",
-        timestamp, shifted, window);
+        BoundedWindow.formatTimestamp(timestamp),
+        BoundedWindow.formatTimestamp(shifted),
+        window);
     checkState(timestamp.isAfter(window.maxTimestamp())
         || !shifted.isAfter(window.maxTimestamp()),
         "OutputTimeFn moved element from %s to %s which is beyond end of "

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
--
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as "-infinity".
+   *
+   * This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as "+infinity".
+   *
+   * This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
 
+  /**
+   * Formats an {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
+   * whether the timestamp is the end of the global window or one of the distinguished values {@link
+   * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
     if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
       return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
[42/51] [abbrv] incubator-beam git commit: This closes #1670: Fixes for direct runner expansion and evaluation of stateful ParDo
This closes #1670: Fixes for direct runner expansion and evaluation of stateful ParDo

  Actually propagate and commit state in direct runner
  Fix windowing in direct runner
  Stateful ParDo

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ff395169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ff395169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ff395169
Branch: refs/heads/python-sdk
Commit: ff395169993d84cd920be21f11d9af8f8d8b
Parents: 7ee8c86 55176c3
Author: Kenneth Knowles
Authored: Wed Dec 21 13:11:54 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:11:54 2016 -0800
--
 .../direct/ParDoMultiOverrideFactory.java     | 34 ++--
 .../direct/StatefulParDoEvaluatorFactory.java |  1 +
 2 files changed, 32 insertions(+), 3 deletions(-)
--
[40/51] [abbrv] incubator-beam git commit: Actually propagate and commit state in direct runner
Actually propagate and commit state in direct runner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55176c38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55176c38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55176c38
Branch: refs/heads/python-sdk
Commit: 55176c385cc802be42b5467fbb2dcc9a1c7467ea
Parents: 4fb16e8
Author: Kenneth Knowles
Authored: Tue Dec 20 15:59:45 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:11:22 2016 -0800
--
 .../apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java | 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55176c38/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
--
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 5f9d8f4..003df0f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -233,6 +233,7 @@ final class StatefulParDoEvaluatorFactory implements TransformEvaluatorFactory {
             StepTransformResult. >>withHold(
                 delegateResult.getTransform(), delegateResult.getWatermarkHold())
             .withTimerUpdate(delegateResult.getTimerUpdate())
+            .withState(delegateResult.getState())
             .withAggregatorChanges(delegateResult.getAggregatorChanges())
             .withMetricUpdates(delegateResult.getLogicalMetricUpdates())
             .addOutput(Lists.newArrayList(delegateResult.getOutputBundles()));
[10/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a JUnit rule.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24ad1831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24ad1831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24ad1831
Branch: refs/heads/python-sdk
Commit: 24ad18319248a128a1c5db4f2bef8861f7361d9f
Parents: 75a4c91
Author: Stas Levin
Authored: Tue Dec 20 00:01:31 2016 +0200
Committer: Kenneth Knowles
Committed: Tue Dec 20 09:55:45 2016 -0800
--
 .../beam/sdk/extensions/joinlibrary/InnerJoinTest.java     |  9 ++---
 .../sdk/extensions/joinlibrary/OuterLeftJoinTest.java      | 10 +++---
 .../sdk/extensions/joinlibrary/OuterRightJoinTest.java     | 10 +++---
 3 files changed, 20 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24ad1831/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
--
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
index 423ab9c..1c120c2 100644
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/InnerJoinTest.java
@@ -19,13 +19,13 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -33,15 +33,16 @@ import org.junit.Test;
  */
 public class InnerJoinTest {
 
-  Pipeline p;
   List > leftListOfKv;
   List > listRightOfKv;
   List >> expectedResult;
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Before
   public void setup() {
-    p = TestPipeline.create();
     leftListOfKv = new ArrayList<>();
     listRightOfKv = new ArrayList<>();
 
@@ -131,11 +132,13 @@ public class InnerJoinTest {
   @Test(expected = NullPointerException.class)
   public void testJoinLeftCollectionNull() {
+    p.enableAbandonedNodeEnforcement(false);
     Join.innerJoin(null, p.apply(Create.of(listRightOfKv)));
   }
 
   @Test(expected = NullPointerException.class)
   public void testJoinRightCollectionNull() {
+    p.enableAbandonedNodeEnforcement(false);
     Join.innerJoin(p.apply(Create.of(leftListOfKv)), null);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24ad1831/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
--
diff --git a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
index c32163f..81f4fa3 100644
--- a/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
+++ b/sdks/java/extensions/join-library/src/test/java/org/apache/beam/sdk/extensions/joinlibrary/OuterLeftJoinTest.java
@@ -19,13 +19,13 @@ package org.apache.beam.sdk.extensions.joinlibrary;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
@@ -34,15 +34,16 @@ import org.junit.Test;
  */
 public class OuterLeftJoinTest {
 
-  Pipeline p;
   List > leftListOfKv;
   List > listRightOfKv;
   List >> expectedResult;
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Before
   public void setup() {
-    p = TestPipeline.create();
     leftListOfKv = new ArrayList<>();
     listRightOfKv = new ArrayList<>();
 
@@ -133,16 +134,19 @@ public class OuterLeftJoinTest
[09/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit rule.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63331aa8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63331aa8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63331aa8
Branch: refs/heads/python-sdk
Commit: 63331aa8aa6314e8469c23a4f4a89fbf287cbc5a
Parents: 24ad183
Author: Stas Levin
Authored: Tue Dec 20 09:54:57 2016 +0200
Committer: Kenneth Knowles
Committed: Tue Dec 20 09:55:45 2016 -0800
--
 .../org/apache/beam/sdk/extensions/sorter/SortValuesTest.java | 7 ---
 1 file changed, 4 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63331aa8/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
--
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
index ebfbd0e..4f77100 100644
--- a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -24,7 +24,6 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -43,10 +43,11 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class SortValuesTest {
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   public void testSecondaryKeySorting() throws Exception {
-    Pipeline p = TestPipeline.create();
-
     // Create a PCollection of > pairs.
     PCollection >> input =
         p.apply(
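Many of the migration commits in this batch replace a locally created TestPipeline with a shared JUnit @Rule that checks proper usage, including failing tests that build a pipeline but never run it unless enforcement is explicitly disabled (the `enableAbandonedNodeEnforcement(false)` calls above). A JDK-only sketch of the enforcement idea behind that rule; class and method names are illustrative, and the real rule wraps each test as a JUnit Statement rather than being called directly:

```java
// Illustrative model of TestPipeline-as-a-rule "abandoned node"
// enforcement: if transforms were applied but run() was never called,
// the rule fails the test after the test body finishes.
public class TestPipelineSketch {
    private boolean hasNodes = false;
    private boolean wasRun = false;
    private boolean enforce = true;

    public void apply(String transformName) {
        // any applied transform makes the pipeline non-empty
        hasNodes = true;
    }

    public void run() {
        wasRun = true;
    }

    // Mirrors enableAbandonedNodeEnforcement(false) in the tests above.
    public void enableAbandonedNodeEnforcement(boolean on) {
        enforce = on;
    }

    // What the rule's wrapper would do after the test body returns.
    public void verifyAfterTest() {
        if (enforce && hasNodes && !wasRun) {
            throw new IllegalStateException(
                "pipeline was abandoned: apply() was called but run() never was");
        }
    }
}
```

This is why tests that deliberately never run the pipeline (such as the NullPointerException tests in the join library) must opt out of enforcement first.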
[02/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 2a89a18..3bc0a65 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; @@ -76,12 +75,12 @@ import org.junit.runners.JUnit4; @SuppressWarnings("unchecked") public class CreateTest { @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testCreate() { -Pipeline p = TestPipeline.create(); - PCollection output = p.apply(Create.of(LINES)); @@ -93,8 +92,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateEmpty() { -Pipeline p = TestPipeline.create(); - PCollection output = p.apply(Create.of(NO_LINES) .withCoder(StringUtf8Coder.of())); @@ -106,7 +103,7 @@ public class CreateTest { @Test public void testCreateEmptyInfersCoder() { -Pipeline p = TestPipeline.create(); +p.enableAbandonedNodeEnforcement(false); PCollection output = p.apply(Create.of()); @@ -126,8 +123,6 @@ public class CreateTest { thrown.expectMessage( Matchers.containsString("Unable to infer a coder")); -Pipeline p = TestPipeline.create(); - // Create won't infer a default coder in this case. 
p.apply(Create.of(new Record(), new Record2())); @@ -137,8 +132,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithNullsAndValues() throws Exception { -Pipeline p = TestPipeline.create(); - PCollection output = p.apply(Create.of(null, "test1", null, "test2", null) .withCoder(SerializableCoder.of(String.class))); @@ -150,8 +143,6 @@ public class CreateTest { @Test @Category(NeedsRunner.class) public void testCreateParameterizedType() throws Exception { -Pipeline p = TestPipeline.create(); - PCollectionoutput = p.apply(Create.of( TimestampedValue.of("a", new Instant(0)), @@ -216,7 +207,6 @@ public class CreateTest { Create.Values create = Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()); -TestPipeline p = TestPipeline.create(); PAssert.that(p.apply(create)) .containsInAnyOrder( new UnserializableRecord("foo"), @@ -235,8 +225,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateTimestamped() { -Pipeline p = TestPipeline.create(); - List data = Arrays.asList( TimestampedValue.of("a", new Instant(1L)), TimestampedValue.of("b", new Instant(2L)), @@ -254,8 +242,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateTimestampedEmpty() { -Pipeline p = TestPipeline.create(); - PCollection output = p .apply(Create.timestamped(new ArrayList ()) .withCoder(StringUtf8Coder.of())); @@ -266,7 +252,7 @@ public class CreateTest { @Test public void testCreateTimestampedEmptyInfersCoder() { -Pipeline p = TestPipeline.create(); +p.enableAbandonedNodeEnforcement(false); PCollection output = p .apply(Create.timestamped()); @@ -280,8 +266,6 @@ public class CreateTest { thrown.expectMessage( Matchers.containsString("Unable to infer a coder")); -Pipeline p = TestPipeline.create(); - // Create won't infer a default coder in this case. 
PCollection c = p.apply(Create.timestamped( TimestampedValue.of(new Record(), new Instant(0)), @@ -295,7 +279,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithVoidType() throws Exception { -Pipeline p = TestPipeline.create(); PCollection output = p.apply(Create.of((Void) null, (Void) null)); PAssert.that(output).containsInAnyOrder((Void) null, (Void) null); p.run(); @@ -304,8 +287,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithKVVoidType() throws Exception { -Pipeline p = TestPipeline.create(); - PCollection
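The CreateTest migration above follows one pattern throughout: the per-test `TestPipeline.create()` call is replaced by a single JUnit `@Rule` field shared by all tests. A minimal sketch of the resulting test shape (class name and element values are illustrative, not from the diff; this assumes the Beam 0.4.0-era SDK on the classpath):

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class CreateStyleTest {
  // One shared JUnit rule replaces the per-test TestPipeline.create() calls.
  @Rule public final TestPipeline p = TestPipeline.create();

  @Test
  public void testCreate() {
    PCollection<String> output = p.apply(Create.of("hello", "world"));
    PAssert.that(output).containsInAnyOrder("hello", "world");
    p.run(); // the rule can now verify that each test actually runs its pipeline
  }
}
```

Besides removing boilerplate, the rule gives TestPipeline a hook around each test, which is what enables the abandoned-node enforcement seen in the `enableAbandonedNodeEnforcement(false)` calls elsewhere in this diff.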
[08/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as a JUnit rule.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dea0992
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dea0992
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dea0992
Branch: refs/heads/python-sdk
Commit: 6dea0992d9976b39232cf846906831feaa25ec43
Parents: 63331aa
Author: Stas Levin
Authored: Tue Dec 20 13:26:07 2016 +0200
Committer: Kenneth Knowles
Committed: Tue Dec 20 09:55:45 2016 -0800
--
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 25 +---
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 13 ++
2 files changed, 25 insertions(+), 13 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6dea0992/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
--
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index dc566d2..4ddfdea 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -643,6 +643,7 @@ public class BigQueryIOTest implements Serializable { } } + @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); @@ -1370,7 +1371,7 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testBuildWriteWithoutTable() { -Pipeline p = TestPipeline.create();

+ thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference"); p.apply(Create.of().withCoder(TableRowJsonCoder.of())) @@ -1591,9 +1592,11 @@ public class BigQueryIOTest implements Serializable { @Test public void testWriteValidateFailsCreateNoSchema() { +p.enableAbandonedNodeEnforcement(false); + thrown.expect(IllegalArgumentException.class); thrown.expectMessage("no schema was provided"); -TestPipeline.create() +p .apply(Create.of()) .apply(BigQueryIO.Write .to("dataset.table") @@ -1602,9 +1605,11 @@ public class BigQueryIOTest implements Serializable { @Test public void testWriteValidateFailsTableAndTableSpec() { +p.enableAbandonedNodeEnforcement(false); + thrown.expect(IllegalStateException.class); thrown.expectMessage("Cannot set both a table reference and a table function"); -TestPipeline.create() +p .apply(Create.of()) .apply(BigQueryIO.Write .to("dataset.table") @@ -1618,9 +1623,11 @@ public class BigQueryIOTest implements Serializable { @Test public void testWriteValidateFailsNoTableAndNoTableSpec() { +p.enableAbandonedNodeEnforcement(false); + thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform"); -TestPipeline.create() +p .apply(Create.of()) .apply("name", BigQueryIO.Write.withoutValidation()); } @@ -1950,7 +1957,6 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testPassThroughThenCleanup() throws Exception { -Pipeline p = TestPipeline.create(); PCollection output = p .apply(Create.of(1, 2, 3)) @@ -1968,7 +1974,6 @@ public class BigQueryIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testPassThroughThenCleanupExecuted() throws Exception { -Pipeline p = TestPipeline.create(); p.apply(Create.of()) .apply(new PassThroughThenCleanup(new CleanupOperation() { @@ -2025,6 +2030,8 @@ public class BigQueryIOTest implements Serializable { private void 
testWritePartition(long numFiles, long fileSize, long expectedNumPartitions) throws Exception { +p.enableAbandonedNodeEnforcement(false); + List expectedPartitionIds = Lists.newArrayList(); for (long i = 1; i <= expectedNumPartitions; ++i) { expectedPartitionIds.add(i); @@ -2044,7 +2051,7 @@ public class BigQueryIOTest implements Serializable { new TupleTag >("singlePartitionTag")
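The recurring `p.enableAbandonedNodeEnforcement(false)` calls in this diff mark tests that only exercise pipeline construction and never call `p.run()`. A sketch of that pattern, reassembled from the hunks above (test name trimmed, and import paths for the GCP IO classes are assumptions about the 0.4.0-era layout):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BigQueryValidationStyleTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();
  @Rule public transient ExpectedException thrown = ExpectedException.none();

  @Test
  public void testWriteValidateFailsNoTable() {
    // Construction-only test: it expects a failure while the pipeline is
    // being built and never calls p.run(), so the rule's abandoned-node
    // enforcement must be switched off explicitly.
    p.enableAbandonedNodeEnforcement(false);

    thrown.expect(IllegalStateException.class);
    thrown.expectMessage("must set the table reference");

    p.apply(Create.of(new TableRow()).withCoder(TableRowJsonCoder.of()))
        .apply("name", BigQueryIO.Write.withoutValidation());
  }
}
```

Without the opt-out, the TestPipeline rule would fail such tests for leaving unexecuted transforms behind.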
[34/51] [abbrv] incubator-beam git commit: Move ExecutionContext and related classes to runners-core
Move ExecutionContext and related classes to runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d2b8e09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d2b8e09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d2b8e09 Branch: refs/heads/python-sdk Commit: 9d2b8e09bcb5e04017b487e1a919d335875dbfc0 Parents: 64336e4 Author: Kenneth KnowlesAuthored: Thu Dec 15 20:20:34 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 10:10:00 2016 -0800 -- .../operators/ApexParDoOperator.java| 2 +- .../apex/translation/utils/NoOpStepContext.java | 3 +- .../beam/runners/core/AggregatorFactory.java| 1 - .../beam/runners/core/BaseExecutionContext.java | 176 +++ .../apache/beam/runners/core/DoFnRunners.java | 2 +- .../beam/runners/core/ExecutionContext.java | 102 +++ .../beam/runners/core/SimpleDoFnRunner.java | 2 +- .../beam/runners/core/SimpleOldDoFnRunner.java | 2 +- .../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +- .../runners/core/SimpleOldDoFnRunnerTest.java | 3 +- .../runners/direct/AggregatorContainer.java | 2 +- .../runners/direct/DirectExecutionContext.java | 6 +- .../beam/runners/direct/EvaluationContext.java | 2 +- .../runners/direct/AggregatorContainerTest.java | 2 +- .../wrappers/streaming/DoFnOperator.java| 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../spark/aggregators/SparkAggregators.java | 2 +- .../spark/translation/SparkProcessContext.java | 2 +- .../beam/sdk/util/BaseExecutionContext.java | 174 -- .../apache/beam/sdk/util/ExecutionContext.java | 100 --- 20 files changed, 295 insertions(+), 294 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index a3d3a97..c41cd45 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -41,6 +41,7 @@ import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; @@ -50,7 +51,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index 078f95f..f169ae6 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -19,10 +19,9 @@ package org.apache.beam.runners.apex.translation.utils; import java.io.IOException; import java.io.Serializable; - +import 
org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d2b8e09/runners/core-java/src/main/java/org/apache/beam/runners/core/AggregatorFactory.java
[27/51] [abbrv] incubator-beam git commit: Update Dataflow worker to beam-master-20161220
Update Dataflow worker to beam-master-20161220

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a3f68d34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a3f68d34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a3f68d34
Branch: refs/heads/python-sdk
Commit: a3f68d343d018d99a04ac5c9134a11f1bc74935b
Parents: 2f4b803
Author: Kenneth Knowles
Authored: Tue Dec 20 14:05:19 2016 -0800
Committer: Kenneth Knowles
Committed: Tue Dec 20 14:05:35 2016 -0800
--
.../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a3f68d34/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
--
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 27a518f..bf08e83 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 environment.major.version=6
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161216
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161220
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161216
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161220
[14/51] [abbrv] incubator-beam git commit: This closes #1664: [BEAM-1176] Migrating tests to use TestPipeline as a JUnit rule
This closes #1664: [BEAM-1176] Migrating tests to use TestPipeline as a JUnit rule

Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.
Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kafka module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-google-cloud-platform module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-sorter module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-extensions-join-library module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
Migrated the beam-runners-direct-java module to TestPipeline as a JUnit rule.
Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
Migrated the beam-examples-java8 module to TestPipeline as a JUnit rule.
Migrated the beam-examples-java module to TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22e25a47 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22e25a47 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22e25a47 Branch: refs/heads/python-sdk Commit: 22e25a47e2edc7b7f702eaca12630f6de7195657 Parents: b3de17b 4b23d42 Author: Kenneth KnowlesAuthored: Tue Dec 20 09:55:57 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:57 2016 -0800 -- .../org/apache/beam/examples/WordCountTest.java | 7 +- .../examples/complete/AutoCompleteTest.java | 11 +- .../beam/examples/complete/TfIdfTest.java | 6 +- .../complete/TopWikipediaSessionsTest.java | 7 +- .../examples/cookbook/DistinctExampleTest.java | 9 +- .../examples/cookbook/JoinExamplesTest.java | 6 +- .../examples/cookbook/TriggerExampleTest.java | 6 +- .../examples/MinimalWordCountJava8Test.java | 6 +- .../examples/complete/game/GameStatsTest.java | 7 +- .../complete/game/HourlyTeamScoreTest.java | 5 +- .../examples/complete/game/LeaderBoardTest.java | 11 +- .../examples/complete/game/UserScoreTest.java | 10 +- .../core/PushbackSideInputDoFnRunnerTest.java | 5 +- .../beam/runners/core/SplittableParDoTest.java | 10 +- .../UnboundedReadFromBoundedSourceTest.java | 6 +- .../direct/BoundedReadEvaluatorFactoryTest.java | 13 +- .../direct/CloningBundleFactoryTest.java| 8 +- .../runners/direct/CommittedResultTest.java | 6 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 7 +- .../runners/direct/DirectGraphVisitorTest.java | 3 +- .../runners/direct/EvaluationContextTest.java | 7 +- .../direct/FlattenEvaluatorFactoryTest.java | 6 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 5 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 5 +- .../ImmutabilityCheckingBundleFactoryTest.java | 4 +- .../ImmutabilityEnforcementFactoryTest.java | 3 +- .../direct/ImmutableListBundleFactoryTest.java | 14 ++- 
.../direct/KeyedPValueTrackingVisitorTest.java | 6 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 5 +- .../runners/direct/SideInputContainerTest.java | 5 +- .../StatefulParDoEvaluatorFactoryTest.java | 7 +- .../runners/direct/StepTransformResultTest.java | 5 +- .../direct/TestStreamEvaluatorFactoryTest.java | 5 +- .../runners/direct/TransformExecutorTest.java | 4 +- .../UnboundedReadEvaluatorFactoryTest.java | 9 +- .../direct/ViewEvaluatorFactoryTest.java| 5 +- .../direct/WatermarkCallbackExecutorTest.java | 5 +- .../runners/direct/WatermarkManagerTest.java| 6 +- .../direct/WindowEvaluatorFactoryTest.java | 5 +- .../direct/WriteWithShardingFactoryTest.java| 14 +-- .../java/org/apache/beam/sdk/PipelineTest.java | 37 +++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 11 +- .../beam/sdk/coders/CoderRegistryTest.java | 6 +- .../beam/sdk/coders/SerializableCoderTest.java | 7 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 +-- .../io/BoundedReadFromUnboundedSourceTest.java | 6 +- .../beam/sdk/io/CompressedSourceTest.java | 12 +- .../apache/beam/sdk/io/CountingInputTest.java | 12 +- .../apache/beam/sdk/io/CountingSourceTest.java | 13 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 4 +- .../beam/sdk/io/PubsubUnboundedSinkTest.java| 10 +-
[39/51] [abbrv] incubator-beam git commit: This closes #1160: Support set and delete of timer by ID in InMemoryTimerInternals
This closes #1160: Support set and delete of timer by ID in InMemoryTimerInternals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ee8c86d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ee8c86d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ee8c86d Branch: refs/heads/python-sdk Commit: 7ee8c86d3b0553d8cb7de60b0dc1a03103dfbbc5 Parents: a9447a2 df2e540 Author: Kenneth KnowlesAuthored: Wed Dec 21 11:02:02 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 11:02:02 2016 -0800 -- .../runners/core/InMemoryTimerInternals.java| 65 +++ .../core/InMemoryTimerInternalsTest.java| 112 +-- 2 files changed, 120 insertions(+), 57 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ee8c86d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java -- diff --cc runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 5ddd5a7,292ac23..2c3d78a --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@@ -104,10 -106,9 +106,10 @@@ public class InMemoryTimerInternals imp @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } + @Deprecated @Override public void setTimer(TimerData timerData) { WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); @@@ -117,17 -133,13 +134,20 @@@ } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { +throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Deprecated + 
@Override public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + TimerData existing = existingTimers.get(namespace, timerId); + if (existing != null) { + deleteTimer(existing); + } } + @Deprecated @Override public void deleteTimer(TimerData timer) { WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
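The hunks above give InMemoryTimerInternals a working by-ID path: `setTimer(namespace, timerId, target, timeDomain)` now builds a `TimerData` instead of throwing, and `deleteTimer(namespace, timerId)` looks up the registered timer and removes it. A usage sketch (not from the diff; the `StateNamespaces.global()` helper and the import paths are assumptions about the 0.4.0-era runners-core layout):

```java
import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.joda.time.Instant;

public class TimerByIdDemo {
  public static void main(String[] args) {
    InMemoryTimerInternals timers = new InMemoryTimerInternals();
    StateNamespace ns = StateNamespaces.global();

    // Setting by ID now delegates to TimerData.of(timerId, namespace, ...).
    timers.setTimer(ns, "cleanup", new Instant(1000L), TimeDomain.EVENT_TIME);

    // Deleting by (namespace, timerId) finds the existing timer and removes
    // it; the TimerData-based overloads are deprecated by this change.
    timers.deleteTimer(ns, "cleanup");
  }
}
```

Note that the three-argument `deleteTimer(namespace, timerId, timeDomain)` overload still throws UnsupportedOperationException in this commit; only the two-argument form does the lookup.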
[23/51] [abbrv] incubator-beam git commit: Fixup usage of canonical name with name since canonical name != name for inner classes.
Fixup usage of canonical name with name since canonical name != name for inner classes.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96d39314
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96d39314
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96d39314
Branch: refs/heads/python-sdk
Commit: 96d393147c365d0911a091d0b3600fef621709f4
Parents: ef1a858
Author: Luke Cwik
Authored: Tue Dec 20 11:47:42 2016 -0800
Committer: Luke Cwik
Committed: Tue Dec 20 13:04:24 2016 -0800
--
.../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 8
1 file changed, 4 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96d39314/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
--
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index b29c4cd..21d575a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -267,7 +267,7 @@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=/tmp/not/a/gs/path", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), +"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), }; try { @@ -286,7 +286,7 @@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=gs://does/not/exist", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), +"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), }; try { @@ -306,8
@@ public class DataflowRunnerTest { "--runner=DataflowRunner", "--tempLocation=/tmp/testing", "--project=test-project", -"--credentialFactoryClass=" + NoopCredentialFactory.class.getCanonicalName(), -"--pathValidatorClass=" + NoopPathValidator.class.getCanonicalName(), +"--credentialFactoryClass=" + NoopCredentialFactory.class.getName(), +"--pathValidatorClass=" + NoopPathValidator.class.getName(), }; // Should not crash, because gcpTempLocation should get set from tempLocation TestPipeline.fromOptions(PipelineOptionsFactory.fromArgs(args).create());
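The commit above works because, for nested classes, `Class.getName()` returns the binary name (with `$`), which is what reflective loading expects, while `Class.getCanonicalName()` returns the source-style dotted name, which `Class.forName` cannot resolve. A minimal self-contained illustration (the class names here are invented for the demo, echoing but not reproducing the test's NoopCredentialFactory):

```java
public class NameDemo {
  // A static nested class, like the factories referenced in the test flags.
  public static class NoopCredentialFactory {}

  public static void main(String[] args) throws Exception {
    String binaryName = NoopCredentialFactory.class.getName();
    String canonical = NoopCredentialFactory.class.getCanonicalName();

    System.out.println(binaryName); // NameDemo$NoopCredentialFactory
    System.out.println(canonical);  // NameDemo.NoopCredentialFactory

    // The binary name round-trips through reflection...
    Class.forName(binaryName);

    // ...but the canonical name of a nested class does not.
    try {
      Class.forName(canonical);
      System.out.println("unexpectedly loaded");
    } catch (ClassNotFoundException expected) {
      System.out.println("canonical name is not loadable");
    }
  }
}
```

This is why passing `getCanonicalName()` as a `--credentialFactoryClass` or `--pathValidatorClass` value breaks for inner classes: the options framework loads the class reflectively and needs the `$`-separated binary name.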
[05/51] [abbrv] incubator-beam git commit: Migrated the beam-runners-core module to TestPipeline as a JUnit rule.
Migrated the beam-runners-core module to TestPipeline as a JUnit rule. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6710251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6710251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6710251 Branch: refs/heads/python-sdk Commit: b6710251d8bb5d1968aea2258ce5878b43368dd5 Parents: 7106e88 Author: Stas LevinAuthored: Sun Dec 18 18:51:31 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:45 2016 -0800 -- .../runners/core/PushbackSideInputDoFnRunnerTest.java | 5 - .../org/apache/beam/runners/core/SplittableParDoTest.java | 10 -- .../runners/core/UnboundedReadFromBoundedSourceTest.java | 5 +++-- 3 files changed, 15 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index a1cdbf6..251c7c2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.hamcrest.Matchers; import org.joda.time.Instant; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -63,10 +64,12 @@ public class PushbackSideInputDoFnRunnerTest { private TestDoFnRunner underlying; private PCollectionView singletonView; + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Before public void setup() { 
MockitoAnnotations.initMocks(this); -TestPipeline p = TestPipeline.create(); PCollection created = p.apply(Create.of(1, 2, 3)); singletonView = created http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index cf96b66..0f0b106 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -132,9 +133,13 @@ public class SplittableParDoTest { return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty()); } + @Rule + public TestPipeline pipeline = TestPipeline.create(); + @Test public void testBoundednessForBoundedFn() { -Pipeline pipeline = TestPipeline.create(); +pipeline.enableAbandonedNodeEnforcement(false); + DoFn boundedFn = new BoundedFakeFn(); assertEquals( "Applying a bounded SDF to a bounded collection produces a bounded collection", @@ -154,7 +159,8 @@ public class SplittableParDoTest { @Test public void testBoundednessForUnboundedFn() { -Pipeline pipeline = TestPipeline.create(); +pipeline.enableAbandonedNodeEnforcement(false); + DoFn unboundedFn = new UnboundedFakeFn(); assertEquals( "Applying an unbounded SDF to a bounded collection produces a bounded collection", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6710251/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java -- diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index 7fd8807..86450f2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++
[01/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3b4fd5c7d -> 3454d691f http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 3bf63fd..1d8b32c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -84,12 +84,14 @@ public class ViewTest implements Serializable { // anonymous inner classes inside the non-static test methods. @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(RunnableOnService.class) public void testSingletonSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("Create47", Create.of(47)).apply(View.asSingleton()); @@ -112,7 +114,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedSingletonSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("Create47", Create.timestamped( @@ -143,7 +144,6 @@ public class ViewTest implements Serializable { @Test @Category(NeedsRunner.class) public void testEmptySingletonSideInput() throws Exception { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("CreateEmptyIntegers", Create.of().withCoder(VarIntCoder.of())) @@ -169,7 +169,6 @@ public class ViewTest implements Serializable { @Test @Category(NeedsRunner.class) public void testNonSingletonSideInput() throws Exception { -Pipeline pipeline = TestPipeline.create(); PCollection oneTwoThree = pipeline.apply(Create.of(1, 2, 3)); final PCollectionView view = 
oneTwoThree.apply(View.asSingleton()); @@ -194,7 +193,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testListSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionViewview = pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)).apply(View.asList()); @@ -221,7 +219,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedListSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView
view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -262,7 +259,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyListSideInput() throws Exception { -Pipeline pipeline = TestPipeline.create(); final PCollectionView
view = pipeline.apply("CreateEmptyView", Create.of().withCoder(VarIntCoder.of())) @@ -289,7 +285,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testListSideInputIsImmutable() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView
view = pipeline.apply("CreateSideInput", Create.of(11)).apply(View.asList()); @@ -335,7 +330,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testIterableSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView
view = pipeline.apply("CreateSideInput", Create.of(11, 13, 17, 23)) @@ -361,7 +355,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedIterableSideInput() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("CreateSideInput", Create.timestamped( @@ -401,7 +394,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyIterableSideInput() throws Exception { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("CreateEmptyView", Create.of().withCoder(VarIntCoder.of())) @@ -427,7 +419,6 @@ public class ViewTest implements Serializable { @Test @Category(RunnableOnService.class) public void testIterableSideInputIsImmutable() { -Pipeline pipeline = TestPipeline.create(); final PCollectionView view = pipeline.apply("CreateSideInput", Create.of(11)).apply(View.asIterable()); @@ -459,7 +450,6 @@ public class ViewTest implements Serializable { @Test
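The ViewTest migration keeps the existing side-input idioms and only swaps in the shared rule; note the `transient` modifier, needed because ViewTest implements Serializable. A sketch of the singleton side-input case under the new rule (class name and element values are illustrative, assuming the 0.4.0-era SDK):

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.Rule;
import org.junit.Test;

public class ViewStyleTest implements java.io.Serializable {
  // transient: JUnit rules need not survive serialization of the test class.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testSingletonSideInput() {
    final PCollectionView<Integer> view =
        pipeline.apply("Create47", Create.of(47)).apply(View.<Integer>asSingleton());

    PCollection<Integer> output =
        pipeline.apply(Create.of(1, 2, 3))
            .apply(ParDo.of(new DoFn<Integer, Integer>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Each element is shifted by the singleton side input (47).
                c.output(c.element() + c.sideInput(view));
              }
            }).withSideInputs(view));

    PAssert.that(output).containsInAnyOrder(48, 49, 50);
    pipeline.run();
  }
}
```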
[30/51] [abbrv] incubator-beam git commit: Provide local tags in PInput, POutput expansions
Provide local tags in PInput, POutput expansions Output an ordered collection in PInput and POutput expansions. This provides information that is necessary to reconstruct a PInput or POutput from its expansion. Implement PCollectionList.equals, PCollectionTuple.equals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/34373c21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/34373c21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/34373c21 Branch: refs/heads/python-sdk Commit: 34373c21ed67696235d88ef40d50e31c77b84c33 Parents: 6a05d7f Author: Thomas Groh Authored: Tue Dec 6 11:03:52 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 20 15:18:55 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 18 +-- .../beam/runners/direct/EvaluationContext.java | 7 +- .../direct/KeyedPValueTrackingVisitor.java | 16 ++- .../beam/runners/direct/WatermarkManager.java | 19 +-- .../apache/beam/runners/spark/SparkRunner.java | 13 ++- .../beam/sdk/runners/TransformHierarchy.java| 49 .../transforms/join/KeyedPCollectionTuple.java | 9 +- .../java/org/apache/beam/sdk/values/PBegin.java | 4 +- .../apache/beam/sdk/values/PCollectionList.java | 65 +++ .../beam/sdk/values/PCollectionTuple.java | 28 - .../java/org/apache/beam/sdk/values/PDone.java | 4 +- .../java/org/apache/beam/sdk/values/PInput.java | 4 +- .../org/apache/beam/sdk/values/POutput.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 10 ++ .../org/apache/beam/sdk/values/PValueBase.java | 11 +- .../apache/beam/sdk/values/TaggedPValue.java| 42 +++ .../sdk/runners/TransformHierarchyTest.java | 23 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 34 ++ .../beam/sdk/values/PCollectionListTest.java| 117 +++ .../beam/sdk/values/PCollectionTupleTest.java | 70 +++ 20 files changed, 449 insertions(+), 98 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 0283d03..425bbf1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; /** * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the @@ -79,14 +80,16 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { @Override public void visitPrimitiveTransform(TransformHierarchy.Node node) { -toFinalize.removeAll(node.getInputs()); +for (TaggedPValue consumed : node.getInputs()) { + toFinalize.remove(consumed.getValue()); +} AppliedPTransform appliedTransform = getAppliedTransform(node); stepNames.put(appliedTransform, genStepName()); if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); } else { - for (PValue value : node.getInputs()) { -primitiveConsumers.put(value, appliedTransform); + for (TaggedPValue value : node.getInputs()) { +primitiveConsumers.put(value.getValue(), appliedTransform); } } } @@ -96,15 +99,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { toFinalize.add(value); AppliedPTransform appliedTransform = getAppliedTransform(producer); +if (value instanceof PCollectionView) { + views.add((PCollectionView) value); +} if (!producers.containsKey(value)) { producers.put(value, appliedTransform); } - if (value instanceof PCollectionView) { - 
views.add((PCollectionView) value); - } - if (!producers.containsKey(value)) { - producers.put(value, appliedTransform); - } } private AppliedPTransform getAppliedTransform(TransformHierarchy.Node node) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/34373c21/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java -- diff --git
[31/51] [abbrv] incubator-beam git commit: This closes #1569
This closes #1569 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aadcf3a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aadcf3a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aadcf3a1 Branch: refs/heads/python-sdk Commit: aadcf3a1203b257961a1a474acf74e6bbca1e2ad Parents: 6a05d7f 34373c2 Author: Thomas GrohAuthored: Tue Dec 20 15:18:55 2016 -0800 Committer: Thomas Groh Committed: Tue Dec 20 15:18:55 2016 -0800 -- .../beam/runners/direct/DirectGraphVisitor.java | 18 +-- .../beam/runners/direct/EvaluationContext.java | 7 +- .../direct/KeyedPValueTrackingVisitor.java | 16 ++- .../beam/runners/direct/WatermarkManager.java | 19 +-- .../apache/beam/runners/spark/SparkRunner.java | 13 ++- .../beam/sdk/runners/TransformHierarchy.java| 49 .../transforms/join/KeyedPCollectionTuple.java | 9 +- .../java/org/apache/beam/sdk/values/PBegin.java | 4 +- .../apache/beam/sdk/values/PCollectionList.java | 65 +++ .../beam/sdk/values/PCollectionTuple.java | 28 - .../java/org/apache/beam/sdk/values/PDone.java | 4 +- .../java/org/apache/beam/sdk/values/PInput.java | 4 +- .../org/apache/beam/sdk/values/POutput.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 10 ++ .../org/apache/beam/sdk/values/PValueBase.java | 11 +- .../apache/beam/sdk/values/TaggedPValue.java| 42 +++ .../sdk/runners/TransformHierarchyTest.java | 23 +++- .../apache/beam/sdk/transforms/ParDoTest.java | 34 ++ .../beam/sdk/values/PCollectionListTest.java| 117 +++ .../beam/sdk/values/PCollectionTupleTest.java | 70 +++ 20 files changed, 449 insertions(+), 98 deletions(-) --
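The commit above changes PInput/POutput expansions to ordered collections of TaggedPValue, which is what makes PCollectionList.equals and PCollectionTuple.equals possible: an ordered, tagged expansion carries enough information to rebuild the original container. A minimal sketch of that idea, using simplified stand-in classes rather than Beam's actual TaggedPValue/PCollectionList types:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Stand-in for TaggedPValue: a local tag paired with the value it labels.
final class TaggedValue {
    final String tag;    // local tag within the expansion
    final String value;  // stand-in for the underlying PValue
    TaggedValue(String tag, String value) { this.tag = tag; this.value = value; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof TaggedValue)) return false;
        TaggedValue other = (TaggedValue) o;
        return tag.equals(other.tag) && value.equals(other.value);
    }
    @Override public int hashCode() { return Objects.hash(tag, value); }
}

// Stand-in for PCollectionList: because the expansion is ordered,
// equality can simply compare the expansions element by element.
final class ValueList {
    final List<TaggedValue> expansion;
    ValueList(TaggedValue... values) { this.expansion = Arrays.asList(values); }
    @Override public boolean equals(Object o) {
        return o instanceof ValueList && expansion.equals(((ValueList) o).expansion);
    }
    @Override public int hashCode() { return expansion.hashCode(); }
}
```

With an unordered expansion, two lists holding the same values in different positions would be indistinguishable; the ordered form preserves that distinction.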
[33/51] [abbrv] incubator-beam git commit: Require TimeDomain to delete a timer
Require TimeDomain to delete a timer Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740 Branch: refs/heads/python-sdk Commit: 35a02740748182ee52729d8bfb621a3c342b8312 Parents: 0d0a5e2 Author: Kenneth KnowlesAuthored: Tue Dec 20 20:09:25 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 08:20:28 2016 -0800 -- .../operators/ApexGroupByKeyOperator.java | 8 .../beam/runners/core/InMemoryTimerInternals.java | 8 .../beam/runners/direct/DirectTimerInternals.java | 8 .../wrappers/streaming/WindowDoFnOperator.java | 9 + .../org/apache/beam/sdk/util/TimerInternals.java | 17 +++-- 5 files changed, 48 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java -- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 48ac177..49ec1c8 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator implements Operator { */ public class ApexTimerInternals implements TimerInternals { +@Deprecated @Override public void setTimer(TimerData timerData) { registerActiveTimer(context.element().key(), timerData); } @Override +public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { + throw new UnsupportedOperationException("Canceling of timer by ID is not yet 
supported."); +} + +@Deprecated +@Override public void deleteTimer(TimerData timerKey) { unregisterActiveTimer(context.element().key(), timerKey); } @@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator implements Operator { throw new UnsupportedOperationException("Setting timer by ID not yet supported."); } +@Deprecated @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported."); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 5fcd088..5ddd5a7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements TimerInternals { throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); } + @Deprecated @Override public void setTimer(TimerData timerData) { WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); @@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements TimerInternals { } @Override + public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) { +throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Deprecated + @Override public void deleteTimer(StateNamespace namespace, String timerId) { throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); } + @Deprecated @Override public void deleteTimer(TimerData timer) { WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), 
timer); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
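The pattern in the diff above deprecates deleting a timer by its full TimerData and adds an overload that requires the TimeDomain, so an implementation can look the timer up in the right per-domain structure. A simplified sketch of that API shape (illustrative names, not Beam's real TimerInternals):

```java
// Illustrative time domains, mirroring event vs. processing time.
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface TimerInternalsSketch {
    /** Preferred: the caller names the domain the timer lives in. */
    void deleteTimer(String namespace, String timerId, TimeDomain timeDomain);

    /** Deprecated path, kept only for migration. */
    @Deprecated
    void deleteTimer(String namespace, String timerId);
}

class InMemoryTimers implements TimerInternalsSketch {
    @Override
    public void deleteTimer(String namespace, String timerId, TimeDomain timeDomain) {
        // A real implementation would remove the timer from the queue for this domain.
        System.out.println("deleted " + timerId + " in " + timeDomain);
    }

    @Deprecated
    @Override
    public void deleteTimer(String namespace, String timerId) {
        // Without a TimeDomain, the implementation cannot know which queue to search.
        throw new UnsupportedOperationException(
            "Canceling a timer by ID without a TimeDomain is not supported.");
    }
}
```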
[13/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-kinesis module to TestPipeline as a JUnit rule. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/950aa7e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/950aa7e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/950aa7e1 Branch: refs/heads/python-sdk Commit: 950aa7e1d9c50167933eb192a16e15700e483377 Parents: 12be8b1 Author: Stas LevinAuthored: Tue Dec 20 17:44:15 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:46 2016 -0800 -- .../org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java | 7 +-- .../java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java | 6 -- 2 files changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java -- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java index f0ab46c..075805e 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java @@ -22,19 +22,23 @@ import static com.google.common.collect.Lists.newArrayList; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.google.common.collect.Iterables; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.joda.time.DateTime; +import org.junit.Rule; import org.junit.Test; /** * Tests {@link AmazonKinesisMock}. 
*/ public class KinesisMockReadTest { + +@Rule +public final transient TestPipeline p = TestPipeline.create(); + @Test public void readsDataFromMockKinesis() { int noOfShards = 3; @@ -42,7 +46,6 @@ public class KinesisMockReadTest { List testData = provideTestData(noOfShards, noOfEventsPerShard); -final Pipeline p = TestPipeline.create(); PCollection result = p. apply( KinesisIO.Read. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/950aa7e1/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java -- diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java index 73a2455..690cc11 100644 --- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java +++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -43,6 +42,7 @@ import org.apache.commons.lang.RandomStringUtils; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; /** @@ -53,6 +53,8 @@ public class KinesisReaderIT { private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10); private ExecutorService singleThreadExecutor = newSingleThreadExecutor(); +@Rule +public final transient TestPipeline p = TestPipeline.create(); @Ignore @Test @@ -76,7 +78,7 @@ public class KinesisReaderIT { private Future startTestPipeline(List testData, KinesisTestOptions options) throws InterruptedException { -final Pipeline p = 
TestPipeline.create(); + PCollection result = p. apply(KinesisIO.Read. from(options.getAwsKinesisStream(), Instant.now()).
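The migration above replaces a pipeline created inside each test method with a single `@Rule`-managed TestPipeline field. A rule owns the fixture's lifecycle: each test gets fresh state, and the rule can check invariants after the test body finishes (for instance, that the pipeline was actually run). A minimal stand-in for that lifecycle in plain Java, assuming a simplified rule class rather than JUnit's real TestRule machinery:

```java
// Illustrative sketch of the TestPipeline-as-a-rule pattern (not Beam's actual class).
class PipelineRuleSketch {
    private boolean ran = false;

    // Stand-in for Pipeline.run().
    void run() { ran = true; }

    /** Wraps a test body the way a JUnit @Rule wraps a Statement. */
    void applyTo(Runnable testBody) {
        ran = false;  // fresh state for every test
        testBody.run();
        if (!ran) {
            // Surfacing this after the body mirrors how a rule can flag
            // tests that build a pipeline but never execute it.
            throw new IllegalStateException("pipeline was constructed but never run");
        }
    }
}
```

This is why the migrated fields are `transient`: the enclosing test classes are Serializable, and the rule-held pipeline should not be part of that serialized state.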
[24/51] [abbrv] incubator-beam git commit: Provide a better error message for non-existing gcpTempLocation
Provide a better error message for non-existing gcpTempLocation gcpTempLocation will default to using the value for tmpLocation, as long as the value is a valid GCP path. Non-valid GCP paths are silently discarded. This change removes existence validation from the default value logic such that downstream validation can provide a better error message. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ef1a8583 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ef1a8583 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ef1a8583 Branch: refs/heads/python-sdk Commit: ef1a858347e475cd15f7dcd8873464f506527b2a Parents: 2f4b803 Author: Scott WegnerAuthored: Tue Dec 6 14:19:12 2016 -0800 Committer: Luke Cwik Committed: Tue Dec 20 13:04:24 2016 -0800 -- .../beam/runners/dataflow/DataflowRunner.java | 25 .../options/DataflowPipelineOptions.java| 19 - .../runners/dataflow/DataflowRunnerTest.java| 42 +++- .../options/DataflowPipelineOptionsTest.java| 20 ++ .../org/apache/beam/sdk/options/GcpOptions.java | 19 + .../apache/beam/sdk/util/GcsPathValidator.java | 3 +- .../apache/beam/sdk/options/GcpOptionsTest.java | 32 +-- .../beam/sdk/util/GcsPathValidatorTest.java | 15 +-- 8 files changed, 114 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 711b1b0..1a15eaf 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -244,14 
+244,23 @@ public class DataflowRunner extends PipelineRunner { } PathValidator validator = dataflowOptions.getPathValidator(); -checkArgument( -!isNullOrEmpty(dataflowOptions.getGcpTempLocation()), -"DataflowRunner requires gcpTempLocation, and it is missing in PipelineOptions."); - validator.validateOutputFilePrefixSupported(dataflowOptions.getGcpTempLocation()); -checkArgument( -!isNullOrEmpty(dataflowOptions.getStagingLocation()), -"DataflowRunner requires stagingLocation, and it is missing in PipelineOptions."); - validator.validateOutputFilePrefixSupported(dataflowOptions.getStagingLocation()); +String gcpTempLocation; +try { + gcpTempLocation = dataflowOptions.getGcpTempLocation(); +} catch (Exception e) { + throw new IllegalArgumentException("DataflowRunner requires gcpTempLocation, " + + "but failed to retrieve a value from PipelineOptions", e); +} +validator.validateOutputFilePrefixSupported(gcpTempLocation); + +String stagingLocation; +try { + stagingLocation = dataflowOptions.getStagingLocation(); +} catch (Exception e) { + throw new IllegalArgumentException("DataflowRunner requires stagingLocation, " + + "but failed to retrieve a value from PipelineOptions", e); +} +validator.validateOutputFilePrefixSupported(stagingLocation); if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) { validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ef1a8583/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java -- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 66632ad..5ddc5d0 100644 --- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -17,9 +17,6 @@ */ package org.apache.beam.runners.dataflow.options; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; - import java.io.IOException; import
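The diff above swaps a null/empty check for a try/catch around the option getter, so a failure to compute the default surfaces with an error message that names the missing option instead of being silently discarded. The pattern can be sketched generically (the helper name and Supplier-based shape here are illustrative, not Beam's API):

```java
import java.util.function.Supplier;

class OptionsValidation {
    // Retrieve a required option, wrapping any failure in a message
    // that tells the user which option was missing.
    static String requireOption(String name, Supplier<String> getter) {
        try {
            return getter.get();
        } catch (Exception e) {
            throw new IllegalArgumentException(
                "DataflowRunner requires " + name
                    + ", but failed to retrieve a value from PipelineOptions", e);
        }
    }
}
```

Keeping the original exception as the cause preserves the downstream validation's explanation of *why* the default could not be produced, which is the better error message the commit aims for.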
[07/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jdbc module to TestPipeline as a JUnit rule. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ccbe679 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ccbe679 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ccbe679 Branch: refs/heads/python-sdk Commit: 5ccbe6791af7e75c166ed877391e8c86bba5fe56 Parents: 6dea099 Author: Stas LevinAuthored: Tue Dec 20 17:26:51 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:45 2016 -0800 -- .../test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java| 8 1 file changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ccbe679/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java -- diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java index aa93a22..eec7cb8 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java @@ -48,6 +48,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -64,6 +65,9 @@ public class JdbcIOTest implements Serializable { private static int port; + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @BeforeClass public static void startDatabase() throws Exception { ServerSocket socket = new ServerSocket(0); @@ -207,7 +211,6 @@ public class JdbcIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testRead() throws Exception { -TestPipeline pipeline = TestPipeline.create(); PCollection > output = pipeline.apply( JdbcIO. 
>read() @@ -245,7 +248,6 @@ public class JdbcIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReadWithSingleStringParameter() throws Exception { - TestPipeline pipeline = TestPipeline.create(); PCollection > output = pipeline.apply( JdbcIO. >read() @@ -278,7 +280,6 @@ public class JdbcIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWrite() throws Exception { -TestPipeline pipeline = TestPipeline.create(); ArrayList > data = new ArrayList<>(); for (int i = 0; i < 1000; i++) { @@ -316,7 +317,6 @@ public class JdbcIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWriteWithEmptyPCollection() throws Exception { -TestPipeline pipeline = TestPipeline.create(); pipeline.apply(Create.of(new ArrayList >())) .apply(JdbcIO. >write()
[03/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration.
Migrated the beam-sdks-java-core module to TestPipeline as a JUnit rule. Plus, fixed some checkstyle errors from previous modules' migration. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75a4c918 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75a4c918 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75a4c918 Branch: refs/heads/python-sdk Commit: 75a4c918346b5a04213a54bf7d1bf6507655342a Parents: 09c404a Author: Stas LevinAuthored: Mon Dec 19 23:54:47 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:45 2016 -0800 -- .../UnboundedReadFromBoundedSourceTest.java | 1 - .../direct/CloningBundleFactoryTest.java| 2 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 6 +- .../ImmutabilityCheckingBundleFactoryTest.java | 2 +- .../direct/ImmutableListBundleFactoryTest.java | 2 +- .../direct/WriteWithShardingFactoryTest.java| 2 +- .../java/org/apache/beam/sdk/PipelineTest.java | 37 +++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 11 +- .../beam/sdk/coders/CoderRegistryTest.java | 6 +- .../beam/sdk/coders/SerializableCoderTest.java | 7 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 +-- .../io/BoundedReadFromUnboundedSourceTest.java | 6 +- .../beam/sdk/io/CompressedSourceTest.java | 12 +- .../apache/beam/sdk/io/CountingInputTest.java | 12 +- .../apache/beam/sdk/io/CountingSourceTest.java | 13 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 4 +- .../beam/sdk/io/PubsubUnboundedSinkTest.java| 10 +- .../beam/sdk/io/PubsubUnboundedSourceTest.java | 12 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 29 +++-- .../java/org/apache/beam/sdk/io/WriteTest.java | 2 +- .../org/apache/beam/sdk/io/XmlSourceTest.java | 10 +- .../sdk/options/ProxyInvocationHandlerTest.java | 5 +- .../sdk/runners/TransformHierarchyTest.java | 6 +- .../beam/sdk/runners/TransformTreeTest.java | 11 +- .../beam/sdk/testing/GatherAllPanesTest.java| 7 +- 
.../apache/beam/sdk/testing/PAssertTest.java| 32 ++--- .../apache/beam/sdk/testing/TestStreamTest.java | 7 +- .../transforms/ApproximateQuantilesTest.java| 12 +- .../sdk/transforms/ApproximateUniqueTest.java | 6 +- .../beam/sdk/transforms/CombineFnsTest.java | 5 +- .../apache/beam/sdk/transforms/CombineTest.java | 25 +--- .../apache/beam/sdk/transforms/CountTest.java | 13 +- .../apache/beam/sdk/transforms/CreateTest.java | 27 + .../beam/sdk/transforms/DistinctTest.java | 12 +- .../apache/beam/sdk/transforms/DoFnTest.java| 4 +- .../beam/sdk/transforms/DoFnTesterTest.java | 6 +- .../apache/beam/sdk/transforms/FilterTest.java | 18 +-- .../sdk/transforms/FlatMapElementsTest.java | 10 +- .../apache/beam/sdk/transforms/FlattenTest.java | 35 +- .../beam/sdk/transforms/GroupByKeyTest.java | 30 ++--- .../apache/beam/sdk/transforms/KeysTest.java| 9 +- .../apache/beam/sdk/transforms/KvSwapTest.java | 9 +- .../apache/beam/sdk/transforms/LatestTest.java | 12 +- .../beam/sdk/transforms/MapElementsTest.java| 14 +-- .../beam/sdk/transforms/ParDoLifecycleTest.java | 17 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 118 +++ .../beam/sdk/transforms/PartitionTest.java | 8 +- .../apache/beam/sdk/transforms/RegexTest.java | 25 +--- .../apache/beam/sdk/transforms/SampleTest.java | 34 +++--- .../beam/sdk/transforms/SplittableDoFnTest.java | 12 +- .../org/apache/beam/sdk/transforms/TopTest.java | 15 ++- .../apache/beam/sdk/transforms/ValuesTest.java | 7 +- .../apache/beam/sdk/transforms/ViewTest.java| 84 - .../beam/sdk/transforms/WithKeysTest.java | 8 +- .../beam/sdk/transforms/WithTimestampsTest.java | 9 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 11 +- .../sdk/transforms/windowing/WindowTest.java| 22 ++-- .../sdk/transforms/windowing/WindowingTest.java | 11 +- .../org/apache/beam/sdk/util/ReshuffleTest.java | 11 +- .../beam/sdk/values/PCollectionTupleTest.java | 12 +- .../org/apache/beam/sdk/values/PDoneTest.java | 9 +- .../apache/beam/sdk/values/TypedPValueTest.java | 10 +- 
62 files changed, 353 insertions(+), 587 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
[22/51] [abbrv] incubator-beam git commit: This closes #1581: [BEAM-1117] Port direct runner StatefulParDo to KeyedWorkItem
This closes #1581: [BEAM-1117] Port direct runner StatefulParDo to KeyedWorkItem Port direct runner StatefulParDo to KeyedWorkItem Propagate key through ParDo if DoFn is key-preserving Move responsibility for knowing about keyedness into EvaluationContext Add some key-preserving to KeyedPValueTrackingVisitor Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4b8031 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4b8031 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4b8031 Branch: refs/heads/python-sdk Commit: 2f4b80312c69da00df82aaa37d17cc2f6a742648 Parents: a526adb 1f018ab Author: Kenneth KnowlesAuthored: Tue Dec 20 12:39:34 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 12:39:34 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 13 +- .../beam/runners/direct/EvaluationContext.java | 26 +++- .../direct/ExecutorServiceParallelExecutor.java | 8 +- .../direct/KeyedPValueTrackingVisitor.java | 44 -- .../beam/runners/direct/ParDoEvaluator.java | 13 +- .../runners/direct/ParDoEvaluatorFactory.java | 3 + .../direct/ParDoMultiOverrideFactory.java | 94 +++-- ...littableProcessElementsEvaluatorFactory.java | 1 + .../direct/StatefulParDoEvaluatorFactory.java | 36 ++--- .../runners/direct/EvaluationContextTest.java | 9 +- .../direct/KeyedPValueTrackingVisitorTest.java | 135 +-- .../beam/runners/direct/ParDoEvaluatorTest.java | 1 + .../StatefulParDoEvaluatorFactoryTest.java | 51 --- 13 files changed, 281 insertions(+), 153 deletions(-) --
[06/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-jms module to TestPipeline as a JUnit rule. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d478c0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d478c0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d478c0f Branch: refs/heads/python-sdk Commit: 8d478c0f38c656d3533d590a65c6ed95da229f81 Parents: 5ccbe67 Author: Stas LevinAuthored: Tue Dec 20 17:31:23 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:45 2016 -0800 -- .../src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d478c0f/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java -- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 4c3be6d..7259ce8 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -28,7 +28,6 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -38,6 +37,7 @@ import org.apache.beam.sdk.values.PCollection; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -55,6 +55,9 @@ public class JmsIOTest { private BrokerService broker; private ConnectionFactory connectionFactory; + @Rule + 
public final transient TestPipeline pipeline = TestPipeline.create(); + @Before public void startBroker() throws Exception { broker = new BrokerService(); @@ -92,8 +95,6 @@ public class JmsIOTest { session.close(); connection.close(); -Pipeline pipeline = TestPipeline.create(); - // read from the queue PCollection output = pipeline.apply( JmsIO.read() @@ -117,8 +118,6 @@ public class JmsIOTest { @Category(NeedsRunner.class) public void testWriteMessage() throws Exception { -Pipeline pipeline = TestPipeline.create(); - ArrayList data = new ArrayList<>(); for (int i = 0; i < 100; i++) { data.add("Message " + i);
[11/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps.
Migrated the beam-sdks-java-io-java8tests module to TestPipeline as a JUnit rule + fixed WithTimestampsJava8Test.withTimestampsLambdaShouldApplyTimestamps. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b23d42c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b23d42c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b23d42c Branch: refs/heads/python-sdk Commit: 4b23d42c31c95bed0d64bfc393fa193311e93498 Parents: fce4f65 Author: Stas LevinAuthored: Tue Dec 20 18:57:57 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:46 2016 -0800 -- .../apache/beam/sdk/transforms/CombineJava8Test.java | 8 +++- .../apache/beam/sdk/transforms/DistinctJava8Test.java | 5 +++-- .../apache/beam/sdk/transforms/FilterJava8Test.java | 9 +++-- .../beam/sdk/transforms/FlatMapElementsJava8Test.java | 7 --- .../beam/sdk/transforms/MapElementsJava8Test.java | 9 ++--- .../beam/sdk/transforms/PartitionJava8Test.java | 7 --- .../apache/beam/sdk/transforms/WithKeysJava8Test.java | 6 -- .../beam/sdk/transforms/WithTimestampsJava8Test.java | 14 ++ 8 files changed, 37 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java -- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java index 98d99ce..a0f7ce6 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/CombineJava8Test.java @@ -23,7 +23,6 @@ import static org.hamcrest.Matchers.not; import com.google.common.collect.Iterables; import java.io.Serializable; -import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -45,6 +44,9 @@ import org.junit.runners.JUnit4; public class CombineJava8Test implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); /** @@ -65,7 +67,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombineGloballyLambda() { -Pipeline pipeline = TestPipeline.create(); PCollection output = pipeline .apply(Create.of(1, 2, 3, 4)) @@ -86,7 +87,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombineGloballyInstanceMethodReference() { -Pipeline pipeline = TestPipeline.create(); PCollection output = pipeline .apply(Create.of(1, 2, 3, 4)) @@ -101,7 +101,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombinePerKeyLambda() { -Pipeline pipeline = TestPipeline.create(); PCollection > output = pipeline .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) @@ -125,7 +124,6 @@ public class CombineJava8Test implements Serializable { */ @Test public void testCombinePerKeyInstanceMethodReference() { -Pipeline pipeline = TestPipeline.create(); PCollection > output = pipeline .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b23d42c/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java -- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java index 99ef232..790f51e 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java +++ 
b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/DistinctJava8Test.java @@ -44,11 +44,13 @@ import org.junit.runners.JUnit4; public class DistinctJava8Test { @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { -TestPipeline p = TestPipeline.create(); Multimap
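The migration in the two commits above replaces per-test `TestPipeline.create()` calls with a single JUnit `@Rule` field, so each test method gets a fresh pipeline from shared setup. The sketch below imitates that lifecycle in plain Java with no JUnit or Beam dependency; `FakePipeline`, `TestBody`, and `runWithFreshPipeline` are hypothetical stand-ins, and Beam's real `TestPipeline` rule additionally enforces that the pipeline is actually run.

```java
import java.util.ArrayList;
import java.util.List;

public class RuleSketch {
  /** Stand-in for a per-test resource such as a test pipeline. */
  static class FakePipeline {
    private final List<String> applied = new ArrayList<>();
    FakePipeline apply(String transform) {
      applied.add(transform);
      return this;
    }
    List<String> applied() {
      return applied;
    }
  }

  /** The test body receives the resource instead of constructing it. */
  interface TestBody {
    void run(FakePipeline p);
  }

  /** Rule-like lifecycle: a fresh resource is created around each test body. */
  static FakePipeline runWithFreshPipeline(TestBody body) {
    FakePipeline p = new FakePipeline(); // "before": fresh per test, like @Rule
    body.run(p);
    return p; // "after": a real rule would also verify the pipeline was run
  }
}
```

The payoff visible in the diffs is purely mechanical: every `-TestPipeline pipeline = TestPipeline.create();` line inside a test method disappears, because the field initialized per test supplies it.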
[18/51] [abbrv] incubator-beam git commit: Port direct runner StatefulParDo to KeyedWorkItem
Port direct runner StatefulParDo to KeyedWorkItem Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f018ab6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f018ab6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f018ab6 Branch: refs/heads/python-sdk Commit: 1f018ab69fdcc720a10e2aeb8ec1eea1c06e1cbc Parents: d040b7f Author: Kenneth KnowlesAuthored: Mon Dec 12 19:49:58 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:19:07 2016 -0800 -- .../direct/KeyedPValueTrackingVisitor.java | 13 ++- .../direct/ParDoMultiOverrideFactory.java | 94 +--- .../direct/StatefulParDoEvaluatorFactory.java | 36 .../direct/KeyedPValueTrackingVisitorTest.java | 69 -- .../StatefulParDoEvaluatorFactoryTest.java | 51 +++ 5 files changed, 205 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f018ab6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index e91a768..65c41e0 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PValue; /** @@ -105,7 +106,15 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { } private static boolean isKeyPreserving(PTransform transform) { -// There are currently no 
key-preserving transforms; this lays the infrastructure for them -return false; +// This is a hacky check for what is considered key-preserving to the direct runner. +// The most obvious alternative would be a package-private marker interface, but +// better to make this obviously hacky so it is less likely to proliferate. Meanwhile +// we intend to allow explicit expression of key-preserving DoFn in the model. +if (transform instanceof ParDo.BoundMulti) { + ParDo.BoundMulti parDo = (ParDo.BoundMulti) transform; + return parDo.getFn() instanceof ParDoMultiOverrideFactory.ToKeyedWorkItem; +} else { + return false; +} } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f018ab6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index c5bc069..2cea999 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -17,9 +17,15 @@ */ package org.apache.beam.runners.direct; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.KeyedWorkItemCoder; +import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -28,6 +34,8 @@ import org.apache.beam.sdk.transforms.ParDo; import 
org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -84,16 +92,41 @@ class ParDoMultiOverrideFactory @Override public PCollectionTuple
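The `isKeyPreserving` change above decides key preservation with an admittedly hacky `instanceof` check on the transform's concrete `DoFn` class, pending model-level support for declaring key-preserving `DoFn`s. A minimal stand-alone illustration of that pattern, with `Transform`, `ToKeyedWorkItem`, and `PlainParDo` as hypothetical stand-ins for Beam's `PTransform` and the direct runner's internal classes:

```java
public class KeyPreservingSketch {
  interface Transform {}

  /** Stand-in for the one internal transform known to preserve keys. */
  static class ToKeyedWorkItem implements Transform {}

  /** Stand-in for an arbitrary user ParDo, about which nothing is known. */
  static class PlainParDo implements Transform {}

  static boolean isKeyPreserving(Transform t) {
    // Hacky by design: key preservation is inferred from the concrete class,
    // until the model lets a DoFn declare it explicitly.
    return t instanceof ToKeyedWorkItem;
  }
}
```

The commit message itself notes why this is preferable to a package-private marker interface: an obviously hacky check is less likely to proliferate.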
[17/51] [abbrv] incubator-beam git commit: Move responsibility for knowing about keyedness into EvaluationContext
Move responsibility for knowing about keyedness into EvaluationContext This will allow transform evaluators to inquire about whether various collections are keyed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b26ceaa3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b26ceaa3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b26ceaa3 Branch: refs/heads/python-sdk Commit: b26ceaa347c4bc50abfb4c3c138167a25a99cf57 Parents: 81702e6 Author: Kenneth KnowlesAuthored: Thu Dec 8 13:28:44 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:18:04 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 4 +-- .../beam/runners/direct/EvaluationContext.java | 26 +--- .../direct/ExecutorServiceParallelExecutor.java | 8 +- .../runners/direct/EvaluationContextTest.java | 9 ++- 4 files changed, 34 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b26ceaa3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index afa43ff..7e6ea15 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -315,14 +315,14 @@ public class DirectRunner extends PipelineRunner { getPipelineOptions(), clockSupplier.get(), Enforcement.bundleFactoryFor(enabledEnforcements, graph), -graph); +graph, +keyedPValueVisitor.getKeyedPValues()); RootProviderRegistry rootInputProvider = RootProviderRegistry.defaultRegistry(context); TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context); PipelineExecutor executor = ExecutorServiceParallelExecutor.create( 
options.getTargetParallelism(), graph, -keyedPValueVisitor.getKeyedPValues(), rootInputProvider, registry, Enforcement.defaultModelEnforcements(enabledEnforcements), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b26ceaa3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 230d91b..cb9ddd8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; @@ -99,17 +100,28 @@ class EvaluationContext { private final DirectMetrics metrics; + private final Set keyedPValues; + public static EvaluationContext create( - DirectOptions options, Clock clock, BundleFactory bundleFactory, DirectGraph graph) { -return new EvaluationContext(options, clock, bundleFactory, graph); + DirectOptions options, + Clock clock, + BundleFactory bundleFactory, + DirectGraph graph, + Set keyedPValues) { +return new EvaluationContext(options, clock, bundleFactory, graph, keyedPValues); } private EvaluationContext( - DirectOptions options, Clock clock, BundleFactory bundleFactory, DirectGraph graph) { + DirectOptions options, + Clock clock, + BundleFactory bundleFactory, + DirectGraph graph, + Set keyedPValues) { this.options = checkNotNull(options); this.clock = clock; this.bundleFactory = checkNotNull(bundleFactory); this.graph = checkNotNull(graph); +this.keyedPValues = keyedPValues; this.watermarkManager = WatermarkManager.create(clock, graph); 
this.sideInputContainer = SideInputContainer.create(this, graph.getViews()); @@ -244,6 +256,14 @@ class EvaluationContext { } /** + * Indicate whether or not this {@link PCollection} has been determined to be + * keyed. + */ + public boolean isKeyed(PValue pValue) { +return keyedPValues.contains(pValue); + } + + /** * Create a {@link
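The shape of the change above is simple: the visitor computes the set of keyed collections once, the set is injected into `EvaluationContext` at construction, and `isKeyed` is a membership test, so evaluators ask the context instead of the set being threaded through the executor. A reduced sketch, using `String` in place of Beam's `PValue`:

```java
import java.util.Set;

public class KeyednessSketch {
  // Real code stores Set<PValue>, populated by KeyedPValueTrackingVisitor.
  private final Set<String> keyedPValues;

  KeyednessSketch(Set<String> keyedPValues) {
    this.keyedPValues = keyedPValues;
  }

  /** Membership test, mirroring EvaluationContext#isKeyed. */
  boolean isKeyed(String pValue) {
    return keyedPValues.contains(pValue);
  }
}
```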
[26/51] [abbrv] incubator-beam git commit: Remove deprecated AggregatorFactory from SDK
Remove deprecated AggregatorFactory from SDK

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aab46a0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aab46a0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aab46a0e
Branch: refs/heads/python-sdk
Commit: aab46a0ec6e0e45208f64de7aabb9af643acd0ec
Parents: a3f68d3
Author: Kenneth Knowles
Authored: Thu Dec 15 20:13:25 2016 -0800
Committer: Kenneth Knowles
Committed: Tue Dec 20 14:05:35 2016 -0800
--
.../apache/beam/sdk/transforms/Aggregator.java | 19 ---
1 file changed, 19 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aab46a0e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
index 43f53a8..4119c53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Aggregator.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.util.ExecutionContext;
 /**
  * An {@code Aggregator} enables monitoring of values of type {@code InputT},
@@ -68,22 +67,4 @@ public interface Aggregator {
   * aggregator.
   */
  CombineFn getCombineFn();
-
-  /**
-   * @deprecated this is for use only by runners and exists only for a migration period. Please
-   * use the identical interface in org.apache.beam.runners.core
-   */
-  @Deprecated
-  interface AggregatorFactory {
-    /**
-     * Create an aggregator with the given {@code name} and {@link CombineFn}.
-     *
-     * This method is called to create an aggregator for a {@link DoFn}. It receives the
-     * class of the {@link DoFn} being executed and the context of the step it is being
-     * executed in.
-     */
-    Aggregator createAggregatorForDoFn(
-        Class fnClass, ExecutionContext.StepContext stepContext,
-        String aggregatorName, CombineFn combine);
-  }
 }
[32/51] [abbrv] incubator-beam git commit: This closes #1668: Remove deprecated InMemoryTimerInternals from SDK
This closes #1668: Remove deprecated InMemoryTimerInternals from SDK

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d0a5e28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d0a5e28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d0a5e28
Branch: refs/heads/python-sdk
Commit: 0d0a5e2872aeba7a1069927408b3a9607709cf11
Parents: aadcf3a 9f1d3d1
Author: Kenneth Knowles
Authored: Wed Dec 21 08:16:00 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 08:16:00 2016 -0800
--
.../sdk/util/state/InMemoryTimerInternals.java | 275 ---
1 file changed, 275 deletions(-)
--
[15/51] [abbrv] incubator-beam git commit: Add some key-preserving to KeyedPValueTrackingVisitor
Add some key-preserving to KeyedPValueTrackingVisitor Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81702e67 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81702e67 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81702e67 Branch: refs/heads/python-sdk Commit: 81702e67b92a23849cbc8f4a16b2a619e4b477a1 Parents: 22e25a4 Author: Kenneth KnowlesAuthored: Thu Dec 8 11:49:15 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:18:02 2016 -0800 -- .../beam/runners/direct/DirectRunner.java | 9 +-- .../direct/KeyedPValueTrackingVisitor.java | 35 +--- .../direct/KeyedPValueTrackingVisitorTest.java | 84 +++- 3 files changed, 37 insertions(+), 91 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 78163c0..afa43ff 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; @@ -306,12 +304,7 @@ public class DirectRunner extends PipelineRunner { 
graphVisitor.finishSpecifyingRemainder(); @SuppressWarnings("rawtypes") -KeyedPValueTrackingVisitor keyedPValueVisitor = -KeyedPValueTrackingVisitor.create( -ImmutableSet.of( -SplittableParDo.GBKIntoKeyedWorkItems.class, -DirectGroupByKeyOnly.class, -DirectGroupAlsoByWindow.class)); +KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create(); pipeline.traverseTopologically(keyedPValueVisitor); DisplayDataValidator.validatePipeline(pipeline); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 7f85169..e91a768 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -18,9 +18,15 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Predicates.in; +import static com.google.common.collect.Iterables.all; +import com.google.common.collect.ImmutableSet; import java.util.HashSet; import java.util.Set; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; +import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.GroupByKey; @@ -38,19 +44,21 @@ import org.apache.beam.sdk.values.PValue; // TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms // unkeyed class KeyedPValueTrackingVisitor implements 
PipelineVisitor { - @SuppressWarnings("rawtypes") - private final Set producesKeyedOutputs; + + private static final Set PRODUCES_KEYED_OUTPUTS = + ImmutableSet.of( + SplittableParDo.GBKIntoKeyedWorkItems.class, + DirectGroupByKeyOnly.class, + DirectGroupAlsoByWindow.class); + private final Set keyedValues; private boolean finalized; - public static KeyedPValueTrackingVisitor create( - @SuppressWarnings("rawtypes") Set producesKeyedOutputs) { -return new
[20/51] [abbrv] incubator-beam git commit: Move InMemoryTimerInternals to runners-core
Move InMemoryTimerInternals to runners-core Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205 Branch: refs/heads/python-sdk Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb Parents: 22e25a4 Author: Kenneth KnowlesAuthored: Thu Dec 15 20:45:56 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:21:52 2016 -0800 -- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 - .../runners/core/InMemoryTimerInternals.java| 273 ++ .../core/InMemoryTimerInternalsTest.java| 155 +++ .../beam/runners/core/ReduceFnTester.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 16 +- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../apache/beam/sdk/transforms/DoFnTester.java | 36 --- .../sdk/util/state/InMemoryTimerInternals.java | 275 --- .../util/state/InMemoryTimerInternalsTest.java | 153 --- 10 files changed, 443 insertions(+), 471 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 9189191..efcd771 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import 
org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java new file mode 100644 index 000..5fcd088 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.joda.time.Instant; + +/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */ +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue
[25/51] [abbrv] incubator-beam git commit: [BEAM-1097] Provide a better error message for non-existing gcpTempLocation
[BEAM-1097] Provide a better error message for non-existing gcpTempLocation

This closes #1522

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/acd2196c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/acd2196c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/acd2196c
Branch: refs/heads/python-sdk
Commit: acd2196cf54e30e18e69c4dd30b57e6179909ecf
Parents: 2f4b803 96d3931
Author: Luke Cwik
Authored: Tue Dec 20 13:04:31 2016 -0800
Committer: Luke Cwik
Committed: Tue Dec 20 13:04:31 2016 -0800
--
.../beam/runners/dataflow/DataflowRunner.java | 25 ++
.../options/DataflowPipelineOptions.java | 19
.../runners/dataflow/DataflowRunnerTest.java | 48 ++--
.../options/DataflowPipelineOptionsTest.java | 20 +---
.../org/apache/beam/sdk/options/GcpOptions.java | 19
.../apache/beam/sdk/util/GcsPathValidator.java | 3 +-
.../apache/beam/sdk/options/GcpOptionsTest.java | 32 +++--
.../beam/sdk/util/GcsPathValidatorTest.java | 15 +-
8 files changed, 117 insertions(+), 64 deletions(-)
--
[19/51] [abbrv] incubator-beam git commit: Restore SDK's InMemoryTimerInternals, deprecated
Restore SDK's InMemoryTimerInternals, deprecated Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69d2c47b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69d2c47b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69d2c47b Branch: refs/heads/python-sdk Commit: 69d2c47b6a476099535e9cefe62d4cce5ccafbc1 Parents: 445c120 Author: Kenneth KnowlesAuthored: Fri Dec 16 20:22:59 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:21:52 2016 -0800 -- .../sdk/util/state/InMemoryTimerInternals.java | 275 +++ 1 file changed, 275 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69d2c47b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java new file mode 100644 index 000..a910d64 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util.state; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.joda.time.Instant; + +/** + * @deprecated use {@code org.apache.beam.runners.core.InMemoryTimerInternals}. + */ +@Deprecated +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue watermarkTimers = new PriorityQueue<>(11); + + /** Pending processing time timers, in timestamp order. */ + private PriorityQueue processingTimers = new PriorityQueue<>(11); + + /** Pending synchronized processing time timers, in timestamp order. */ + private PriorityQueue synchronizedProcessingTimers = new PriorityQueue<>(11); + + /** Current input watermark. */ + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current output watermark. */ + @Nullable private Instant outputWatermarkTime = null; + + /** Current processing time. */ + private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current synchronized processing time. */ + private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { +return outputWatermarkTime; + } + + /** + * Returns when the next timer in the given time domain will fire, or {@code null} + * if there are no timers scheduled in that time domain. 
+ */ + @Nullable + public Instant getNextTimer(TimeDomain domain) { +final TimerData data; +switch (domain) { + case EVENT_TIME: +data = watermarkTimers.peek(); +break; + case PROCESSING_TIME: +data = processingTimers.peek(); +break; + case SYNCHRONIZED_PROCESSING_TIME: +data = synchronizedProcessingTimers.peek(); +break; + default: +throw new IllegalArgumentException("Unexpected time domain: " + domain); +} +return (data == null) ? null : data.getTimestamp(); + } + + private PriorityQueue queue(TimeDomain domain) { +switch (domain) { + case EVENT_TIME: +return watermarkTimers; + case
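The bookkeeping shown in this class is one `PriorityQueue` per time domain, holding pending timers in timestamp order, with `getNextTimer` peeking at the earliest entry for a domain. A self-contained sketch of that structure; timestamps are plain `long`s here, whereas the real class stores `TimerData` keyed by Joda `Instant`s and also deduplicates timers:

```java
import java.util.PriorityQueue;

public class TimerSketch {
  enum TimeDomain { EVENT_TIME, PROCESSING_TIME, SYNCHRONIZED_PROCESSING_TIME }

  /** Pending timers per domain, kept in timestamp order. */
  private final PriorityQueue<Long> watermarkTimers = new PriorityQueue<>(11);
  private final PriorityQueue<Long> processingTimers = new PriorityQueue<>(11);
  private final PriorityQueue<Long> synchronizedProcessingTimers = new PriorityQueue<>(11);

  private PriorityQueue<Long> queue(TimeDomain domain) {
    switch (domain) {
      case EVENT_TIME:
        return watermarkTimers;
      case PROCESSING_TIME:
        return processingTimers;
      default:
        return synchronizedProcessingTimers;
    }
  }

  void setTimer(TimeDomain domain, long timestamp) {
    queue(domain).add(timestamp);
  }

  /** Earliest pending timer in the domain, or null if none are set. */
  Long getNextTimer(TimeDomain domain) {
    return queue(domain).peek();
  }
}
```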
[21/51] [abbrv] incubator-beam git commit: This closes #1652: Move InMemoryTimerInternals to runners-core
This closes #1652: Move InMemoryTimerInternals to runners-core

* github/pr/1652:
  Restore SDK's InMemoryTimerInternals, deprecated
  Move InMemoryTimerInternals to runners-core

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a526adb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a526adb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a526adb3
Branch: refs/heads/python-sdk
Commit: a526adb33c1eeece866f464e3dfd2cdbc3be6dea
Parents: 22e25a4 69d2c47
Author: Kenneth Knowles
Authored: Tue Dec 20 11:22:24 2016 -0800
Committer: Kenneth Knowles
Committed: Tue Dec 20 11:22:24 2016 -0800
--
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 -
.../runners/core/InMemoryTimerInternals.java | 273 +++
.../core/InMemoryTimerInternalsTest.java | 155 +++
.../beam/runners/core/ReduceFnTester.java | 1 -
.../beam/runners/core/SplittableParDoTest.java | 16 +-
.../triggers/TriggerStateMachineTester.java | 2 +-
.../translation/SparkGroupAlsoByWindowFn.java | 2 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 36 ---
.../sdk/util/state/InMemoryTimerInternals.java | 4 +-
.../util/state/InMemoryTimerInternalsTest.java | 153 ---
10 files changed, 445 insertions(+), 198 deletions(-)
--
[12/51] [abbrv] incubator-beam git commit: Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule.
Migrated the beam-sdks-java-io-mongodb module to TestPipeline as a JUnit rule. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fce4f658 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fce4f658 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fce4f658 Branch: refs/heads/python-sdk Commit: fce4f6584ca2fd3c2c258405b9f3014be9da3514 Parents: 950aa7e Author: Stas LevinAuthored: Tue Dec 20 18:09:30 2016 +0200 Committer: Kenneth Knowles Committed: Tue Dec 20 09:55:46 2016 -0800 -- .../org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 9 - .../java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java | 7 --- 2 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fce4f658/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java -- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index df05c93..994be87 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -55,7 +55,6 @@ import java.util.List; import java.util.Random; import java.util.Scanner; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -79,6 +78,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -100,6 +100,9 @@ public class MongoDBGridFSIOTest implements Serializable { private static int 
port; + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @BeforeClass public static void setup() throws Exception { try (ServerSocket serverSocket = new ServerSocket(0)) { @@ -182,7 +185,6 @@ public class MongoDBGridFSIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFullRead() throws Exception { -TestPipeline pipeline = TestPipeline.create(); PCollection output = pipeline.apply( MongoDbGridFSIO.read() @@ -212,7 +214,6 @@ public class MongoDBGridFSIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReadWithParser() throws Exception { -TestPipeline pipeline = TestPipeline.create(); PCollection > output = pipeline.apply( MongoDbGridFSIO. >read() @@ -297,8 +298,6 @@ public class MongoDBGridFSIOTest implements Serializable { @Category(NeedsRunner.class) public void testWriteMessage() throws Exception { -Pipeline pipeline = TestPipeline.create(); - ArrayList data = new ArrayList<>(100); ArrayList intData = new ArrayList<>(100); for (int i = 0; i < 1000; i++) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fce4f658/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java -- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 5faa618..e7ff712 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -57,6 +57,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; @@ -80,6 +81,9 @@ public class MongoDbIOTest implements Serializable { private static int port; + @Rule + public final transient 
TestPipeline pipeline = TestPipeline.create(); + /** * Looking for an available network port. */ @@ -143,7 +147,6 @@ public class MongoDbIOTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFullRead() throws Exception { -TestPipeline pipeline = TestPipeline.create(); PCollection output = pipeline.apply( MongoDbIO.read() @@ -177,7 +180,6 @@ public class MongoDbIOTest implements Serializable { @Test
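The migration above replaces per-test `TestPipeline.create()` locals with a single `@Rule` field, so JUnit wraps every test with the rule's logic. The mechanics of why a rule helps can be sketched in self-contained form; all class names below (`MiniStatement`, `FakePipeline`, `PipelineRule`, `RuleSketch`) are hypothetical stand-ins, not Beam or JUnit APIs:

```java
public class RuleSketch {
    /** The body of a test, analogous to JUnit's Statement. */
    interface MiniStatement {
        void evaluate() throws Exception;
    }

    /** A pipeline stub that only records whether run() was called. */
    static class FakePipeline {
        boolean wasRun = false;
        void run() { wasRun = true; }
    }

    /**
     * A rule wraps each test and can fail it afterwards if the test created a
     * pipeline but never ran it -- the kind of per-test enforcement a shared
     * @Rule field makes possible without boilerplate in every test method.
     */
    static class PipelineRule {
        final FakePipeline pipeline = new FakePipeline();

        MiniStatement apply(MiniStatement base) {
            return () -> {
                base.evaluate();
                if (!pipeline.wasRun) {
                    throw new AssertionError("Pipeline was never run");
                }
            };
        }
    }

    /** Runs a test body under the rule; returns true if the test passed. */
    static boolean runTest(boolean callRun) {
        PipelineRule rule = new PipelineRule();
        MiniStatement wrapped = rule.apply(() -> {
            if (callRun) {
                rule.pipeline.run();
            }
        });
        try {
            wrapped.evaluate();
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runTest(true));   // pipeline was run: test passes
        System.out.println(runTest(false));  // pipeline never run: rule flags it
    }
}
```

The real `TestPipeline` rule adds more (options handling, abandoned-node checks), but the wrapping mechanism is the same.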
[jira] [Commented] (BEAM-1198) ViewFn: explicitly decouple runner materialization of side inputs from SDK-specific mapping
[ https://issues.apache.org/jira/browse/BEAM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768387#comment-15768387 ] ASF GitHub Bot commented on BEAM-1198: -- GitHub user kennknowles opened a pull request: https://github.com/apache/incubator-beam/pull/1678 [BEAM-1198, BEAM-846, BEAM-260] Refactor Dataflow translator to decouple input and output graphs more easily Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This is preparatory work to make it possible for the translator to have a more loosely coupled relationship between its input and output. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kennknowles/incubator-beam Dataflow-Translator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1678.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1678 commit 8ed4bb68660c537e4a12c1077ecfa104f9a82eaa Author: Kenneth KnowlesDate: 2016-12-21T22:21:50Z Inline needless interface DataflowTranslator.TranslationContext The only implementation was DataflowTranslator.Translator. This class needs some updating and the extra layer of the interface simply obscures that work. 
commit 272d06d7507ad7162616dd1b613efa7c8f5f4069 Author: Kenneth Knowles Date: 2016-12-21T22:34:27Z Explicitly pass Step to mutate in Dataflow translator Previously, there was always a "current" step that was the most recent step created. This makes it cumbersome or impossible to do things like translate one primitive transform into a small subgraph of steps. Thus we added hacks like CreatePCollectionView which are not actually part of the model at all - in fact, we should be able to add the needed CollectionToSingleton steps simply by looking at the side inputs of a ParDo node. > ViewFn: explicitly decouple runner materialization of side inputs from > SDK-specific mapping > --- > > Key: BEAM-1198 > URL: https://issues.apache.org/jira/browse/BEAM-1198 > Project: Beam > Issue Type: New Feature > Components: beam-model-fn-api, beam-model-runner-api >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > > For side inputs, the field {{PCollectionView.fromIterableInternal}} implies > an "iterable" materialization of the contents of a PCollection, which is > adapted to the desired user-facing type within a UDF (today the > PCollectionView "is" the UDF) > In practice, runners get adequate performance by special casing just a couple > of types of PCollectionView in a rather cumbersome manner. > The design to improve this is https://s.apache.org/beam-side-inputs-1-pager > and this makes the de facto standard approach the actual model. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #1678: [BEAM-1198, BEAM-846, BEAM-260] Refactor ...
GitHub user kennknowles opened a pull request: https://github.com/apache/incubator-beam/pull/1678 [BEAM-1198, BEAM-846, BEAM-260] Refactor Dataflow translator to decouple input and output graphs more easily Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This is preparatory work to make it possible for the translator to have a more loosely coupled relationship between its input and output. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kennknowles/incubator-beam Dataflow-Translator Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1678.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1678 commit 8ed4bb68660c537e4a12c1077ecfa104f9a82eaa Author: Kenneth KnowlesDate: 2016-12-21T22:21:50Z Inline needless interface DataflowTranslator.TranslationContext The only implementation was DataflowTranslator.Translator. This class needs some updating and the extra layer of the interface simply obscures that work. commit 272d06d7507ad7162616dd1b613efa7c8f5f4069 Author: Kenneth Knowles Date: 2016-12-21T22:34:27Z Explicitly pass Step to mutate in Dataflow translator Previously, there was always a "current" step that was the most recent step created. This makes it cumbersome or impossible to do things like translate one primitive transform into a small subgraph of steps. 
Thus we added hacks like CreatePCollectionView which are not actually part of the model at all - in fact, we should be able to add the needed CollectionToSingleton steps simply by looking at the side inputs of a ParDo node. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
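The commit message above contrasts a mutable implicit "current" step with explicitly handing each created step back to the caller, so one primitive transform can expand into several steps. A hedged sketch of the explicit style, using invented `Step`/`Translator` classes rather than the real Dataflow translator types:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TranslatorSketch {
    static class Step {
        final String kind;
        final Map<String, Object> properties = new LinkedHashMap<>();
        Step(String kind) { this.kind = kind; }
        void addProperty(String name, Object value) { properties.put(name, value); }
    }

    static class Translator {
        final List<Step> steps = new ArrayList<>();

        /** Creates a step and returns it; the caller mutates the step it was given. */
        Step addStep(String kind) {
            Step step = new Step(kind);
            steps.add(step);
            return step;
        }
    }

    /**
     * One logical transform emitting a small subgraph of steps -- impossible to
     * express cleanly when mutation always targets the single "current" step.
     * Step kinds here are illustrative.
     */
    static Translator translateParDoWithSideInput() {
        Translator t = new Translator();
        Step toSingleton = t.addStep("CollectionToSingleton");
        toSingleton.addProperty("input", "sideInput");
        Step parDo = t.addStep("ParallelDo");
        parDo.addProperty("side_input", "sideInput");
        return t;
    }

    public static void main(String[] args) {
        System.out.println(translateParDoWithSideInput().steps.size()); // 2
    }
}
```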
[jira] [Assigned] (BEAM-1198) ViewFn: explicitly decouple runner materialization of side inputs from SDK-specific mapping
[ https://issues.apache.org/jira/browse/BEAM-1198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles reassigned BEAM-1198: - Assignee: Kenneth Knowles > ViewFn: explicitly decouple runner materialization of side inputs from > SDK-specific mapping > --- > > Key: BEAM-1198 > URL: https://issues.apache.org/jira/browse/BEAM-1198 > Project: Beam > Issue Type: New Feature > Components: beam-model-fn-api, beam-model-runner-api >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > > For side inputs, the field {{PCollectionView.fromIterableInternal}} implies > an "iterable" materialization of the contents of a PCollection, which is > adapted to the desired user-facing type within a UDF (today the > PCollectionView "is" the UDF) > In practice, runners get adequate performance by special casing just a couple > of types of PCollectionView in a rather cumbersome manner. > The design to improve this is https://s.apache.org/beam-side-inputs-1-pager > and this makes the de facto standard approach the actual model.
[GitHub] incubator-beam pull request #1677: Add a Test for Flatten with Heterogeneous...
GitHub user tgroh opened a pull request: https://github.com/apache/incubator-beam/pull/1677 Add a Test for Flatten with Heterogeneous Coders

Add a category, and suppress in the Flink and Apex runners

Be sure to do all of the following to help us incorporate your contribution quickly and easily:
- [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
- [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
- [ ] Replace `` in the title with the actual Jira issue number, if there is one.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).
---
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/incubator-beam flatten_multiple_coders
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1677.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1677

commit 13a8da998c9c03a8ccf914e1dcc16c2aed038930 Author: Thomas Groh Date: 2016-12-21T21:53:48Z Add a Test for Flatten with Heterogeneous Coders. Add a category, and suppress in the Flink and Apex runners
[GitHub] incubator-beam pull request #1676: Merge remote-tracking branch 'origin/mast...
GitHub user aaltay opened a pull request: https://github.com/apache/incubator-beam/pull/1676 Merge remote-tracking branch 'origin/master' into python-sdk

There are frequent Apex runner failures in the python-sdk branch; this brings in the pom.xml changes to fix those.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/aaltay/incubator-beam python-sdk
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1676.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1676

commit c22e2a435113c9653b58f1040a4e9266059767f4 Author: Kenneth Knowles Date: 2016-12-08T04:04:51Z No longer reject timers in ParDo
commit 274f17f0c0df08785a78d9a60c22d5556e46584a Author: Kenneth Knowles Date: 2016-12-08T04:37:33Z Reject timers for ParDo in DirectRunner
commit 96f9fce782d7ccc5257eff8993b4f9b8261651a6 Author: Kenneth Knowles Date: 2016-12-08T17:53:09Z This closes #1550
commit 95e2c53db535952aaf0c335e0d3d27a721c6b55d Author: Sela Date: 2016-12-08T18:29:35Z [BEAM-] Reject timers for ParDo in SparkRunner streaming evaluators
commit 0bfa02dd26a7fb80753da1ed130acff1265d093a Author: Kenneth Knowles Date: 2016-12-08T19:54:33Z This closes #1553
commit a11eb637f6f1c7bdc41b24154cb3893bb194b096 Author: Kenneth Knowles Date: 2016-12-08T20:42:03Z Fix exclusion of UsesTimersInParDo for FlinkRunner streaming tests
commit e292032cbe3defc5c61fb171a00dd8391a27afd8 Author: Kenneth Knowles Date: 2016-12-08T20:46:37Z This closes #1555
commit 4a12cd303df6d4c49425d38a6290f496349cc312 Author: Pei He Date: 2016-11-11T05:53:57Z [BEAM-978] Support bulk get file size in GcsUtil.
commit 409b5dfcf6e9a9699eff43b041e9726bbb979b89 Author: Kenneth Knowles Date: 2016-12-08T23:47:48Z This closes #1359
commit bf1fba450e6b5fd6c98d006b381472eee8db7b72 Author: Eugene Kirpichov Date: 2016-12-07T02:00:03Z Fix a bug in SplittableDoFn Checkpointing. Call checkpoint() only once if the SDF emits output several times per claim call. Calling checkpoint multiple times would clobber an existing checkpoint, and the second call would only ever return an empty residual, losing all of the initial residual.
commit 40bd27602ebe2269ccaba2685addb8e5e3ba533e Author: Thomas Groh Date: 2016-12-09T01:17:56Z This closes #1525
commit 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2 Author: Luke Cwik Date: 2016-12-08T23:22:35Z Add support for having an empty CountingInput/CountingSource
commit ddb59125aeacb809b7695c203fe8b1a40e36aed2 Author: Luke Cwik Date: 2016-12-09T02:41:42Z Add support for having an empty CountingInput/CountingSource. This closes #1557
commit 5169e492578a3759e20b50e36ace61bc86636ad2 Author: Luke Cwik Date: 2016-12-09T03:17:21Z fixup! Fix CountingInput naming
commit 9bab78b55fe4661e9a221969441d8a2a4716f7e1 Author: Luke Cwik Date: 2016-12-09T03:20:22Z fixup! Fix extraneous brace
commit 9bcba398c7516437c00517e03d75e27544b01166 Author: Dan Halperin Date: 2016-12-09T07:15:19Z DataflowRunner: bump environment major version
commit 63d197cd0cff332b62a5f4398b1693b6839a348b Author: Dan Halperin Date: 2016-12-09T09:43:16Z Closes #1560
commit e48b0e6bc20d8eba2968decf7ac2b4ee7503a4df Author: Kenneth Knowles Date: 2016-12-09T07:33:40Z Remove misc uses of OldDoFn
commit d9a6311734064b1c7171b943eeb511c4d648187a Author: Sela Date: 2016-12-09T16:01:12Z This closes #1561
commit f83370d69d193818e5be9ff613498f9f3296a658 Author: Thomas Groh Date: 2016-12-09T18:52:46Z Enable the DirectRunner by default in Examples Archetype. This ensures a runner will be on the classpath if no profile is specified. This matches the generated examples with the quickstart.
commit 49215106d818b6f0585fef5f37851fad5a2d4102 Author: Davor Bonaci Date: 2016-12-09T19:41:48Z This closes #1563
commit 9875b52cc3b6e1af2cbde850c6d3f5b1e75e313e Author: manuzhang Date: 2016-12-09T01:49:49Z [BEAM-1093] Change Dataflow to Beam in StateInternals' Javadocs
commit 080dbaa3e2ea688fe04c3cad9b2612c39daba477 Author: Davor Bonaci Date: 2016-12-09T20:15:48Z This closes #1559
commit 1e44cb12c2663b1353717bf9237618df74684102 Author: bchambers Date: 2016-12-08T18:40:17Z Change Dataflow profiling option to
[jira] [Created] (BEAM-1200) PubsubIO should allow for a user to supply the function which computes the watermark that is reported
Luke Cwik created BEAM-1200:
---
Summary: PubsubIO should allow for a user to supply the function which computes the watermark that is reported
Key: BEAM-1200
URL: https://issues.apache.org/jira/browse/BEAM-1200
Project: Beam
Issue Type: Improvement
Components: sdk-java-gcp
Reporter: Luke Cwik
Assignee: Daniel Halperin
Priority: Minor

A user wanted to build a watermark function that tracks the data's watermark but never falls behind the current time by more than Y minutes. PubsubIO does not support specifying the function which computes and reports the watermark.
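The policy described in the ticket reduces to a one-line formula: report `max(dataWatermark, now - Y)`. A minimal self-contained sketch of that function; the class and method names are illustrative only, not a PubsubIO API:

```java
public class WatermarkFn {
    /**
     * Tracks the data-derived watermark, but never lets the reported watermark
     * lag the current processing time by more than maxLagMillis (the "Y" above).
     *
     * @param dataWatermarkMillis watermark derived from observed record timestamps
     * @param nowMillis           current processing time
     * @param maxLagMillis        maximum allowed lag behind current time
     * @return the watermark to report
     */
    static long report(long dataWatermarkMillis, long nowMillis, long maxLagMillis) {
        return Math.max(dataWatermarkMillis, nowMillis - maxLagMillis);
    }

    public static void main(String[] args) {
        // Data watermark is fresh enough: report it unchanged.
        System.out.println(report(9_000, 10_000, 2_000)); // 9000
        // Data watermark is stale: clamp to now - Y.
        System.out.println(report(1_000, 10_000, 2_000)); // 8000
    }
}
```

Letting users supply such a function, rather than hard-coding one, is exactly the extension point the issue requests.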
[jira] [Commented] (BEAM-1117) Support for new Timer API in Direct runner
[ https://issues.apache.org/jira/browse/BEAM-1117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768294#comment-15768294 ] ASF GitHub Bot commented on BEAM-1117: -- Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/1669 > Support for new Timer API in Direct runner > -- > > Key: BEAM-1117 > URL: https://issues.apache.org/jira/browse/BEAM-1117 > Project: Beam > Issue Type: New Feature > Components: runner-direct >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] incubator-beam pull request #1669: [BEAM-1117] Preliminaries for timers in t...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/1669
[5/7] incubator-beam git commit: Add static Window.withOutputTimeFn to match build method
Add static Window.withOutputTimeFn to match build method

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8188040d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8188040d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8188040d
Branch: refs/heads/master
Commit: 8188040d930b1fa49efd4ed7d5f821d05d6f28ef
Parents: fa4958a
Author: Kenneth Knowles
Authored: Tue Dec 20 13:57:55 2016 -0800
Committer: Kenneth Knowles
Committed: Wed Dec 21 13:45:37 2016 -0800
--
.../org/apache/beam/sdk/transforms/windowing/Window.java | 9 +
1 file changed, 9 insertions(+)
--
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8188040d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 0c430d0..1241abe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -223,6 +223,15 @@ public class Window {
   }

   /**
+   * (Experimental) Override the default {@link OutputTimeFn}, to control
+   * the output timestamp of values output from a {@link GroupByKey} operation.
+   */
+  @Experimental(Kind.OUTPUT_TIME)
+  public static Bound withOutputTimeFn(OutputTimeFn outputTimeFn) {
+    return new Bound(null).withOutputTimeFn(outputTimeFn);
+  }
+
+  /**
    * A {@code PTransform} that windows the elements of a {@code PCollection},
    * into finite windows according to a user-specified {@code WindowFn}.
    *
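The commit above adds a static `Window.withOutputTimeFn` that simply delegates to the instance builder method, so a configuration chain can start from that method as well. The general pattern, a static factory mirroring a builder method, can be sketched with a plain immutable builder (names and the string-typed "fn" are illustrative stand-ins, not the Beam types):

```java
public class BuilderSketch {
    /** An immutable builder, analogous in shape to Window.Bound. */
    static class Bound {
        final String windowFn;
        final String outputTimeFn;

        Bound(String windowFn, String outputTimeFn) {
            this.windowFn = windowFn;
            this.outputTimeFn = outputTimeFn;
        }

        /** Instance builder method: returns a copy with the field replaced. */
        Bound withOutputTimeFn(String fn) {
            return new Bound(windowFn, fn);
        }
    }

    /** Static mirror: starts a chain without requiring another entry point first. */
    static Bound withOutputTimeFn(String fn) {
        return new Bound(null, null).withOutputTimeFn(fn);
    }

    public static void main(String[] args) {
        Bound b = withOutputTimeFn("earliestInPane");
        System.out.println(b.outputTimeFn); // earliestInPane
    }
}
```

Keeping the static method a one-line delegate, as the commit does, guarantees the two entry points cannot drift apart.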
[6/7] incubator-beam git commit: Add UsesTestStream for use with JUnit @Category
Add UsesTestStream for use with JUnit @Category Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4d71924c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4d71924c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4d71924c Branch: refs/heads/master Commit: 4d71924ccda9dae97c7cc9535a9780df9457cc3f Parents: 8188040 Author: Kenneth KnowlesAuthored: Tue Dec 20 14:20:07 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../apache/beam/sdk/testing/UsesTestStream.java | 24 .../apache/beam/sdk/testing/TestStreamTest.java | 12 +- 2 files changed, 30 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java new file mode 100644 index 000..8debb46 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTestStream.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for tests that use {@link TestStream}, which is not a part of the Beam model + * but a special feature currently only implemented by the direct runner. + */ +public interface UsesTestStream {} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4d71924c/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 64aeca3..c12e9f3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -69,7 +69,7 @@ public class TestStreamTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testLateDataAccumulating() { Instant instant = new Instant(0); TestStream source = TestStream.create(VarIntCoder.of()) @@ -136,7 +136,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testProcessingTimeTrigger() { TestStream source = TestStream.create(VarLongCoder.of()) .addElements(TimestampedValue.of(1L, new Instant(1000L)), @@ -159,7 +159,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testDiscardingMode() { TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -208,7 +208,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, 
UsesTestStream.class}) public void testFirstElementLate() { Instant lateElementTimestamp = new Instant(-1_000_000); TestStream stream = @@ -238,7 +238,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) + @Category({NeedsRunner.class, UsesTestStream.class}) public void testElementsAtAlmostPositiveInfinity() { Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); TestStream stream = TestStream.create(StringUtf8Coder.of()) @@ -261,7 +261,7 @@ public class TestStreamTest implements Serializable { } @Test - @Category(NeedsRunner.class) +
[2/7] incubator-beam git commit: Hold output watermark according to pending timers
Hold output watermark according to pending timers Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dfe2e62d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dfe2e62d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dfe2e62d Branch: refs/heads/master Commit: dfe2e62d103595583e3ca4594cc03885fe1bba16 Parents: 7f14c46 Author: Kenneth KnowlesAuthored: Tue Dec 20 13:37:40 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../beam/runners/direct/WatermarkManager.java | 59 1 file changed, 48 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dfe2e62d/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index f7bafd1..248fafd 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -211,12 +211,18 @@ public class WatermarkManager { private static class AppliedPTransformInputWatermark implements Watermark { private final Collection inputWatermarks; private final SortedMultiset pendingElements; -private final Map objectTimers; + +// This tracks only the quantity of timers at each timestamp, for quickly getting the cross-key +// minimum +private final SortedMultiset pendingTimers; // Entries in this table represent the authoritative timestamp for which // a per-key-and-StateNamespace timer is set. 
private final Map > existingTimers; +// This per-key sorted set allows quick retrieval of timers that should fire for a key +private final Map objectTimers; + private AtomicReference currentWatermark; public AppliedPTransformInputWatermark(Collection inputWatermarks) { @@ -224,10 +230,13 @@ public class WatermarkManager { // The ordering must order elements by timestamp, and must not compare two distinct elements // as equal. This is built on the assumption that any element added as a pending element will // be consumed without modifications. + // + // The same logic is applied for pending timers Ordering pendingBundleComparator = new BundleByElementTimestampComparator().compound(Ordering.arbitrary()); this.pendingElements = TreeMultiset.create(pendingBundleComparator); + this.pendingTimers = TreeMultiset.create(); this.objectTimers = new HashMap<>(); this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); @@ -278,6 +287,14 @@ public class WatermarkManager { pendingElements.remove(completed); } +private synchronized Instant getEarliestTimerTimestamp() { + if (pendingTimers.isEmpty()) { +return BoundedWindow.TIMESTAMP_MAX_VALUE; + } else { +return pendingTimers.firstEntry().getElement(); + } +} + private synchronized void updateTimers(TimerUpdate update) { NavigableSet keyTimers = objectTimers.get(update.key); if (keyTimers == null) { @@ -291,27 +308,43 @@ public class WatermarkManager { existingTimers.put(update.key, existingTimersForKey); } - for (TimerData timer : update.setTimers) { + for (TimerData timer : update.getSetTimers()) { +if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + @Nullable + TimerData existingTimer = + existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); + + if (existingTimer == null) { +pendingTimers.add(timer.getTimestamp()); +keyTimers.add(timer); + } else if (!existingTimer.equals(timer)) { +keyTimers.remove(existingTimer); +keyTimers.add(timer); + } // else 
the timer is already set identically, so noop + + existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); +} + } + + for (TimerData timer : update.getDeletedTimers()) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { @Nullable TimerData existingTimer = existingTimersForKey.get(timer.getNamespace(), timer.getTimerId());
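The diff above adds a `pendingTimers` sorted multiset so the earliest pending event-time timer, which holds the output watermark, is a cheap lookup across all keys. That bookkeeping can be sketched with a plain `TreeMap<timestamp, count>` standing in for Guava's `TreeMultiset`; timestamps are plain longs here rather than Joda Instants:

```java
import java.util.TreeMap;

public class PendingTimers {
    /** Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE ("+infinity"). */
    static final long TIMESTAMP_MAX_VALUE = Long.MAX_VALUE;

    // Tracks only the quantity of timers at each timestamp, mirroring the
    // pendingTimers multiset in the diff.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();

    void add(long timestamp) {
        pending.merge(timestamp, 1, Integer::sum);
    }

    void remove(long timestamp) {
        // Drop the entry entirely when its count reaches zero.
        pending.computeIfPresent(timestamp, (t, n) -> n == 1 ? null : n - 1);
    }

    /** Earliest pending timer, or "+infinity" if none -- cf. getEarliestTimerTimestamp(). */
    long earliest() {
        return pending.isEmpty() ? TIMESTAMP_MAX_VALUE : pending.firstKey();
    }

    public static void main(String[] args) {
        PendingTimers timers = new PendingTimers();
        System.out.println(timers.earliest() == TIMESTAMP_MAX_VALUE); // true
        timers.add(500);
        timers.add(500);
        timers.add(900);
        System.out.println(timers.earliest()); // 500
        timers.remove(500);
        System.out.println(timers.earliest()); // 500 (one 500-timer still pending)
        timers.remove(500);
        System.out.println(timers.earliest()); // 900
    }
}
```

Because the map is sorted, both the per-timer updates and the cross-key minimum are O(log n), which is the point of the data structure choice in the commit.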
[3/7] incubator-beam git commit: Use informative Instant formatter in WatermarkHold
Use informative Instant formatter in WatermarkHold Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa4958a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa4958a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa4958a6 Branch: refs/heads/master Commit: fa4958a6140eb00ceee08b2468f7d88f17538794 Parents: 280a6a8 Author: Kenneth KnowlesAuthored: Mon Dec 19 20:40:47 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../apache/beam/runners/core/WatermarkHold.java | 4 +++- .../sdk/transforms/windowing/BoundedWindow.java | 19 +++ 2 files changed, 22 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java -- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 7f1afcc..5e5f44d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -207,7 +207,9 @@ class WatermarkHold implements Serializable { Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); checkState(!shifted.isBefore(timestamp), "OutputTimeFn moved element from %s to earlier time %s for window %s", -timestamp, shifted, window); +BoundedWindow.formatTimestamp(timestamp), +BoundedWindow.formatTimestamp(shifted), +window); checkState(timestamp.isAfter(window.maxTimestamp()) || !shifted.isAfter(window.maxTimestamp()), "OutputTimeFn moved element from %s to %s which is beyond end of " http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa4958a6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java -- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
index 6da2495..74223b5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java
@@ -34,11 +34,30 @@ import org.joda.time.Instant;
 public abstract class BoundedWindow {
   // The min and max timestamps that won't overflow when they are converted to
   // usec.
+
+  /**
+   * The minimum value for any Beam timestamp. Often referred to as "-infinity".
+   *
+   * This value and {@link #TIMESTAMP_MAX_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MIN_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+
+  /**
+   * The maximum value for any Beam timestamp. Often referred to as "+infinity".
+   *
+   * This value and {@link #TIMESTAMP_MIN_VALUE} are chosen so that their
+   * microseconds-since-epoch can be safely represented with a {@code long}.
+   */
   public static final Instant TIMESTAMP_MAX_VALUE =
       new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));

+  /**
+   * Formats a {@link Instant} timestamp with additional Beam-specific metadata, such as indicating
+   * whether the timestamp is the end of the global window or one of the distinguished values {@link
+   * #TIMESTAMP_MIN_VALUE} or {@link #TIMESTAMP_MAX_VALUE}.
+   */
   public static String formatTimestamp(Instant timestamp) {
     if (timestamp.equals(TIMESTAMP_MIN_VALUE)) {
       return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)";
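The bounds in the diff are chosen so that converting a timestamp to microseconds-since-epoch cannot overflow a `long`; `formatTimestamp` then labels those distinguished values instead of printing a bare number. A self-contained sketch of both ideas, using epoch millis as plain longs in place of Joda `Instant`:

```java
import java.util.concurrent.TimeUnit;

public class TimestampFormat {
    // Dividing Long.MIN/MAX by 1000 (micros -> millis) guarantees that
    // converting these bounds back to micros (millis * 1000) fits in a long.
    static final long TIMESTAMP_MIN_VALUE = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
    static final long TIMESTAMP_MAX_VALUE = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

    /** Labels the distinguished min/max values, mirroring BoundedWindow.formatTimestamp. */
    static String formatTimestamp(long millis) {
        if (millis == TIMESTAMP_MIN_VALUE) {
            return millis + " (TIMESTAMP_MIN_VALUE)";
        } else if (millis == TIMESTAMP_MAX_VALUE) {
            return millis + " (TIMESTAMP_MAX_VALUE)";
        } else {
            return Long.toString(millis);
        }
    }

    public static void main(String[] args) {
        System.out.println(formatTimestamp(TIMESTAMP_MIN_VALUE));
        System.out.println(formatTimestamp(0)); // 0
    }
}
```

Labeled output like this is what makes watermark-related error messages (as in the WatermarkHold change above) readable: "-infinity" is much easier to recognize than a raw 16-digit negative number.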
[7/7] incubator-beam git commit: This closes #1669: Preliminaries for timers in the direct runner
This closes #1669: Preliminaries for timers in the direct runner Hold output watermark according to pending timers Allow setting timer by ID in DirectTimerInternals Add UsesTestStream for use with JUnit @Category Add static Window.withOutputTimeFn to match build method Use informative Instant formatter in WatermarkHold Add informative Instant formatter to BoundedWindow Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57d9bbd7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57d9bbd7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57d9bbd7 Branch: refs/heads/master Commit: 57d9bbd797edfcf32fdd9284b802fc4f9694e8d2 Parents: ff39516 dfe2e62 Author: Kenneth Knowles Authored: Wed Dec 21 13:46:34 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:46:34 2016 -0800 -- .../apache/beam/runners/core/WatermarkHold.java | 4 +- .../runners/direct/DirectTimerInternals.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 78 ++-- .../apache/beam/sdk/testing/UsesTestStream.java | 24 ++ .../sdk/transforms/windowing/BoundedWindow.java | 31 .../beam/sdk/transforms/windowing/Window.java | 9 +++ .../apache/beam/sdk/testing/TestStreamTest.java | 12 +-- 7 files changed, 144 insertions(+), 16 deletions(-) --
[4/7] incubator-beam git commit: Allow setting timer by ID in DirectTimerInternals
Allow setting timer by ID in DirectTimerInternals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463 Branch: refs/heads/master Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f Parents: 4d71924 Author: Kenneth KnowlesAuthored: Wed Dec 7 20:18:44 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:37 2016 -0800 -- .../runners/direct/DirectTimerInternals.java| 2 +- .../beam/runners/direct/WatermarkManager.java | 25 2 files changed, 26 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5ca276d..80e0721 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals { @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { -throw new UnsupportedOperationException("Setting timer by ID not yet supported."); +timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } @Deprecated http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 7bed751..f7bafd1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; +import com.google.common.collect.Table; import com.google.common.collect.TreeMultiset; import java.io.Serializable; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Instant; @@ -210,6 +213,10 @@ public class WatermarkManager { private final SortedMultiset pendingElements; private final Map objectTimers; +// Entries in this table represent the authoritative timestamp for which +// a per-key-and-StateNamespace timer is set. 
+private final Map > existingTimers; + private AtomicReference currentWatermark; public AppliedPTransformInputWatermark(Collection inputWatermarks) { @@ -222,6 +229,7 @@ public class WatermarkManager { this.pendingElements = TreeMultiset.create(pendingBundleComparator); this.objectTimers = new HashMap<>(); + this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } @@ -276,14 +284,31 @@ public class WatermarkManager { keyTimers = new TreeSet<>(); objectTimers.put(update.key, keyTimers); } + Table existingTimersForKey = + existingTimers.get(update.key); + if (existingTimersForKey == null) { +existingTimersForKey = HashBasedTable.create(); +existingTimers.put(update.key, existingTimersForKey); + } +
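The WatermarkManager change above keeps, per key, a table from (StateNamespace, timer ID) to the authoritative TimerData, so that re-setting a timer by its ID replaces the earlier deadline instead of adding a second timer — which is what lets DirectTimerInternals simply forward to `timerUpdateBuilder.setTimer(...)`. A simplified sketch of that bookkeeping, with nested JDK maps standing in for Guava's HashBasedTable (class and method names here are illustrative, not Beam's):

```java
import java.util.HashMap;
import java.util.Map;

// Tracks one deadline per (namespace, timerId), mirroring the overwrite
// semantics of "set timer by ID": setting an existing ID replaces it.
public class TimerBookkeeping {
    // outer key: state namespace; inner key: timer ID; value: deadline in millis
    private final Map<String, Map<String, Long>> existingTimers = new HashMap<>();

    public void setTimer(String namespace, String timerId, long deadlineMillis) {
        existingTimers
            .computeIfAbsent(namespace, ns -> new HashMap<>())
            .put(timerId, deadlineMillis); // overwrites any earlier deadline
    }

    /** Earliest pending deadline, used to hold the output watermark; null if none. */
    public Long earliestDeadline() {
        Long earliest = null;
        for (Map<String, Long> perNamespace : existingTimers.values()) {
            for (long d : perNamespace.values()) {
                if (earliest == null || d < earliest) {
                    earliest = d;
                }
            }
        }
        return earliest;
    }
}
```

Re-setting, say, a "gc" timer in the same namespace moves its deadline rather than accumulating stale entries, so the watermark hold computed from `earliestDeadline()` tracks the latest value.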
[1/7] incubator-beam git commit: Add informative Instant formatter to BoundedWindow
Repository: incubator-beam Updated Branches: refs/heads/master ff3951699 -> 57d9bbd79 Add informative Instant formatter to BoundedWindow Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/280a6a8f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/280a6a8f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/280a6a8f Branch: refs/heads/master Commit: 280a6a8f729cb382616ad65f71860b61277cbd6f Parents: ff39516 Author: Kenneth KnowlesAuthored: Mon Dec 19 20:40:11 2016 -0800 Committer: Kenneth Knowles Committed: Wed Dec 21 13:45:36 2016 -0800 -- .../beam/sdk/transforms/windowing/BoundedWindow.java| 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/280a6a8f/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java index 3654074..6da2495 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java @@ -39,6 +39,18 @@ public abstract class BoundedWindow { public static final Instant TIMESTAMP_MAX_VALUE = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE)); + public static String formatTimestamp(Instant timestamp) { +if (timestamp.equals(TIMESTAMP_MIN_VALUE)) { + return timestamp.toString() + " (TIMESTAMP_MIN_VALUE)"; +} else if (timestamp.equals(TIMESTAMP_MAX_VALUE)) { + return timestamp.toString() + " (TIMESTAMP_MAX_VALUE)"; +} else if (timestamp.equals(GlobalWindow.INSTANCE.maxTimestamp())) { + return timestamp.toString() + " (end of global window)"; +} else { + return timestamp.toString(); +} + } + /** * Returns the inclusive upper bound of 
timestamps for values in this window. */
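The formatter committed above annotates the distinguished timestamps; its branching can be sketched standalone with epoch millis standing in for Joda-Time's Instant, so it runs with just the JDK. The END_OF_GLOBAL_WINDOW constant here is an illustrative placeholder for `GlobalWindow.INSTANCE.maxTimestamp()`, which in Beam is a distinguished value strictly below TIMESTAMP_MAX_VALUE:

```java
import java.util.concurrent.TimeUnit;

public class TimestampFormat {
    static final long TIMESTAMP_MIN = TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE);
    static final long TIMESTAMP_MAX = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
    // Illustrative stand-in for GlobalWindow.INSTANCE.maxTimestamp().
    static final long END_OF_GLOBAL_WINDOW = TIMESTAMP_MAX - TimeUnit.DAYS.toMillis(1);

    // Same branching as BoundedWindow.formatTimestamp: annotate the
    // distinguished values, otherwise print the timestamp as-is.
    static String formatTimestamp(long millis) {
        if (millis == TIMESTAMP_MIN) {
            return millis + " (TIMESTAMP_MIN_VALUE)";
        } else if (millis == TIMESTAMP_MAX) {
            return millis + " (TIMESTAMP_MAX_VALUE)";
        } else if (millis == END_OF_GLOBAL_WINDOW) {
            return millis + " (end of global window)";
        } else {
            return Long.toString(millis);
        }
    }
}
```

This is exactly what makes the WatermarkHold checkState messages readable: a bare `-9223372036854775` becomes `-9223372036854775 (TIMESTAMP_MIN_VALUE)`.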
[jira] [Commented] (BEAM-1194) DataflowRunner should test a variety of valid tempLocation/stagingLocation/etc formats.
[ https://issues.apache.org/jira/browse/BEAM-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15768284#comment-15768284 ] ASF GitHub Bot commented on BEAM-1194: -- Github user dhalperi closed the pull request at: https://github.com/apache/incubator-beam/pull/1671 > DataflowRunner should test a variety of valid > tempLocation/stagingLocation/etc formats. > --- > > Key: BEAM-1194 > URL: https://issues.apache.org/jira/browse/BEAM-1194 > Project: Beam > Issue Type: Test > Components: runner-dataflow >Reporter: Daniel Halperin >Assignee: Daniel Halperin >Priority: Minor > > Cloud Dataflow has a minor history of small bugs related to various code > paths expecting there to be or not be a trailing forward-slash in these > location fields. The way that Beam's integration tests are set up, we are > likely to only have one of these two cases tested (there is a single set of > integration test pipeline options). > We should add a dedicated DataflowRunner integration test to handle this case.
[GitHub] incubator-beam pull request #1671: [BEAM-1194] Add DataflowLocationIT
Github user dhalperi closed the pull request at: https://github.com/apache/incubator-beam/pull/1671
[GitHub] incubator-beam pull request #1671: [BEAM-1194] Add DataflowLocationIT
GitHub user dhalperi reopened a pull request: https://github.com/apache/incubator-beam/pull/1671 [BEAM-1194] Add DataflowLocationIT Opening PR to test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dhalperi/incubator-beam trailing-slash Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1671.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1671 commit 7ad2ebd8e96a469e38c156de2d2701e500c3d955 Author: Dan Halperin Date: 2016-12-21T01:56:00Z Add DataflowLocationIT
[GitHub] incubator-beam pull request #1675: [Beam-1186] Migrating the remaining tests...
GitHub user staslev opened a pull request: https://github.com/apache/incubator-beam/pull/1675 [Beam-1186] Migrating the remaining tests to use TestPipeline as a JUnit rule. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/staslev/incubator-beam BEAM-1186-migrating-remaining-tests-to-TestPipeline-rule Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1675.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1675 commit e6fd0b308583222b37e055476f34d020563893f5 Author: Stas LevinDate: 2016-12-21T15:58:35Z [BEAM-1186] Broke AvroIOGeneratedClassTest into 2 parametrised test classes that support TestPipeline as a JUnit rule. commit 52ca16451f807c026276dba9448d4c5db12c866e Author: Stas Levin Date: 2016-12-21T21:00:39Z [BEAM-1186] Broke ApproximateUniqueTest into 3 test classes that support TestPipeline as a JUnit rule. commit d46f66d8c5f0f63f783561ba684dc003b3327b3a Author: Stas Levin Date: 2016-12-21T21:21:11Z [BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule. commit 96da02a1c9014d186ce264c745e4fd48883dd4c3 Author: Stas Levin Date: 2016-12-21T21:32:27Z [BEAM-1186] Migrated BigtableIOTest to use TestPipeline as a JUnit rule by stabilizing runReadTest's (read) transform name. 
[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...
Github user markflyhigh closed the pull request at: https://github.com/apache/incubator-beam/pull/1639
[jira] [Created] (BEAM-1199) Condense recordAsOutput, finishSpecifyingOutput from POutput
Thomas Groh created BEAM-1199: - Summary: Condense recordAsOutput, finishSpecifyingOutput from POutput Key: BEAM-1199 URL: https://issues.apache.org/jira/browse/BEAM-1199 Project: Beam Issue Type: Improvement Components: sdk-java-core Reporter: Thomas Groh Assignee: Davor Bonaci Priority: Minor {{recordAsOutput}} and {{finishSpecifyingOutput}} are both methods which are called after an output has been attached to a PTransform application. They can be combined to only have one method that does any after-production work (such as the initial run of Coder inference).
[GitHub] incubator-beam pull request #1639: [BEAM-1112] Python E2E Test Framework And...
GitHub user markflyhigh reopened a pull request: https://github.com/apache/incubator-beam/pull/1639 [BEAM-1112] Python E2E Test Framework And Wordcount E2E Test Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- - E2e test framework that supports TestRunner start and verify pipeline job. - add `TestOptions` which defined `on_success_matcher` that is used to verify state/output of pipeline job. - validate `on_success_matcher` before pipeline execution to make sure it's unpicklable to a subclass of BaseMatcher. - create a `TestDataflowRunner` which provide functionalities of `DataflowRunner` plus result verification. - provide a test verifier `PipelineStateMatcher` that can verify pipeline job finished in DONE or not. - Add wordcount_it (it = integration test) that build e2e test based on existing wordcount pipeline. - include wordcount_it to nose collector, so that wordcount_it can be collected and run by nose. - skip ITs when running unit tests from tox in precommit and postcommit. Current changes will not change behavior of existing pre/postcommit. Test is done by running `tox -e py27 -c sdks/python/tox.ini` for unit test and running wordcount_it with `TestDataflowRunner` on service ([link](https://pantheon.corp.google.com/dataflow/job/2016-12-15_17_36_16-3857167705491723621?project=google.com:clouddfe)). TODO: - Output data verifier that verify pipeline output that stores in filesystem. 
- Add wordcount_it to precommit and replace existing wordcount execution command in postcommit with a better structured nose command. You can merge this pull request into a Git repository by running: $ git pull https://github.com/markflyhigh/incubator-beam e2e-testrunner Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1639.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1639 commit e1e1fa3a60e1fe234829432d144d6689e240b6f0 Author: Mark Liu Date: 2016-12-16T01:41:20Z [BEAM-1112] Python E2E Test Framework And Wordcount E2E Test commit 0e7007879ee082e3afe5db36107f51c03274f3f5 Author: Mark Liu Date: 2016-12-16T02:55:53Z fixup! Fix Code Style commit d6d71a717e8ed7ab32ffa02621c837c797f66cd7 Author: Mark Liu Date: 2016-12-20T19:15:59Z fixup! Address Ahmet comments commit 669077fff2032cf5d48e891a097f3e33bef75679 Author: Mark Liu Date: 2016-12-20T22:18:52Z Add Hamcrest To Tox For autocomplete_test Execution commit 601956fba1bee486e1aadc4fd1193848867c50d6 Author: Mark Liu Date: 2016-12-21T19:16:22Z fixup! Fix Unit Test
[jira] [Comment Edited] (BEAM-846) Decouple side input window mapping from WindowFn
[ https://issues.apache.org/jira/browse/BEAM-846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15613384#comment-15613384 ] Kenneth Knowles edited comment on BEAM-846 at 12/21/16 9:19 PM: Design 1-pager is https://s.apache.org/beam-windowmappingfn-1-pager was (Author: kenn): Design 1-pager is https://s.apache.org/beam-side-inputs-1-pager and a couple PRs have been authored ([#520|https://github.com/apache/incubator-beam/pull/520] and [#1076|https://github.com/apache/incubator-beam/pull/1076]) attributed to BEAM-115 (the "all of the Runner API" ticket) > Decouple side input window mapping from WindowFn > > > Key: BEAM-846 > URL: https://issues.apache.org/jira/browse/BEAM-846 > Project: Beam > Issue Type: New Feature > Components: beam-model-runner-api, sdk-java-core >Reporter: Robert Bradshaw >Assignee: Kenneth Knowles > Labels: backward-incompatible > > Currently the main WindowFn provides a getSideInputWindow method. Instead, > this mapping should be specified per-side-input (though the default mapping > would remain the same).
[jira] [Updated] (BEAM-210) Allow control of empty ON_TIME panes analogous to final panes
[ https://issues.apache.org/jira/browse/BEAM-210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-210: - Component/s: (was: beam-model) > Allow control of empty ON_TIME panes analogous to final panes > - > > Key: BEAM-210 > URL: https://issues.apache.org/jira/browse/BEAM-210 > Project: Beam > Issue Type: Bug > Components: beam-model-runner-api, sdk-java-core >Reporter: Mark Shields >Assignee: Thomas Groh > > Today, ON_TIME panes are emitted whether or not they are empty. We had > decided that for final panes the user would want to control this behavior, to > control data volume. But for ON_TIME panes no such control exists. The > rationale is perhaps that the ON_TIME pane is a fundamental result that > should not be elided. To be considered: whether this is what we want.
[jira] [Updated] (BEAM-260) WindowMappingFn: Know the getSideInputWindow upper bound to release side input resources
[ https://issues.apache.org/jira/browse/BEAM-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-260: - Component/s: (was: beam-model) beam-model-fn-api > WindowMappingFn: Know the getSideInputWindow upper bound to release side > input resources > > > Key: BEAM-260 > URL: https://issues.apache.org/jira/browse/BEAM-260 > Project: Beam > Issue Type: Bug > Components: beam-model-fn-api, beam-model-runner-api >Reporter: Mark Shields >Assignee: Kenneth Knowles > > We currently have no static knowledge about the getSideInputWindow function, > and runners are thus forced to hold on to all side input state / elements in > case a future element reaches back into an earlier side input element. > Maybe we need an upper bound on lag from current to result of > getSideInputWindow so we can have a progressing gc horizon as we do for GBK > window state.
[jira] [Updated] (BEAM-846) Decouple side input window mapping from WindowFn
[ https://issues.apache.org/jira/browse/BEAM-846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-846: - Component/s: (was: beam-model) > Decouple side input window mapping from WindowFn > > > Key: BEAM-846 > URL: https://issues.apache.org/jira/browse/BEAM-846 > Project: Beam > Issue Type: New Feature > Components: beam-model-runner-api, sdk-java-core >Reporter: Robert Bradshaw >Assignee: Kenneth Knowles > Labels: backward-incompatible > > Currently the main WindowFn provides a getSideInputWindow method. Instead, > this mapping should be specified per-side-input (though the default mapping > would remain the same).
[jira] [Updated] (BEAM-1193) Give Coders URNs and document their binary formats outside the Java code base
[ https://issues.apache.org/jira/browse/BEAM-1193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-1193: -- Component/s: (was: beam-model) > Give Coders URNs and document their binary formats outside the Java code base > - > > Key: BEAM-1193 > URL: https://issues.apache.org/jira/browse/BEAM-1193 > Project: Beam > Issue Type: New Feature > Components: beam-model-runner-api >Reporter: Kenneth Knowles >Assignee: Frances Perry >
[jira] [Updated] (BEAM-653) Refine specification for WindowFn.isCompatible()
[ https://issues.apache.org/jira/browse/BEAM-653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-653: - Component/s: beam-model-runner-api > Refine specification for WindowFn.isCompatible() > - > > Key: BEAM-653 > URL: https://issues.apache.org/jira/browse/BEAM-653 > Project: Beam > Issue Type: New Feature > Components: beam-model, beam-model-runner-api >Reporter: Kenneth Knowles > > {{WindowFn#isCompatible}} doesn't really have a spec. In practice, it is used > primarily when flattening together multiple PCollections. All of the > WindowFns must be compatible, and then just a single WindowFn is selected > arbitrarily for the output PCollection. > In consequence, downstream of the Flatten, the merging behavior will be taken > from this WindowFn. > Currently, there are some mismatches: > - Sessions with different gap durations _are_ compatible today, but probably > shouldn't be since merging makes little sense. (The use of tiny proto-windows > is an implementation detail anyhow) > - SlidingWindows and FixedWindows _could_ reasonably be compatible if they > had the same duration, though it might be odd. > Either way, we should just nail down what we actually mean so we can arrive > at a verdict in these cases.