[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.
pabloem commented on pull request #11637: URL: https://github.com/apache/beam/pull/11637#issuecomment-626277647 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-626277631 Run Python 3.7 PostCommit
[GitHub] [beam] stale[bot] commented on pull request #11019: Reducing the number of API calls to BQ table.get
stale[bot] commented on pull request #11019: URL: https://github.com/apache/beam/pull/11019#issuecomment-626265059 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] pabloem commented on pull request #11634: Change TestStreamImpl to a producer/consumer pattern
pabloem commented on pull request #11634: URL: https://github.com/apache/beam/pull/11634#issuecomment-626259982 retest this please
[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.
pabloem commented on pull request #11637: URL: https://github.com/apache/beam/pull/11637#issuecomment-626259977 retest this please
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-626259948 retest this please
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210: URL: https://github.com/apache/beam/pull/11210#discussion_r422567587 ## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py ## @@ -1008,31 +1007,30 @@ def _reset_count(self): self._cells = 0 def process(self, element): -mg_info = element.info +for elem in element: Review comment: Done!
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210: URL: https://github.com/apache/beam/pull/11210#discussion_r422567566 ## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py ## @@ -499,6 +499,7 @@ def test_batch_byte_size( # and each bach should contains 25 mutations. res = ( p | beam.Create(mutation_group) + | 'combine to list' >> beam.combiners.ToList() Review comment: Done!
[GitHub] [beam] mszb commented on pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on pull request #11210: URL: https://github.com/apache/beam/pull/11210#issuecomment-626257027 @chamikaramj: I've made some changes; could you please trigger the tests? Thanks!
[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests
mszb commented on a change in pull request #11210: URL: https://github.com/apache/beam/pull/11210#discussion_r422567474 ## File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py ## @@ -1008,31 +1007,30 @@ def _reset_count(self): self._cells = 0 def process(self, element): -mg_info = element.info +for elem in element: + mg_info = elem.info + if mg_info['byte_size'] + self._size_in_bytes > \ Review comment: Since I've reverted the changes to the connector, there is no need to create new tickets. Resolving this conversation. Thanks!
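The quoted `process` hunk switches from reading a single `element.info` to iterating over a list of mutation groups (which the test builds with `beam.combiners.ToList()`) and compares `mg_info['byte_size'] + self._size_in_bytes` against a size limit (the rest of that condition is cut off in the quote). Below is a minimal pure-Python sketch of that kind of greedy byte-size batching, outside Beam. The function name `batch_by_byte_size`, the dict-with-`byte_size` shape of each group, and the `max_batch_size_bytes` parameter are hypothetical stand-ins, not the SpannerIO API.

```python
from typing import Dict, Iterable, List


def batch_by_byte_size(
    groups: Iterable[Dict], max_batch_size_bytes: int
) -> List[List[Dict]]:
    """Greedily packs mutation groups into batches under a byte budget.

    Hypothetical helper: each group is assumed to be a dict with a
    'byte_size' key, loosely mirroring mg_info['byte_size'] in the hunk.
    A single group larger than the budget is emitted as its own batch.
    """
    batches: List[List[Dict]] = []
    current: List[Dict] = []
    current_size = 0
    for group in groups:
        size = group['byte_size']
        # Flush the current batch if this group would push it over the limit.
        if current and current_size + size > max_batch_size_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(group)
        current_size += size
    # Emit whatever is left over as a final, possibly smaller, batch.
    if current:
        batches.append(current)
    return batches


# 100 groups of 40 bytes each with a 1000-byte budget -> 4 batches of 25.
groups = [{'id': i, 'byte_size': 40} for i in range(100)]
print([len(b) for b in batch_by_byte_size(groups, 1000)])  # [25, 25, 25, 25]
```

Flushing before appending (rather than after) keeps every batch at or under the budget, except when a single group is itself oversized.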
[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.
pabloem commented on pull request #11637: URL: https://github.com/apache/beam/pull/11637#issuecomment-626214601 retest this please
[GitHub] [beam] pabloem commented on pull request #11634: Change TestStreamImpl to a producer/consumer pattern
pabloem commented on pull request #11634: URL: https://github.com/apache/beam/pull/11634#issuecomment-626214641 retest this please
[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro
pabloem commented on pull request #11086: URL: https://github.com/apache/beam/pull/11086#issuecomment-626214612 retest this please
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465290 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified + } else { +PCollection accum = inputs.get(0); +List> tail = all.subList(1, size); -private SetUnionAllImpl(PCollection rightCollection) { - this.rightCollection = rightCollection; +for (PCollection second : tail) { + accum = performSetOperation(accum, second, fn); Review comment: Renamed all methods to carry an explicit `Distinct` or `All` suffix.
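The hunk above removes `performSetOperation`, which keys every element, joins both sides with `CoGroupByKey`, counts each element's occurrences per side, and emits it `fn.apply(inFirstSize, inSecondSize)` times; the new `performSetOperationCollectionList` folds that pairwise operation across every `PCollection` in the list. Below is a minimal in-memory Python sketch of the same idea, with `collections.Counter` standing in for the CoGBK. The per-operation count functions in `COUNT_FNS` are an assumption (SQL-style multiset semantics), since the actual `SetFns` lambdas are not shown in this hunk; none of the names below are Beam APIs.

```python
from collections import Counter
from functools import reduce
from typing import Callable, Dict, List, Sequence

# (left_count, right_count) -> number of copies of the element to emit.
CountFn = Callable[[int, int], int]

# Assumed SQL-style multiset semantics; hypothetical names, not SetFns code.
COUNT_FNS: Dict[str, CountFn] = {
    "intersect_distinct": lambda lc, rc: int(lc > 0 and rc > 0),
    "intersect_all": lambda lc, rc: min(lc, rc),
    "except_distinct": lambda lc, rc: int(lc > 0 and rc == 0),
    "except_all": lambda lc, rc: max(lc - rc, 0),
    "union_distinct": lambda lc, rc: 1,
    "union_all": lambda lc, rc: lc + rc,
}


def set_operation(left: Sequence[str], right: Sequence[str], fn: CountFn) -> List[str]:
    """Counts each element per side (the CoGBK step), then emits it
    fn(left_count, right_count) times. Output is sorted for determinism."""
    left_counts, right_counts = Counter(left), Counter(right)
    out: List[str] = []
    # Iterate over every key seen on either side, like a CoGBK result.
    for key in left_counts.keys() | right_counts.keys():
        out.extend([key] * fn(left_counts[key], right_counts[key]))
    return sorted(out)


def set_operation_list(inputs: Sequence[Sequence[str]], fn: CountFn) -> List[str]:
    """Folds set_operation left to right over the inputs; a single input
    is returned unchanged, mirroring the size == 1 early return."""
    if not inputs:
        raise ValueError("at least one input is required")
    return reduce(lambda acc, nxt: set_operation(acc, nxt, fn), inputs[1:], list(inputs[0]))


left = ["a", "a", "b", "c"]
right = ["a", "c", "c", "d"]
print(set_operation(left, right, COUNT_FNS["intersect_all"]))  # ['a', 'c']
print(set_operation(left, right, COUNT_FNS["except_all"]))     # ['a', 'b']
print(set_operation_list([["a", "b", "c"], ["b", "c"], ["c"]],
                         COUNT_FNS["intersect_distinct"]))      # ['c']
```

Under these assumed count functions, `union_all` reduces to plain concatenation, which is consistent with the hunk replacing it with `Flatten.pCollections()` instead of a CoGBK.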
[GitHub] [beam] stale[bot] commented on pull request #9977: [BEAM-7434] [BEAM-5895] and [BEAM-5894] Fix upgrade to rabbit amqp-client 5.x
stale[bot] commented on pull request #9977: URL: https://github.com/apache/beam/pull/9977#issuecomment-626193330 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422504818 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified Review comment: I have removed this check now.
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465074 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified Review comment: Yes. I need this check because I can't set the coder using `setCoder` if there is only one element in the list.
[GitHub] [beam] lukecwik removed a comment on pull request #11646: [BEAM-9941] Add a BeamJava test with Flatten with different input and output Coders
lukecwik removed a comment on pull request #11646: URL: https://github.com/apache/beam/pull/11646#issuecomment-626095827 Craig, has the fix been rolled out to Dataflow service production?
[GitHub] [beam] darshanj commented on pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on pull request #11610: URL: https://github.com/apache/beam/pull/11610#issuecomment-626165907 retest this please
[GitHub] [beam] robertwb commented on pull request #11653: [BEAM-9935] Respect allowed split points in Python.
robertwb commented on pull request #11653: URL: https://github.com/apache/beam/pull/11653#issuecomment-626144465 R: @lukecwik
[GitHub] [beam] robertwb opened a new pull request #11653: [BEAM-9935] Respect allowed split points in Python.
robertwb opened a new pull request #11653: URL: https://github.com/apache/beam/pull/11653 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] rahul8383 commented on pull request #11609: [BEAM-9887] Throw IllegalArgumentException when building Row with logical types with Invalid input
rahul8383 commented on pull request #11609: URL: https://github.com/apache/beam/pull/11609#issuecomment-626140088 > I'd be +1 for just dropping the padding logic. I don't think it should be the responsibility of the LogicalType to coerce values like this. What do you think @reuvenlax? @TheNeuralBit @reuvenlax I have taken care of this for `FixedBytes` and other standard logical types in #11581
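The thread above argues for dropping the padding logic and, per the PR title, rejecting invalid input when building a `Row` instead of coercing it. Below is a minimal Python sketch of that validate-instead-of-coerce approach for a fixed-length bytes type, assuming exact-length input is required. The `FixedLengthBytes` class and its `to_base_type` method are hypothetical illustrations, not Beam's Java `FixedBytes` logical type, and `ValueError` stands in for Java's `IllegalArgumentException`.

```python
from typing import Union


class FixedLengthBytes:
    """Hypothetical fixed-length bytes type (not Beam's FixedBytes).

    Rejects inputs of the wrong length instead of padding them, analogous
    to throwing IllegalArgumentException on invalid input in Java.
    """

    def __init__(self, length: int) -> None:
        if length <= 0:
            raise ValueError(f"length must be positive, got {length}")
        self.length = length

    def to_base_type(self, value: Union[bytes, bytearray]) -> bytes:
        if not isinstance(value, (bytes, bytearray)):
            raise TypeError(f"expected bytes, got {type(value).__name__}")
        # No padding or truncation: the caller must supply exactly `length` bytes.
        if len(value) != self.length:
            raise ValueError(
                f"expected exactly {self.length} bytes, got {len(value)}")
        return bytes(value)


fb = FixedLengthBytes(4)
print(fb.to_base_type(b"abcd"))  # b'abcd'
```

Failing fast at construction time surfaces the bad value where it was produced, rather than silently altering data that later stages may compare or hash.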
[GitHub] [beam] rahul8383 commented on pull request #11609: [BEAM-9887] Throw IllegalArgumentException when building Row with logical types with Invalid input
rahul8383 commented on pull request #11609: URL: https://github.com/apache/beam/pull/11609#issuecomment-626139746 If there are no comments, can we close this PR?
[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…
rahul8383 commented on pull request #11581: URL: https://github.com/apache/beam/pull/11581#issuecomment-626139548 R: @TheNeuralBit I found a bug while implementing this feature and raised PR #11609 to fix it separately, since I thought the fix could be cherry-picked into the 2.21.0 release.
[GitHub] [beam] mwalenia commented on pull request #11566: [BEAM-9723] Add DLP integration transforms
mwalenia commented on pull request #11566: URL: https://github.com/apache/beam/pull/11566#issuecomment-626131577 Run Java PreCommit
[GitHub] [beam] robertwb commented on pull request #11652: [BEAM-9945] Report data channel progress via a designated counter.
robertwb commented on pull request #11652: URL: https://github.com/apache/beam/pull/11652#issuecomment-626128559 R: @lukecwik
[GitHub] [beam] robertwb opened a new pull request #11652: [BEAM-9945] Report data channel progress via a designated counter.
robertwb opened a new pull request #11652: URL: https://github.com/apache/beam/pull/11652 This turned out to make things a lot cleaner, IMHO, than depending on the (more distant) output PCollection element counts. Also, the index here is now in the same space as the one used for splitting. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465074 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified Review comment: Yes. I need to check this, as I can't set the coder using `setCoder` if there is only one element in the list. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
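The list-based helper quoted above folds a binary set operation over every input and hands back the sole input untouched when the list has only one element. The following is a minimal in-memory sketch of that shape. It assumes plain `java.util` sets stand in for `PCollection`s, and `FoldInputs`, `foldLeft` and `intersect` are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BinaryOperator;

/** Hypothetical, Beam-free sketch of folding a binary set operation over a list of inputs. */
public class FoldInputs {

  /** Left fold: ((in0 op in1) op in2) ...; a single input is returned as-is. */
  public static <C> C foldLeft(List<C> inputs, BinaryOperator<C> op) {
    if (inputs.isEmpty()) {
      throw new IllegalArgumentException("at least one input is required");
    }
    if (inputs.size() == 1) {
      // Mirrors the size == 1 branch: nothing new is produced, so the caller
      // gets back the very same object it passed in.
      return inputs.get(0);
    }
    C accum = inputs.get(0);
    for (C next : inputs.subList(1, inputs.size())) {
      accum = op.apply(accum, next);
    }
    return accum;
  }

  /** Distinct intersection of two sets, preserving the left side's iteration order. */
  public static Set<String> intersect(Set<String> left, Set<String> right) {
    Set<String> out = new LinkedHashSet<>(left);
    out.retainAll(right);
    return out;
  }

  private static Set<String> setOf(String... xs) {
    return new LinkedHashSet<>(Arrays.asList(xs));
  }

  public static void main(String[] args) {
    List<Set<String>> inputs = new ArrayList<>();
    inputs.add(setOf("1", "2", "3", "4"));
    inputs.add(setOf("2", "3", "4", "5"));
    inputs.add(setOf("3", "4", "6"));
    System.out.println(foldLeft(inputs, FoldInputs::intersect)); // [3, 4]
  }
}
```

Because the single-input branch returns an existing collection rather than a newly produced one, there is no fresh output whose coder could be set. That is presumably why the explicit check is needed before calling `setCoder`.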
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465290 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified + } else { +PCollection accum = inputs.get(0); +List> tail = all.subList(1, size); -private SetUnionAllImpl(PCollection rightCollection) { - this.rightCollection = rightCollection; +for (PCollection second : tail) { + accum = performSetOperation(accum, second, fn); Review comment: Renamed all methods with an explicit Distinct or All prefix.
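One way to read the Distinct/All split is as a pair of per-element count functions. Given `m` copies of an element on the left and `n` on the right, each variant decides how many copies to emit.

The sketch below assumes SQL-style set and bag semantics. The intersect, intersect-all and except lambdas match the ones quoted in the SetFns.java hunk. The hunk's intersect-all returns `Math.min` of the two counts, which equals the guarded form because counts are never negative. The union and except-all counts are assumptions, not taken from the PR. `java.util.function.BiFunction` stands in for Beam's `SerializableBiFunction`, and the constant names are hypothetical.

```java
import java.util.function.BiFunction;

/**
 * Hypothetical sketch: how many times an element is emitted, given m occurrences in the left
 * input and n in the right.
 */
public class SetCounts {
  // Distinct variants emit an element at most once.
  public static final BiFunction<Long, Long, Long> INTERSECT_DISTINCT =
      (m, n) -> (m > 0 && n > 0) ? 1L : 0L;
  public static final BiFunction<Long, Long, Long> EXCEPT_DISTINCT =
      (m, n) -> (m > 0 && n == 0) ? 1L : 0L;
  public static final BiFunction<Long, Long, Long> UNION_DISTINCT =
      (m, n) -> (m + n > 0) ? 1L : 0L;

  // All variants keep multiplicities (bag semantics).
  public static final BiFunction<Long, Long, Long> INTERSECT_ALL = (m, n) -> Math.min(m, n);
  public static final BiFunction<Long, Long, Long> EXCEPT_ALL = (m, n) -> Math.max(m - n, 0L);
  public static final BiFunction<Long, Long, Long> UNION_ALL = (m, n) -> m + n;

  public static void main(String[] args) {
    long m = 3; // copies in the left input
    long n = 1; // copies in the right input
    System.out.println("intersectDistinct = " + INTERSECT_DISTINCT.apply(m, n)); // 1
    System.out.println("intersectAll      = " + INTERSECT_ALL.apply(m, n)); // 1
    System.out.println("exceptDistinct    = " + EXCEPT_DISTINCT.apply(m, n)); // 0
    System.out.println("exceptAll         = " + EXCEPT_ALL.apply(m, n)); // 2
    System.out.println("unionDistinct     = " + UNION_DISTINCT.apply(m, n)); // 1
    System.out.println("unionAll          = " + UNION_ALL.apply(m, n)); // 4
  }
}
```

Keeping the variant explicit in each name avoids the ambiguity of a bare `intersect` or `except`, whose SQL default (DISTINCT) differs from the bag behavior of the All forms.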
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465224 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; + +public class SetFns { + + /** + * Returns a new {@code SetFns.SetImpl} transform that compute the intersection with provided + * {@code PCollection}. + * + * The argument should not be modified after this is called. + * + * The elements of the output {@link PCollection} will all distinct elements that present in + * both pipeline is constructed and provided {@link PCollection}. 
+ * + * {@code + * Pipeline p = ...; + * + * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5")); + * PCollection right = p.apply(Create.of("1", "3", "4", "6")); + * + * PCollection results = + * left.apply(SetFns.intersect(right)); + * } + */ + public static SetImpl intersect(PCollection rightCollection) { +checkNotNull(rightCollection, "rightCollection argument is null"); +SerializableBiFunction intersectFn = +(numberOfElementsinLeft, numberOfElementsinRight) -> (numberOfElementsinLeft > 0 && numberOfElementsinRight > 0) ? 1L : 0L; +return new SetImpl<>(rightCollection, intersectFn); + } + + /** + * Returns a new {@code SetFns.SetImpl} transform that compute the intersection all with + * provided {@code PCollection}. + * + * The argument should not be modified after this is called. + * + * The elements of the output {@link PCollection} which will follow EXCEPT_ALL Semantics as + * follows: Given there are m elements on pipeline which is constructed {@link PCollection} + * (left) and n elements on in provided {@link PCollection} (right): - it will output MIN(m - + * n, 0) elements of left for all elements which are present in both left and right. + * + * {@code + * Pipeline p = ...; + * + * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5")); + * PCollection right = p.apply(Create.of("1", "3", "4", "6")); + * + * PCollection results = + * left.apply(SetFns.intersectAll(right)); + * } + */ + public static SetImpl intersectAll(PCollection rightCollection) { +checkNotNull(rightCollection, "rightCollection argument is null"); +SerializableBiFunction intersectFn = +(numberOfElementsinLeft, numberOfElementsinRight) -> (numberOfElementsinLeft > 0 && numberOfElementsinRight > 0) ? Math.min(numberOfElementsinLeft, numberOfElementsinRight) : 0L; +return new SetImpl<>(rightCollection, intersectFn); + } + + /** + * Returns a new {@code SetFns.SetImpl} transform that compute the difference (except) with + * provided {@code PCollection}. 
+ * + * The argument should not be modified after this is called. + * + * The elements of the output {@link PCollection} will all distinct elements that present in + * pipeline is constructed {@link PCollection} but not present in provided {@link + * PCollection}. + * + * {@code + * Pipeline p = ...; + * + * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5")); + * PCollection right = p.apply(Create.of("1", "3", "4", "6")); + * + * PCollection results = + * left.apply(SetFns.except(right)); + * } + */ + public static SetImpl except(PCollection rightCollection) { +checkNotNull(rightCollection, "rightCollection argument is null"); +SerializableBiFunction exceptFn = +(numberOfElementsinLeft, numberOfElementsinRight) -> numberOfElementsinLeft > 0 && numberOfElementsinRight == 0 ? 1L :
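The Javadoc examples in the quoted file can be checked without a runner. The following is a minimal in-memory sketch of the key-and-count strategy the hunk uses. Each element is keyed, both sides are grouped by key (the role `CoGroupByKey` plays), and each key is emitted `fn(m, n)` times.

It assumes a `LinkedHashMap` stands in for the grouping and `java.util.function.BiFunction` for `SerializableBiFunction`. `CountingSetOp` is a hypothetical name. For the quoted inputs, intersect should yield "1","3","4" and except should yield "2","5".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical, Beam-free sketch of the group-then-count set operation. */
public class CountingSetOp {

  public static <T> List<T> apply(List<T> left, List<T> right, BiFunction<Long, Long, Long> fn) {
    // counts[0] = occurrences in left, counts[1] = occurrences in right.
    Map<T, long[]> grouped = new LinkedHashMap<>();
    for (T t : left) {
      grouped.computeIfAbsent(t, k -> new long[2])[0]++;
    }
    for (T t : right) {
      grouped.computeIfAbsent(t, k -> new long[2])[1]++;
    }
    // Emit each key as many times as the count function says.
    List<T> out = new ArrayList<>();
    for (Map.Entry<T, long[]> e : grouped.entrySet()) {
      long times = fn.apply(e.getValue()[0], e.getValue()[1]);
      for (long i = 0; i < times; i++) {
        out.add(e.getKey());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> left = Arrays.asList("1", "2", "3", "4", "5");
    List<String> right = Arrays.asList("1", "3", "4", "6");
    BiFunction<Long, Long, Long> intersectFn = (m, n) -> (m > 0 && n > 0) ? 1L : 0L;
    BiFunction<Long, Long, Long> exceptFn = (m, n) -> (m > 0 && n == 0) ? 1L : 0L;
    System.out.println(apply(left, right, intersectFn)); // [1, 3, 4]
    System.out.println(apply(left, right, exceptFn)); // [2, 5]
  }
}
```

Since the example inputs contain no duplicates, the All variants would give the same results here. Multiplicities only differ once an element repeats on either side.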
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422465074 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified Review comment: Yes. I need to check this, as I can't blindly call `setCoder` if there is only one element in the list.
[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms
darshanj commented on a change in pull request #11610: URL: https://github.com/apache/beam/pull/11610#discussion_r422464890 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java ## @@ -187,83 +237,112 @@ * left.apply(SetFns.unionAll(right)); // results will be PCollection containing: "1","1","1","2","3","4","4" * } */ - public static SetUnionAllImpl unionAll(PCollection rightCollection) { + public static SetImpl unionAll(PCollection rightCollection) { checkNotNull(rightCollection, "rightCollection argument is null"); - -return new SetUnionAllImpl(rightCollection); +return new SetImpl<>(rightCollection, unionAll()); } - private static PCollection performSetOperation( - PCollection leftCollection, - PCollection rightCollection, - SerializableBiFunction fn) { - -TupleTag leftCollectionTag = new TupleTag<>(); -TupleTag rightCollectionTag = new TupleTag<>(); - -MapElements> elementToVoid = -MapElements.via( -new SimpleFunction>() { - @Override - public KV apply(T element) { -return KV.of(element, null); - } -}); - -PCollection> left = leftCollection.apply("PrepareLeftKV", elementToVoid); -PCollection> right = rightCollection.apply("PrepareRightKV", elementToVoid); - -PCollection> coGbkResults = -KeyedPCollectionTuple.of(leftCollectionTag, left) -.and(rightCollectionTag, right) -.apply(CoGroupByKey.create()); -// TODO: lift combiners through the CoGBK. 
-return coGbkResults.apply( -ParDo.of( -new DoFn, T>() { - - @ProcessElement - public void processElement(ProcessContext c) { -KV elementGroups = c.element(); - -CoGbkResult value = elementGroups.getValue(); -long inFirstSize = Iterables.size(value.getAll(leftCollectionTag)); -long inSecondSize = Iterables.size(value.getAll(rightCollectionTag)); - -T element = elementGroups.getKey(); -for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); i++) { - c.output(element); -} - } -})); + public static Flatten.PCollections unionAll() { +return Flatten.pCollections(); } public static class SetImpl extends PTransform, PCollection> { -private final PCollection rightCollection; -private final SerializableBiFunction fn; -private SetImpl(PCollection rightCollection, SerializableBiFunction fn) { - this.rightCollection = rightCollection; - this.fn = fn; +private final transient PCollection right; +private final PTransform, PCollection> listTransformFn; + +private SetImpl( +PCollection rightCollection, +PTransform, PCollection> listTransformFn) { + this.right = rightCollection; + this.listTransformFn = listTransformFn; } @Override public PCollection expand(PCollection leftCollection) { - return performSetOperation(leftCollection, rightCollection, fn) - .setCoder(leftCollection.getCoder()); + return PCollectionList.of(leftCollection).and(right).apply(listTransformFn); } } - public static class SetUnionAllImpl extends PTransform, PCollection> { -private final PCollection rightCollection; + public static class SetImplCollections extends PTransform, PCollection> { + +private final transient SerializableBiFunction fn; + +private SetImplCollections(SerializableBiFunction fn) { + this.fn = fn; +} + +private static PCollection performSetOperationCollectionList( +PCollectionList inputs, SerializableBiFunction fn) { + List> all = inputs.getAll(); + int size = all.size(); + if (size == 1) { +return inputs.get(0); // Handle only one PCollection in list. 
Coder is already specified + } else { +PCollection accum = inputs.get(0); +List> tail = all.subList(1, size); -private SetUnionAllImpl(PCollection rightCollection) { - this.rightCollection = rightCollection; +for (PCollection second : tail) { + accum = performSetOperation(accum, second, fn); Review comment: Thanks, that's a very good idea. I have done it.