Repository: beam Updated Branches: refs/heads/master aaa5e55dc -> 3bd8a0f9f
BigQuery: swap from asSingleton to asIterable for Cleanup. asIterable can be simpler for runners to implement, as it does not semantically require that the PCollection being viewed contains exactly one element. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/059b351e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/059b351e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/059b351e Branch: refs/heads/master Commit: 059b351e58ab746ee699ee5d8ff746a27ec7586e Parents: aaa5e55 Author: Dan Halperin <dhalp...@google.com> Authored: Tue May 2 10:37:11 2017 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 2 13:13:52 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/059b351e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java index 75f7b93..f49c4e1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java @@ -53,9 +53,9 @@ class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>()) .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); - PCollectionView<Void> 
cleanupSignalView = outputs.get(cleanupSignal) + PCollectionView<Iterable<Void>> cleanupSignalView = outputs.get(cleanupSignal) .setCoder(VoidCoder.of()) - .apply(View.<Void>asSingleton().withDefaultValue(null)); + .apply(View.<Void>asIterable()); input.getPipeline() .apply("Create(CleanupOperation)", Create.of(cleanupOperation))