[1/2] incubator-beam git commit: Change PAssert's dummy inputs from (Void) null to integer 0
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6721bd584 -> 93a5d390b


Change PAssert's dummy inputs from (Void) null to integer 0


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a5503db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a5503db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a5503db

Branch: refs/heads/master
Commit: 9a5503db954eccfe0215ee473417bfafb495b61e
Parents: 6721bd5
Author: Kenneth Knowles
Authored: Fri May 6 11:19:33 2016 -0700
Committer: Kenneth Knowles
Committed: Thu May 12 17:45:43 2016 -0700

--
 .../java/org/apache/beam/sdk/testing/PAssert.java | 18 --
 1 file changed, 12 insertions(+), 6 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a5503db/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index 1265acd..c2cd598 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -593,7 +593,7 @@ public class PAssert {
     final PCollectionView actual = input.apply("CreateActual", createActual);
     input
-        .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+        .apply(Create.of(0).withCoder(VarIntCoder.of()))
         .apply(ParDo.named("RunChecks").withSideInputs(actual)
             .of(new CheckerDoFn<>(checkerFn, actual)));
@@ -604,8 +604,11 @@ public class PAssert {
   /**
    * A {@link DoFn} that runs a checking {@link SerializableFunction} on the contents of
    * a {@link PCollectionView}, and adjusts counters and thrown exceptions for use in testing.
+   *
+   * The input is ignored, but is {@link Integer} to be usable on runners that do not support
+   * null values.
    */
-  private static class CheckerDoFn extends DoFn {
+  private static class CheckerDoFn extends DoFn {
     private final SerializableFunction checkerFn;
     private final Aggregator success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
@@ -669,14 +672,17 @@ public class PAssert {
     final PCollectionView expected = input.apply("CreateExpected", createExpected);
     input
-        .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
-        .apply(ParDo.named("RunChecks").withSideInputs(actual, expected)
+        .apply(Create.of(0).withCoder(VarIntCoder.of()))
+        .apply("RunChecks", ParDo.withSideInputs(actual, expected)
             .of(new CheckerDoFn<>(relation, actual, expected)));
     return PDone.in(input.getPipeline());
   }
-  private static class CheckerDoFn extends DoFn {
+  /**
+   * Input is ignored, but is {@link Integer} for runners that do not support null values.
+   */
+  private static class CheckerDoFn extends DoFn {
     private final Aggregator success =
         createAggregator(SUCCESS_COUNTER, new Sum.SumIntegerFn());
     private final Aggregator failure =
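Here is a toy model in plain Java that illustrates the reasoning in the new Javadoc. It does not use the Beam API. `DummyInputSketch`, `processEach`, and `runChecks` are hypothetical names. They stand in for a runner that cannot carry null elements and for PAssert's "RunChecks" step. Under those assumptions, the sketch shows two things: a single ignored element is enough to run the check exactly once, and the element's only requirement is that it be non-null.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Toy model only: these names are hypothetical, not Beam APIs.
final class DummyInputSketch {

  private DummyInputSketch() {}

  /** Stands in for a runner whose data layer cannot represent null elements. */
  static <T> void processEach(List<T> elements, Consumer<T> fn) {
    for (T element : elements) {
      if (element == null) {
        throw new IllegalArgumentException("this runner does not support null elements");
      }
      fn.accept(element);
    }
  }

  /** Mimics the "RunChecks" step: runs the check once per (ignored) input element. */
  static <T> int runChecks(List<T> dummyInput, Runnable check) {
    AtomicInteger invocations = new AtomicInteger();
    processEach(dummyInput, ignored -> {
      check.run();
      invocations.incrementAndGet();
    });
    return invocations.get();
  }
}
```

In PAssert itself, `Create.of(0).withCoder(VarIntCoder.of())` produces that single element, and `CheckerDoFn` consumes it without looking at its value.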
[GitHub] incubator-beam pull request: Change PAssert's dummy inputs from (V...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/329 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #329
This closes #329


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93a5d390
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93a5d390
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93a5d390

Branch: refs/heads/master
Commit: 93a5d390b8a4f2b52246b3d73c04a6f5af451de6
Parents: 6721bd5 9a5503d
Author: Kenneth Knowles
Authored: Thu May 12 18:51:31 2016 -0700
Committer: Kenneth Knowles
Committed: Thu May 12 18:51:31 2016 -0700

--
 .../java/org/apache/beam/sdk/testing/PAssert.java | 18 --
 1 file changed, 12 insertions(+), 6 deletions(-)
--
[jira] [Created] (BEAM-280) TestPipeline should be constructible without a runner
Kenneth Knowles created BEAM-280:

Summary: TestPipeline should be constructible without a runner
Key: BEAM-280
URL: https://issues.apache.org/jira/browse/BEAM-280
Project: Beam
Issue Type: Bug
Components: sdk-java-core
Reporter: Kenneth Knowles
Assignee: Thomas Groh

Today, one cannot create a {{Pipeline}} without a runner, because the runner is wired in to perform transform expansions. However, we want to remove the {{DirectPipelineRunner}} from the SDK. A {{TestPipeline}} should therefore default to a no-op runner that performs no expansion and fails on {{run()}} (or something similar), so that we can execute tests that do not actually require a runner.

(This expansion wiring will be removed as soon as possible. However, if we keep the {{Pipeline.run()}} convenience method, we may still need some optional runner setup.)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
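Here is a minimal sketch of what the ticket proposes, using a much-simplified model. `Runner`, `NoOpRunner`, and this `TestPipeline` are hypothetical stand-ins written for illustration. They are not Beam's classes or signatures. The default runner leaves transforms unexpanded and fails on `run()`, so construction-only tests can still apply transforms and inspect the result.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the BEAM-280 proposal; none of these types are Beam's.
final class NoOpRunnerSketch {

  private NoOpRunnerSketch() {}

  /** Minimal stand-in for a runner: it may expand transforms and can execute a pipeline. */
  interface Runner {
    <I, O> Function<I, O> expand(String name, Function<I, O> transform);

    void run(TestPipeline pipeline);
  }

  /** Performs no expansion and refuses to execute. */
  static final class NoOpRunner implements Runner {
    @Override
    public <I, O> Function<I, O> expand(String name, Function<I, O> transform) {
      return transform; // no expansion: the transform is kept as-is
    }

    @Override
    public void run(TestPipeline pipeline) {
      throw new UnsupportedOperationException(
          "This TestPipeline has no runner; construction-only tests must not call run()");
    }
  }

  /** Records applied transforms so construction-time tests can inspect the graph. */
  static final class TestPipeline {
    private final Runner runner;
    private final List<String> applied = new ArrayList<>();

    TestPipeline() {
      this(new NoOpRunner()); // default when no runner is configured
    }

    TestPipeline(Runner runner) {
      this.runner = runner;
    }

    <I, O> Function<I, O> apply(String name, Function<I, O> transform) {
      applied.add(name);
      return runner.expand(name, transform);
    }

    List<String> appliedNames() {
      return Collections.unmodifiableList(applied);
    }

    void run() {
      runner.run(this);
    }
  }
}
```

Failing loudly in `run()` means a test that accidentally needs execution is reported as an error instead of passing vacuously.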
[jira] [Commented] (BEAM-22) DirectPipelineRunner: support for unbounded collections
[ https://issues.apache.org/jira/browse/BEAM-22?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282308#comment-15282308 ]

Thomas Groh commented on BEAM-22:
-

With https://github.com/apache/incubator-beam/pull/319 merged, the implementation is complete. What remains is swapping the defaults, removing the legacy DirectPipelineRunner, and the rename.

> DirectPipelineRunner: support for unbounded collections
> ---
>
> Key: BEAM-22
> URL: https://issues.apache.org/jira/browse/BEAM-22
> Project: Beam
> Issue Type: Improvement
> Components: runner-direct
> Reporter: Davor Bonaci
> Assignee: Thomas Groh
>
> DirectPipelineRunner currently runs over bounded PCollections only, and implements only a portion of the Beam Model.
> We should improve it to faithfully implement the full Beam Model, for example by adding the ability to run over unbounded PCollections, and to better resemble the execution model of a distributed system.
> This further enables features such as a testing source that can simulate late data and test triggers in the pipeline. Finally, we may want to expose an option to select between "debug" (single-threaded), "chaos monkey" (test as many model requirements as possible), and "performance" (multi-threaded).
> more testing (chaos monkey)
> Once this is done, we should update this StackOverflow question:
> http://stackoverflow.com/questions/35350113/testing-triggers-with-processing-time/35401426#35401426
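One plausible way to model the "debug" (single-threaded) versus "performance" (multi-threaded) choice is as a pluggable executor factory. The sketch below is loosely modeled on the CachedThreadPoolExecutorServiceFactory and FixedThreadPoolExecutorServiceFactory files touched by the #319 merge. The interface and class names here are hypothetical, and the real direct-runner classes may be shaped differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; the real factories in runners/direct-java may differ.
final class ExecutorFactorySketch {

  private ExecutorFactorySketch() {}

  /** Supplies the ExecutorService an in-process runner evaluates work on. */
  interface ExecutorServiceFactory {
    ExecutorService create();
  }

  /** "Performance"-style: threads are created on demand and reused when idle. */
  static final class CachedThreadPoolFactory implements ExecutorServiceFactory {
    @Override
    public ExecutorService create() {
      return Executors.newCachedThreadPool();
    }
  }

  /** Bounded pool; nThreads = 1 gives a "debug"-style single-threaded executor. */
  static final class FixedThreadPoolFactory implements ExecutorServiceFactory {
    private final int nThreads;

    FixedThreadPoolFactory(int nThreads) {
      if (nThreads < 1) {
        throw new IllegalArgumentException("nThreads must be positive: " + nThreads);
      }
      this.nThreads = nThreads;
    }

    @Override
    public ExecutorService create() {
      return Executors.newFixedThreadPool(nThreads);
    }
  }
}
```

With a factory interface like this, a runner option can select the threading mode without the evaluation code depending on a specific pool type.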
[GitHub] incubator-beam pull request: VoidCoder doesn't get special treatme...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/331
[1/2] incubator-beam git commit: VoidCoder doesn't get special treatment in create() for streaming anymore
Repository: incubator-beam
Updated Branches:
  refs/heads/master a760b4c83 -> 6721bd584


VoidCoder doesn't get special treatment in create() for streaming anymore


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d521151
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d521151
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d521151

Branch: refs/heads/master
Commit: 9d5211513c10a19d61e9c05c85967163beb61710
Parents: a760b4c
Author: Sela
Authored: Fri May 13 02:04:06 2016 +0300
Committer: Sela
Committed: Fri May 13 02:11:59 2016 +0300

--
 .../streaming/StreamingTransformTranslator.java | 14 +-
 1 file changed, 1 insertion(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d521151/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index c1ecc43..8154cd7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -28,7 +28,6 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -129,18 +128,7 @@ public final class StreamingTransformTranslator {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
         Iterable elems = transform.getElements();
         Coder coder = sec.getOutput(transform).getCoder();
-        if (coder != VoidCoder.of()) {
-          // actual create
-          sec.setOutputRDDFromValues(transform, elems, coder);
-        } else {
-          // fake create as an input
-          // creates a stream with a single batch containing a single null element
-          // to invoke following transformations once
-          // to support PAssert
-          sec.setDStreamFromQueue(transform,
-              Collections. singletonList(Collections.singletonList((Void) null)),
-              (Coder) coder);
-        }
+        sec.setDStreamFromQueue(transform, Collections.singletonList(elems), coder);
       }
     };
   }
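Here is a plain-Java model of why the special case is no longer needed. It assumes a queue-backed stream that consumes one queued batch per micro-batch interval, which is the behavior the removed comment describes. Wrapping all of Create's elements in a single batch means downstream transforms see them exactly once. That also covers PAssert's single dummy element. `QueueStreamSketch` and its methods are hypothetical and are not Spark's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Plain-Java model (hypothetical), not Spark's API.
final class QueueStreamSketch {

  private QueueStreamSketch() {}

  /** Mirrors the new translation: all of Create's elements go into one queued batch. */
  static <T> Queue<List<T>> singleBatch(List<T> elems) {
    return new ArrayDeque<>(Collections.singletonList(elems));
  }

  /**
   * Simulates `intervals` micro-batch intervals, consuming at most one queued batch per
   * interval and applying `downstream` to it. Intervals after the queue drains emit nothing.
   */
  static <T, R> List<R> runIntervals(
      Queue<List<T>> batches, int intervals, Function<List<T>, R> downstream) {
    List<R> outputs = new ArrayList<>();
    for (int i = 0; i < intervals; i++) {
      List<T> batch = batches.poll();
      if (batch == null) {
        continue; // nothing queued for this interval
      }
      outputs.add(downstream.apply(batch));
    }
    return outputs;
  }
}
```

Because PAssert now creates an ordinary `Integer` element instead of a `(Void) null`, the generic path handles it. There is no longer any need to detect `VoidCoder`.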
[GitHub] incubator-beam pull request: [BEAM-53] PubsubUnboundedSource
GitHub user mshields822 opened a pull request:

    https://github.com/apache/incubator-beam/pull/332

    [BEAM-53] PubsubUnboundedSource

    This adds a PubsubUnboundedSource with supporting utilities and tests.
    It is not yet wired into PubsubIO.Read; I'll do that in the next PR so we can switch to the source and sink atomically.

    R: @dhalperi

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mshields822/incubator-beam pubsub

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/332.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #332

commit 280dc56fe0391394349e6768dae6a8b2812eb711
Author: Mark Shields
Date:   2016-04-05T01:10:02Z

    Pub/sub unbounded source

commit cdf1841716cfd036578586e26ab4d2a42ae32620
Author: Mark Shields
Date:   2016-05-13T00:30:32Z

    Add labels. Proper unit test.
[GitHub] incubator-beam pull request: VoidCoder doesn't get special treatme...
GitHub user amitsela opened a pull request:

    https://github.com/apache/incubator-beam/pull/331

    VoidCoder doesn't get special treatment in create() for streaming any…

    Be sure to do all of the following to help us incorporate your contribution quickly and easily:

     - [ ] Make sure the PR title is formatted like:
       `[BEAM-] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `` in the title with the actual Jira issue
       number, if there is one.
     - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---
    …more

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/amitsela/incubator-beam PAssert-VoidCoder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #331

commit 9d5211513c10a19d61e9c05c85967163beb61710
Author: Sela
Date:   2016-05-12T23:04:06Z

    VoidCoder doesn't get special treatment in create() for streaming anymore
[1/2] incubator-beam git commit: [BEAM-117] Window display data should be safe for null windowFn
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9f105ec17 -> a760b4c83


[BEAM-117] Window display data should be safe for null windowFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66a5d0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66a5d0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66a5d0d4

Branch: refs/heads/master
Commit: 66a5d0d4e6a7f6686f11eec9b713d2de21fdda93
Parents: 9f105ec
Author: Scott Wegner
Authored: Thu May 12 11:32:30 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 14:30:47 2016 -0700

--
 .../org/apache/beam/sdk/transforms/windowing/Window.java | 10 ++
 .../apache/beam/sdk/transforms/windowing/WindowTest.java | 11 ++-
 2 files changed, 12 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66a5d0d4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index b9dd451..ab6b7f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -599,14 +599,16 @@ public class Window {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-          .add(DisplayData.item("windowFn", windowFn.getClass()))
-          .include(windowFn);
+
+      if (windowFn != null) {
+        builder
+            .add(DisplayData.item("windowFn", windowFn.getClass()))
+            .include(windowFn);
+      }

       if (allowedLateness != null) {
         builder.addIfNotDefault(DisplayData.item("allowedLateness", allowedLateness),
             Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
       }

       if (trigger != null && !(trigger instanceof DefaultTrigger)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/66a5d0d4/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 885f549..cd5eb2d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -272,16 +272,17 @@ public class WindowTest implements Serializable {

   @Test
   public void testDisplayDataExcludesUnspecifiedProperties() {
-    Window.Bound window = Window.into(new GlobalWindows());
-
-    DisplayData displayData = DisplayData.from(window);
-    assertThat(displayData, not(hasDisplayItem(hasKey(isOneOf(
+    Window.Bound onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes();
+    assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
+        "windowFn",
         "trigger",
         "outputTimeFn",
-        "accumulationMode",
         "allowedLateness",
         "closingBehavior")))));

+    Window.Bound noAccumulationMode = Window.into(new GlobalWindows());
+    assertThat(DisplayData.from(noAccumulationMode),
+        not(hasDisplayItem(hasKey("accumulationMode"))));
   }

   @Test
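The essence of the fix can be reduced to a hypothetical plain-Java helper. In this sketch, a `Map` stands in for `DisplayData.Builder`, and the item keys mirror the diff. Each optional property is reported only when it is set. As a result, a transform like `Window.named("foobar").discardingFiredPanes()`, whose windowFn is null, no longer throws.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for DisplayData.Builder.
final class NullSafeDisplayDataSketch {

  private NullSafeDisplayDataSketch() {}

  /** Builds display items, skipping properties the user never set. */
  static Map<String, Object> displayData(Object windowFn, Object allowedLateness, Object trigger) {
    Map<String, Object> items = new LinkedHashMap<>();
    if (windowFn != null) {
      // Without this guard, windowFn.getClass() throws a NullPointerException.
      items.put("windowFn", windowFn.getClass());
    }
    if (allowedLateness != null) {
      items.put("allowedLateness", allowedLateness);
    }
    if (trigger != null) {
      items.put("trigger", trigger);
    }
    return items;
  }
}
```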
[GitHub] incubator-beam pull request: [BEAM-117] Window display data should...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/330
[2/2] incubator-beam git commit: Closes #330
Closes #330


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a760b4c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a760b4c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a760b4c8

Branch: refs/heads/master
Commit: a760b4c8312bbad1878038326df8a809bad0f847
Parents: 9f105ec 66a5d0d
Author: Dan Halperin
Authored: Thu May 12 14:30:52 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 14:30:52 2016 -0700

--
 .../org/apache/beam/sdk/transforms/windowing/Window.java | 10 ++
 .../apache/beam/sdk/transforms/windowing/WindowTest.java | 11 ++-
 2 files changed, 12 insertions(+), 9 deletions(-)
--
[jira] [Commented] (BEAM-279) Make sure we unit test bounded sessions
[ https://issues.apache.org/jira/browse/BEAM-279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282023#comment-15282023 ]

Kenneth Knowles commented on BEAM-279:
--

Your final sentence is important. The best way to implement bounded sessions today is probably with a special WindowFn.

More broadly, the idea of bounded sessions is compatible with event-time processing, but neither the trigger-based approach nor the WindowFn-based approach actually expresses it. So I don't think we need to work too hard to make either one perfect (but both should work heuristically, as you say).

Here is an approach that I think respects event-time processing: the full unbounded session is buffered, and bounded prefixes can be emitted and discarded as they expire (so no further data can alter them). The latency of output will be poor if allowed lateness is high. In fact, though, it is also OK to emit the ON_TIME pane for a bounded prefix. Further late data merging in with it would just make the session's end of window (EOW) earlier, so it would trigger sooner, which monotonicity allows.

> Make sure we unit test bounded sessions
> ---
>
> Key: BEAM-279
> URL: https://issues.apache.org/jira/browse/BEAM-279
> Project: Beam
> Issue Type: Bug
> Reporter: Mark Shields
>
> A few customers have been using Window.into(Sessions...) and of course quickly realize they are exposed to unbounded sessions.
> We should have unit tests to confirm that various combinations of AfterPane.elementCountAtLeast and AfterProcessingTime... correctly force sessions to be broken apart.
> We should also check this all works with repeated messages with the same timestamp (since they will create the exact same session window and can thus see trigger state from previous sessions).
> At some point we may move on to reworking bounded sessions to be done directly rather than via Sessions plus triggers.
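Here is a rough illustration of the heuristic, WindowFn-style approach discussed above. The helper is hypothetical. It splits sorted event timestamps into sessions by gap, and it also caps each session at a maximum element count. That is similar in spirit to forcing a break with AfterPane.elementCountAtLeast. It is not Beam's Sessions WindowFn, and it operates on a batch of already-sorted timestamps rather than on merging windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical count-bounded session assignment; not Beam's Sessions WindowFn.
final class BoundedSessionsSketch {

  private BoundedSessionsSketch() {}

  /**
   * Groups ascending timestamps into sessions. A new session starts when the gap to the
   * previous element exceeds gapMillis, or when the current session already holds maxElements.
   */
  static List<List<Long>> assign(long[] sortedTimestamps, long gapMillis, int maxElements) {
    if (maxElements < 1) {
      throw new IllegalArgumentException("maxElements must be positive: " + maxElements);
    }
    List<List<Long>> sessions = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long previous = 0;
    for (long ts : sortedTimestamps) {
      boolean gapExceeded = !current.isEmpty() && ts - previous > gapMillis;
      boolean full = current.size() >= maxElements;
      if (gapExceeded || full) {
        sessions.add(current); // close the current session
        current = new ArrayList<>();
      }
      current.add(ts);
      previous = ts;
    }
    if (!current.isEmpty()) {
      sessions.add(current);
    }
    return sessions;
  }
}
```

As the comment points out, this kind of split is only heuristic. The break points depend on which elements have arrived, so late data can change where a session would have been broken.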
[jira] [Created] (BEAM-279) Make sure we unit test bounded sessions
Mark Shields created BEAM-279:
-

Summary: Make sure we unit test bounded sessions
Key: BEAM-279
URL: https://issues.apache.org/jira/browse/BEAM-279
Project: Beam
Issue Type: Bug
Reporter: Mark Shields

A few customers have been using Window.into(Sessions...) and of course quickly realize they are exposed to unbounded sessions.

We should have unit tests to confirm that various combinations of AfterPane.elementCountAtLeast and AfterProcessingTime... correctly force sessions to be broken apart.

We should also check this all works with repeated messages with the same timestamp (since they will create the exact same session window and can thus see trigger state from previous sessions).

At some point we may move on to reworking bounded sessions to be done directly rather than via Sessions plus triggers.
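The remark about repeated timestamps can be seen in a small model of session merging. The model assumes each element gets a proto-window [timestamp, timestamp + gap), and that overlapping windows merge. This is a simplification of what Sessions does, and `SessionMergeSketch` is hypothetical. Two elements with the same timestamp produce identical windows, which merge into one session. As the issue notes, that shared window is how later elements can see trigger state left by earlier ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of session merging; a simplification, not Beam's Sessions.
final class SessionMergeSketch {

  private SessionMergeSketch() {}

  /** Half-open interval [start, end) in milliseconds. */
  static final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Window && ((Window) o).start == start && ((Window) o).end == end;
    }

    @Override
    public int hashCode() {
      return Long.hashCode(start) * 31 + Long.hashCode(end);
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** Assigns one proto-window per timestamp, then merges overlapping (or identical) windows. */
  static List<Window> sessions(long[] timestamps, long gapMillis) {
    List<Window> protos = new ArrayList<>();
    for (long ts : timestamps) {
      protos.add(new Window(ts, ts + gapMillis));
    }
    protos.sort(Comparator.comparingLong((Window w) -> w.start));
    List<Window> merged = new ArrayList<>();
    for (Window w : protos) {
      Window last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w.start < last.end) {
        merged.set(merged.size() - 1, new Window(last.start, Math.max(last.end, w.end)));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }
}
```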
[GitHub] incubator-beam pull request: [BEAM-117] Runners should be resilien...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/315
[2/2] incubator-beam git commit: [BEAM-117] Runners should be resilient to DisplayData failure
[BEAM-117] Runners should be resilient to DisplayData failure

Display data is collected from PTransforms at Pipeline construction time. Collecting display data runs user code from the provided transforms and fns. These components should be designed not to throw during pipeline construction; however, we also shouldn't fail a pipeline if this code does fail.

This PR adds resiliency to the DataflowPipelineTranslator, where we collect display data for the Dataflow runner. It also adds a RunnableOnService test to verify that all runners are resilient to display data failures. Other runners are not yet using display data, but they will get this validation for free when they do.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/58fa1556
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/58fa1556
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/58fa1556

Branch: refs/heads/master
Commit: 58fa15565b0feb35f2570ae9523a2a26b859eb56
Parents: d2ad0ed
Author: Scott Wegner
Authored: Tue May 10 11:19:14 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 11:51:21 2016 -0700

--
 .../dataflow/DataflowPipelineTranslator.java | 56 -
 .../DataflowPipelineTranslatorTest.java | 64 +++-
 .../sdk/transforms/display/DisplayData.java | 4 ++
 .../org/apache/beam/sdk/io/DatastoreIOTest.java | 4 +-
 .../sdk/transforms/display/DisplayDataTest.java | 45 +-
 5 files changed, 165 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/58fa1556/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
--
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 05879d9..f58ceff 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -29,6 +29,7 @@ import static org.apache.beam.sdk.util.Structs.addString;
 import static org.apache.beam.sdk.util.Structs.getString;

 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;

 import org.apache.beam.runners.dataflow.DataflowPipelineRunner.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.internal.BigQueryIOTranslator;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -89,6 +91,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -547,7 +551,7 @@ public class DataflowPipelineTranslator {
       currentStep.setKind(type);
       steps.add(currentStep);
       addInput(PropertyNames.USER_NAME, getFullName(transform));
-      addDisplayData(PropertyNames.DISPLAY_DATA, DisplayData.from(transform));
+      addDisplayData(stepName, transform);
     }

     @Override
@@ -725,9 +729,21 @@ public class DataflowPipelineTranslator {
       outputInfoList.add(outputInfo);
     }

-    private void addDisplayData(String name, DisplayData displayData) {
+    private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
+      DisplayData displayData;
+      try {
+        displayData = DisplayData.from(hasDisplayData);
+      } catch (Exception e) {
+        String msg = String.format("Exception thrown while collecting display data for step: %s. "
+            + "Display data will be not be available for this step.", stepName);
+        DisplayDataException displayDataException = new DisplayDataException(msg, e);
+        LOG.warn(msg, displayDataException);
+
+        displayData = displayDataException.asDisplayData();
+      }
+
       List
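Here is the shape of the new `addDisplayData` fallback, sketched in plain Java under stated assumptions. A `Supplier` stands in for `DisplayData.from(hasDisplayData)`, a `Map` stands in for `DisplayData`, and `java.util.logging` stands in for the SLF4J `LOG.warn`. The fallback keys are hypothetical; the real `DisplayDataException.asDisplayData()` may report different items. The `StringWriter`/`PrintWriter` pair captures the stack trace as text. This is only a guess at what the two imports the diff adds are used for.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch of the fallback; not DataflowPipelineTranslator itself.
final class ResilientDisplayDataSketch {
  private static final Logger LOG = Logger.getLogger(ResilientDisplayDataSketch.class.getName());

  private ResilientDisplayDataSketch() {}

  /**
   * Collects display data for a step without letting user code fail pipeline construction.
   * On failure, logs a warning and returns items describing the exception instead.
   */
  static Map<String, Object> collect(String stepName, Supplier<Map<String, Object>> userCode) {
    try {
      return userCode.get();
    } catch (Exception e) {
      String msg = String.format(
          "Exception thrown while collecting display data for step: %s. "
              + "Display data will not be available for this step.", stepName);
      LOG.log(Level.WARNING, msg, e);

      StringWriter stackTrace = new StringWriter();
      e.printStackTrace(new PrintWriter(stackTrace));

      Map<String, Object> fallback = new LinkedHashMap<>();
      fallback.put("exceptionType", e.getClass().getName()); // hypothetical keys
      fallback.put("exceptionMessage", String.valueOf(e.getMessage()));
      fallback.put("stackTrace", stackTrace.toString());
      return fallback;
    }
  }
}
```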
[1/2] incubator-beam git commit: Closes #315
Repository: incubator-beam
Updated Branches:
  refs/heads/master d2ad0ed31 -> 9f105ec17


Closes #315


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f105ec1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f105ec1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f105ec1

Branch: refs/heads/master
Commit: 9f105ec1760954fc4cd881015d74088796028875
Parents: d2ad0ed 58fa155
Author: Dan Halperin
Authored: Thu May 12 11:51:21 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 11:51:21 2016 -0700

--
 .../dataflow/DataflowPipelineTranslator.java | 56 -
 .../DataflowPipelineTranslatorTest.java | 64 +++-
 .../sdk/transforms/display/DisplayData.java | 4 ++
 .../org/apache/beam/sdk/io/DatastoreIOTest.java | 4 +-
 .../sdk/transforms/display/DisplayDataTest.java | 45 +-
 5 files changed, 165 insertions(+), 8 deletions(-)
--
[GitHub] incubator-beam pull request: [BEAM-117] Window display data should...
GitHub user swegner opened a pull request:

    https://github.com/apache/incubator-beam/pull/330

    [BEAM-117] Window display data should be safe for null windowFn

    Be sure to do all of the following to help us incorporate your contribution quickly and easily:

     - [ ] Make sure the PR title is formatted like:
       `[BEAM-] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `` in the title with the actual Jira issue
       number, if there is one.
     - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/swegner/incubator-beam displaydata-windowfn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #330

commit 37d06b9108ba589f7f7ad51abd223b16c3ddf64b
Author: Scott Wegner
Date:   2016-05-12T18:32:30Z

    Window display data should be safe for null windowFn
[4/4] incubator-beam git commit: This closes #319
This closes #319


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d2ad0ed3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d2ad0ed3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d2ad0ed3

Branch: refs/heads/master
Commit: d2ad0ed31a1bb99a4b173cc0998f8df96d6e63e3
Parents: 2fbc0ea e406949
Author: Kenneth Knowles
Authored: Thu May 12 11:22:14 2016 -0700
Committer: Kenneth Knowles
Committed: Thu May 12 11:22:14 2016 -0700

--
 examples/java/pom.xml | 11 +
 runners/direct-java/pom.xml | 19 -
 .../CachedThreadPoolExecutorServiceFactory.java | 44 ---
 .../FixedThreadPoolExecutorServiceFactory.java | 45
 .../direct/InProcessPipelineOptions.java | 4 +-
 sdks/java/core/pom.xml | 10 +
 6 files changed, 85 insertions(+), 48 deletions(-)
--
[3/4] incubator-beam git commit: Stop running RunnableOnService tests in the Core SDK
Stop running RunnableOnService tests in the Core SDK

The direct runner executes all tests in this category (in runners/direct-java), so we keep this test coverage without running these tests while building the Core SDK.

This is required to remove the legacy direct runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e406949e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e406949e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e406949e

Branch: refs/heads/master
Commit: e406949eea82c705343736d7fc685182ddd96921
Parents: 113e257
Author: Thomas Groh
Authored: Thu May 12 10:08:33 2016 -0700
Committer: Thomas Groh
Committed: Thu May 12 10:10:06 2016 -0700

--
 sdks/java/core/pom.xml | 10 ++
 1 file changed, 10 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e406949e/sdks/java/core/pom.xml
--
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index 07fd0b1..aa7edb5 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -126,6 +126,16 @@
         org.apache.maven.plugins
+        maven-surefire-plugin
+
+
+            org.apache.beam.sdk.testing.RunnableOnService
+
+
+
+
+
+        org.apache.maven.plugins
         maven-dependency-plugin
[GitHub] incubator-beam pull request: [Failure demo] Change PAssert's dummy...
GitHub user kennknowles opened a pull request:

    https://github.com/apache/incubator-beam/pull/329

    [Failure demo] Change PAssert's dummy inputs from (Void) null to integer 0

    Be sure to do all of the following to help us incorporate your contribution quickly and easily:

     - [x] Make sure the PR title is formatted like:
       `[BEAM-] Description of pull request`
     - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
     - [x] Replace `` in the title with the actual Jira issue
       number, if there is one.
     - [x] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kennknowles/incubator-beam passert-voids

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #329

commit af9d20f1e7020d63e968c1c38368d8ee4cef7c03
Author: Kenneth Knowles
Date:   2016-05-06T18:19:33Z

    Change PAssert's dummy inputs from (Void) null to integer 0
[GitHub] incubator-beam pull request: [BEAM-53] Java-only Pubsub sink for s...
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/171
[2/2] incubator-beam git commit: [BEAM-53] Add PubsubUnboundedSink and tests
[BEAM-53] Add PubsubUnboundedSink and tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/040f8f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/040f8f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/040f8f98

Branch: refs/heads/master
Commit: 040f8f986bb22c8adbe29b1dbb8465bf38d9476f
Parents: 123674f
Author: Mark Shields
Authored: Tue Apr 26 18:42:00 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 09:56:52 2016 -0700

--
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 392 +++
 .../beam/sdk/util/PubsubApiaryClient.java | 14 +-
 .../org/apache/beam/sdk/util/PubsubClient.java | 19 +-
 .../apache/beam/sdk/util/PubsubGrpcClient.java | 59 +--
 .../apache/beam/sdk/util/PubsubTestClient.java | 10 +
 .../beam/sdk/io/PubsubUnboundedSinkTest.java | 145 +++
 6 files changed, 612 insertions(+), 27 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/040f8f98/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
new file mode 100644
index 000..6d08a70
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PubsubOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.PubsubClient;
+import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.util.PubsubClient.PubsubClientFactory;
+import org.apache.beam.sdk.util.PubsubClient.TopicPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import javax.annotation.Nullable;
+
+/**
+ * A PTransform which streams messages to Pubsub.
+ *
+ * The underlying implementation is just a {@link GroupByKey} followed by a {@link ParDo} which
+ * publishes as a side effect. (In the future we want to design and switch to a custom
+ * {@code UnboundedSink} implementation so as to gain access to system watermark and
+ * end-of-pipeline cleanup.)
+ * We try to
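The Javadoc of PubsubUnboundedSink describes the sink as "just a GroupByKey followed by a ParDo which publishes as a side effect." Here is a minimal, stdlib-only sketch of that shape. It assumes messages are keyed by a random shard number before the GroupByKey; this is inferred from the `ThreadLocalRandom`, `KvCoder` and `VarIntCoder` imports, not from code shown in this diff. The `Publisher` interface, `numShards` and `maxBatchSize` are hypothetical. The trigger imports (`AfterFirst`, `AfterPane`, `AfterProcessingTime`, `Repeatedly`) suggest the real transform also controls when each group is emitted. That part is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;

final class ShardedPublishSketch {
  /** Hypothetical stand-in for the client call that publishes one batch of messages. */
  interface Publisher {
    void publish(List<String> batch);
  }

  /**
   * Shard + group step: give each message a random shard number and collect messages per
   * shard, i.e. what a GroupByKey on (shard, message) pairs would yield, minus windowing.
   */
  static Map<Integer, List<String>> shardAndGroup(List<String> messages, int numShards) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (String message : messages) {
      int shard = ThreadLocalRandom.current().nextInt(numShards);
      groups.computeIfAbsent(shard, k -> new ArrayList<>()).add(message);
    }
    return groups;
  }

  /** Publish step: for each shard's group, publish in batches of at most maxBatchSize. */
  static void publishGroups(
      Map<Integer, List<String>> groups, int maxBatchSize, Publisher publisher) {
    for (List<String> group : groups.values()) {
      for (int start = 0; start < group.size(); start += maxBatchSize) {
        int end = Math.min(start + maxBatchSize, group.size());
        // Copy the sub-list so the publisher does not hold a view into the group.
        publisher.publish(new ArrayList<>(group.subList(start, end)));
      }
    }
  }

  public static void main(String[] args) {
    List<String> messages = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      messages.add("msg-" + i);
    }
    List<List<String>> published = new ArrayList<>();
    publishGroups(shardAndGroup(messages, 3), 4, published::add);
    // Shard assignment is random, so the batch layout varies between runs.
    System.out.println(published);
  }
}
```

Random shard keys spread the publishing work across workers after the GroupByKey. Batching within each group keeps the number of publish calls down.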
[1/2] incubator-beam git commit: Closes #171
Repository: incubator-beam
Updated Branches:
 refs/heads/master 123674f4b -> 2fbc0ea3a

Closes #171

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2fbc0ea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2fbc0ea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2fbc0ea3

Branch: refs/heads/master
Commit: 2fbc0ea3adb63363952a6bb3d89733dd1b5cf961
Parents: 123674f 040f8f9
Author: Dan Halperin
Authored: Thu May 12 09:56:52 2016 -0700
Committer: Dan Halperin
Committed: Thu May 12 09:56:52 2016 -0700

--
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 392 +++
 .../beam/sdk/util/PubsubApiaryClient.java | 14 +-
 .../org/apache/beam/sdk/util/PubsubClient.java | 19 +-
 .../apache/beam/sdk/util/PubsubGrpcClient.java | 59 +--
 .../apache/beam/sdk/util/PubsubTestClient.java | 10 +
 .../beam/sdk/io/PubsubUnboundedSinkTest.java | 145 +++
 6 files changed, 612 insertions(+), 27 deletions(-)
--
[GitHub] incubator-beam pull request: [BEAM-270] Support Timestamps/Windows...
GitHub user aljoscha opened a pull request:

https://github.com/apache/incubator-beam/pull/328

[BEAM-270] Support Timestamps/Windows in Flink Batch

This change sits on top of #291

With this we can run all of the `RunnableOnService` tests on the Flink batch runner. Almost all of our own Flink-specific IT cases now fail because they verified the older, incomplete support for the Beam model. I did not remove them, since we should discuss whether we want to keep them, or keep only those that test a specific integration with Beam. If we want to keep them, they have to be fixed.

For now, `CombineTest.testSessionsCombineWithContext` fails. I'm inserting a pre-shuffle combine phase, but with merging windows the correct, final window in which an element will reside is not yet known in the pre-shuffle combine. This means that we don't get the correct side input for those values in the `CombineFnWithContext`. To fix the failing test I can get rid of the pre-shuffle combine; this means more network traffic, but correct results. What are the thoughts on this?

For GroupByKey/Combine.PerKey I'm doing a shuffle by key and then an in-memory pass over the windows. For non-merging windows this could also be changed to do a shuffle by key-and-window, but then we would have to explode all windows on the send side.

**This is the text from the commit in question:**

[BEAM-270] Support Timestamps/Windows in Flink Batch

With this change we always use WindowedValue for the underlying Flink DataSets instead of just T. This allows us to support windowing as well.

This also changes a lot of other things, enabled by the above:

- Use WindowedValue throughout
- Add proper translation for Window.into()
- Make side inputs window aware
- Make GroupByKey and Combine transformations window aware; this includes support for merging windows. GroupByKey is implemented as a Combine with a concatenating CombineFn, for simplicity

This removes Flink-specific transformations for things that are handled by built-in sources/sinks. Among other things, this:

- Removes special translation for AvroIO.Read/Write and TextIO.Read/Write
- Removes special support for Write.Bound; this was not working properly and is now handled by the Beam machinery that uses DoFns for this
- Removes special translation for binary Co-Group; the code was still in there but was never used

With this change all RunnableOnService tests run on Flink Batch.

R: @mxm for Flink review
R: @kennknowles, you are probably interested in how the shuffle/reduce works

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/incubator-beam flink-windowed-value-batch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/328.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #328

commit b7335f9176e72c1dbd8f30f0b770d290e508dd9c
Author: Kenneth Knowles
Date: 2016-05-02T20:11:12Z

Add TestFlinkPipelineRunner to FlinkRunnerRegistrar

This makes the runner available for selection by integration tests.

commit 093dc3ec186bc16fd070b09c1678a87dd8f6b47e
Author: Kenneth Knowles
Date: 2016-05-02T21:04:20Z

Configure RunnableOnService tests for Flink in batch mode

Today Flink batch supports only global windows. This is a situation we intend our build to allow, eventually via JUnit category filtering.

For now all the test classes that use non-global windows are excluded entirely via maven configuration. In the future, it should be on a per-test-method basis.

commit 3cb997342fc6a8230afcbbb84fb1742a53ef1683
Author: Kenneth Knowles
Date: 2016-05-02T21:29:30Z

Add Window.Bound translator to Flink batch

This adds a Window.Bound translator that allows only GlobalWindows. It is a temporary measure, but one that brings the Flink batch translator in line with the Beam model - instead of "ignoring" windows, the GBK is a perfectly valid GBK for GlobalWindows.

Previously, the SDK's runner test suite would fail due to the lack of a translator - now some of them will fail due to windowing support, but others have a chance.

commit 6d00cba87a16ff8f2318db87dab23cc09b7d1c20
Author: Aljoscha Krettek
Date: 2016-05-06T06:26:50Z

Fix Dangling Flink DataSets

commit af7f8580024466dd85cca8a2c070fd1db67490d4
Author: Aljoscha Krettek
Date: 2016-05-06T07:38:55Z

Add hamcrest dep to
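The PR implements GroupByKey as a shuffle by key followed by an in-memory pass over the windows, using a Combine whose CombineFn concatenates values. For merging windows such as sessions, that pass has to merge windows before values can be grouped. Here is a stdlib-only sketch of the pass for a single key. It assumes each element gets the session window [timestamp, timestamp + gap) and that windows merge when they strictly overlap; Beam's exact boundary rule for touching windows may differ. `SessionMergeSketch`, `Element`, `Session` and `mergeSessions` are hypothetical names, not the Flink runner's classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionMergeSketch {
  /** A timestamped value belonging to one key, as seen after shuffling by key. */
  static final class Element {
    final long timestamp;
    final String value;

    Element(long timestamp, String value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  /** A merged session window [start, end) and the concatenated values assigned to it. */
  static final class Session {
    final long start;
    final long end;
    final List<String> values;

    Session(long start, long end, List<String> values) {
      this.start = start;
      this.end = end;
      this.values = values;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ") " + values;
    }
  }

  /** Sorts by timestamp, merges overlapping windows, and concatenates values per session. */
  static List<Session> mergeSessions(List<Element> elements, long gap) {
    List<Element> sorted = new ArrayList<>(elements);
    sorted.sort(Comparator.comparingLong(e -> e.timestamp));

    List<Session> sessions = new ArrayList<>();
    long start = 0;
    long end = 0;
    List<String> values = null; // null means "no open session yet"
    for (Element e : sorted) {
      long windowStart = e.timestamp;
      long windowEnd = e.timestamp + gap;
      if (values != null && windowStart < end) {
        // Overlaps the open session: extend it and append the value.
        end = Math.max(end, windowEnd);
        values.add(e.value);
      } else {
        // No overlap: close the open session (if any) and start a new one.
        if (values != null) {
          sessions.add(new Session(start, end, values));
        }
        start = windowStart;
        end = windowEnd;
        values = new ArrayList<>();
        values.add(e.value);
      }
    }
    if (values != null) {
      sessions.add(new Session(start, end, values));
    }
    return sessions;
  }

  public static void main(String[] args) {
    List<Element> input = new ArrayList<>();
    input.add(new Element(1, "a"));
    input.add(new Element(3, "b"));
    input.add(new Element(20, "c"));
    input.add(new Element(12, "d"));
    // Prints [[1, 8) [a, b], [12, 17) [d], [20, 25) [c]]
    System.out.println(mergeSessions(input, 5));
  }
}
```

The session an element ends up in depends on the other elements of the same key. Here "a" lands in [1, 8) only because "b" arrived. This illustrates the PR's point that a combine run before the shuffle cannot yet know an element's final merged window.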
[GitHub] incubator-beam pull request: [BEAM-272][flink] remove dependency o...
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-beam/pull/324
[2/2] incubator-beam git commit: This closes #324
This closes #324

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/123674f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/123674f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/123674f4

Branch: refs/heads/master
Commit: 123674f4b5a823cc593514a131943fbcc462ab7a
Parents: 6ec9e96 50edd23
Author: Maximilian Michels
Authored: Thu May 12 10:57:33 2016 +0200
Committer: Maximilian Michels
Committed: Thu May 12 10:57:33 2016 +0200

--
 runners/flink/runner/pom.xml | 10 ---
 .../runners/flink/FlinkPipelineOptions.java | 30 ++--
 .../beam/runners/flink/FlinkPipelineRunner.java | 4 +-
 3 files changed, 28 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/123674f4/runners/flink/runner/pom.xml
--
[jira] [Commented] (BEAM-242) Enable Checkstyle check for the Flink Runner
[ https://issues.apache.org/jira/browse/BEAM-242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281330#comment-15281330 ]

Maximilian Michels commented on BEAM-242:
-

That's fine. We are thinking about some package refactoring before the release as well. These would very likely conflict with your changes, so let us know if you experience any problems along the way.

> Enable Checkstyle check for the Flink Runner
> -
>
> Key: BEAM-242
> URL: https://issues.apache.org/jira/browse/BEAM-242
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
>
> We don't have a Checkstyle check in place for the Flink Runner. I would like
> to use the SDK's checkstyle rules.
> We could also think about a unified Checkstyle for all Runners.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)