[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.

2020-05-09 Thread GitBox


pabloem commented on pull request #11637:
URL: https://github.com/apache/beam/pull/11637#issuecomment-626277647







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-05-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-626277631


   Run Python 3.7 PostCommit
   
   







[GitHub] [beam] stale[bot] commented on pull request #11019: Reducing the number of API calls to BQ table.get

2020-05-09 Thread GitBox


stale[bot] commented on pull request #11019:
URL: https://github.com/apache/beam/pull/11019#issuecomment-626265059


   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   







[GitHub] [beam] pabloem commented on pull request #11634: Change TestStreamImpl to a producer/consumer pattern

2020-05-09 Thread GitBox


pabloem commented on pull request #11634:
URL: https://github.com/apache/beam/pull/11634#issuecomment-626259982


   retest this please
   
   







[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.

2020-05-09 Thread GitBox


pabloem commented on pull request #11637:
URL: https://github.com/apache/beam/pull/11637#issuecomment-626259977


   retest this please
   
   







[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-05-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-626259948


   retest this please







[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567587



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:

Review comment:
   Done!









[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567566



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
##
@@ -499,6 +499,7 @@ def test_batch_byte_size(
   # and each bach should contains 25 mutations.
   res = (
   p | beam.Create(mutation_group)
+  | 'combine to list' >> beam.combiners.ToList()

Review comment:
   Done!









[GitHub] [beam] mszb commented on pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on pull request #11210:
URL: https://github.com/apache/beam/pull/11210#issuecomment-626257027


   @chamikaramj: I've made some changes; could you please trigger the tests?
   Thanks!







[GitHub] [beam] mszb commented on a change in pull request #11210: [BEAM-8949] SpannerIO integration tests

2020-05-09 Thread GitBox


mszb commented on a change in pull request #11210:
URL: https://github.com/apache/beam/pull/11210#discussion_r422567474



##
File path: sdks/python/apache_beam/io/gcp/experimental/spannerio.py
##
@@ -1008,31 +1007,30 @@ def _reset_count(self):
 self._cells = 0
 
   def process(self, element):
-mg_info = element.info
+for elem in element:
+  mg_info = elem.info
+  if mg_info['byte_size'] + self._size_in_bytes > \

Review comment:
   Since I've reverted the changes to the connector, there is no need to 
create new tickets. Resolving this conversation. Thanks!
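
The quoted `process` hunk compares each group's `byte_size` plus a running total against a limit before the group joins the current batch. Below is a minimal plain-Python sketch of that kind of size-bounded batching. It is not the connector's code: `batch_by_byte_size` and `max_bytes` are hypothetical names, and the real limit and flush behaviour are cut off in the diff above, so both are assumptions here.

```python
from typing import Dict, Iterable, Iterator, List


def batch_by_byte_size(
    groups: Iterable[Dict[str, int]], max_bytes: int
) -> Iterator[List[Dict[str, int]]]:
    """Yield lists of groups whose summed 'byte_size' stays within max_bytes.

    Hypothetical helper for illustration; not part of spannerio.py.
    """
    batch: List[Dict[str, int]] = []
    size_in_bytes = 0
    for info in groups:
        # Flush the current batch before adding a group that would exceed the limit.
        if batch and info['byte_size'] + size_in_bytes > max_bytes:
            yield batch
            batch, size_in_bytes = [], 0
        batch.append(info)
        size_in_bytes += info['byte_size']
    # Emit whatever is left once the input is exhausted.
    if batch:
        yield batch


groups = [{'byte_size': 40}, {'byte_size': 50}, {'byte_size': 30}]
batches = list(batch_by_byte_size(groups, max_bytes=100))
```

With these made-up sizes the first two groups share a batch and the third starts a new one, since adding it would push the running total past `max_bytes`.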









[GitHub] [beam] pabloem commented on pull request #11637: Waiting for BQ Query and Export jobs for more than 5 minutes.

2020-05-09 Thread GitBox


pabloem commented on pull request #11637:
URL: https://github.com/apache/beam/pull/11637#issuecomment-626214601


   retest this please
   
   







[GitHub] [beam] pabloem commented on pull request #11634: Change TestStreamImpl to a producer/consumer pattern

2020-05-09 Thread GitBox


pabloem commented on pull request #11634:
URL: https://github.com/apache/beam/pull/11634#issuecomment-626214641


   retest this please
   
   







[GitHub] [beam] pabloem commented on pull request #11086: [BEAM-8910] Make custom BQ source read from Avro

2020-05-09 Thread GitBox


pabloem commented on pull request #11086:
URL: https://github.com/apache/beam/pull/11086#issuecomment-626214612


   retest this please
   
   







[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465290



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified
+  } else {
+PCollection accum = inputs.get(0);
+List> tail = all.subList(1, size);
 
-private SetUnionAllImpl(PCollection rightCollection) {
-  this.rightCollection = rightCollection;
+for (PCollection second : tail) {
+  accum = performSetOperation(accum, second, fn);

Review comment:
   Renamed all methods to carry an explicit `Distinct` or `All` suffix.
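
The removed `performSetOperation` in the hunk above counts how often each element occurs on the left and on the right, then emits it `fn.apply(inFirstSize, inSecondSize)` times. A minimal plain-Python sketch of that counting idea follows. It does not use Beam, and the three count functions are assumptions based on common SQL-style `ALL` and `DISTINCT` semantics rather than code taken from the PR; all names are hypothetical.

```python
from collections import Counter
from typing import Callable, Iterable, List


def set_operation(
    left: Iterable[str], right: Iterable[str], fn: Callable[[int, int], int]
) -> List[str]:
    """Emit each element fn(left_count, right_count) times (illustrative only)."""
    left_counts, right_counts = Counter(left), Counter(right)
    result: List[str] = []
    # Sorting only makes the output order deterministic for this example.
    for element in sorted(set(left_counts) | set(right_counts)):
        result.extend([element] * fn(left_counts[element], right_counts[element]))
    return result


# Assumed count functions (not from the PR).
def intersect_all(a: int, b: int) -> int:
    return min(a, b)


def intersect_distinct(a: int, b: int) -> int:
    return 1 if a > 0 and b > 0 else 0


def except_all(a: int, b: int) -> int:
    return max(a - b, 0)


left = ["1", "1", "2"]
right = ["1", "1", "3"]
all_result = set_operation(left, right, intersect_all)
distinct_result = set_operation(left, right, intersect_distinct)
except_result = set_operation(left, right, except_all)
```

Under these assumed definitions, the `All` variant keeps duplicates that appear on both sides while the `Distinct` variant emits each shared element once, which is the distinction the suffixes make explicit.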









[GitHub] [beam] stale[bot] commented on pull request #9977: [BEAM-7434] [BEAM-5895] and [BEAM-5894] Fix upgrade to rabbit amqp-client 5.x

2020-05-09 Thread GitBox


stale[bot] commented on pull request #9977:
URL: https://github.com/apache/beam/pull/9977#issuecomment-626193330


   This pull request has been closed due to lack of activity. If you think that 
is incorrect, or the pull request requires review, you can revive the PR at any 
time.
   







[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422504818



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
   I have removed this check now.









[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465074



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
   Yes. I need this check, as I can't set the coder using `setCoder` when 
there is only one element in the list.









[GitHub] [beam] lukecwik removed a comment on pull request #11646: [BEAM-9941] Add a BeamJava test with Flatten with different input and output Coders

2020-05-09 Thread GitBox


lukecwik removed a comment on pull request #11646:
URL: https://github.com/apache/beam/pull/11646#issuecomment-626095827


   Craig, has the fix been rolled out to Dataflow service production?







[GitHub] [beam] darshanj commented on pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on pull request #11610:
URL: https://github.com/apache/beam/pull/11610#issuecomment-626165907


   retest this please







[GitHub] [beam] robertwb opened a new pull request #11653: [BEAM-9935] Respect allowed split points in Python.

2020-05-09 Thread GitBox


robertwb opened a new pull request #11653:
URL: https://github.com/apache/beam/pull/11653


   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[GitHub] [beam] robertwb commented on pull request #11653: [BEAM-9935] Respect allowed split points in Python.

2020-05-09 Thread GitBox


robertwb commented on pull request #11653:
URL: https://github.com/apache/beam/pull/11653#issuecomment-626144465


   R: @lukecwik 







[GitHub] [beam] rahul8383 commented on pull request #11609: [BEAM-9887] Throw IllegalArgumentException when building Row with logical types with Invalid input

2020-05-09 Thread GitBox


rahul8383 commented on pull request #11609:
URL: https://github.com/apache/beam/pull/11609#issuecomment-626140088


   > I'd be +1 for just dropping the padding logic. I don't think it should be 
the responsibility of the LogicalType to coerce values like this. What do you 
think @reuvenlax?
   
   @TheNeuralBit @reuvenlax I have taken care of this for `FixedBytes` and 
other standard logical types in #11581 







[GitHub] [beam] rahul8383 commented on pull request #11609: [BEAM-9887] Throw IllegalArgumentException when building Row with logical types with Invalid input

2020-05-09 Thread GitBox


rahul8383 commented on pull request #11609:
URL: https://github.com/apache/beam/pull/11609#issuecomment-626139746


   If there are no comments, can we close this PR?







[GitHub] [beam] rahul8383 commented on pull request #11581: [BEAM-8307] NPE in Calcite dialect when input PCollection has logical…

2020-05-09 Thread GitBox


rahul8383 commented on pull request #11581:
URL: https://github.com/apache/beam/pull/11581#issuecomment-626139548


   R: @TheNeuralBit 
   
   I found a bug while implementing this feature and raised PR #11609 to fix 
it, as I thought the fix could be cherry-picked into the 2.21.0 release.







[GitHub] [beam] mwalenia commented on pull request #11566: [BEAM-9723] Add DLP integration transforms

2020-05-09 Thread GitBox


mwalenia commented on pull request #11566:
URL: https://github.com/apache/beam/pull/11566#issuecomment-626131577


   Run Java PreCommit







[GitHub] [beam] robertwb commented on pull request #11652: [BEAM-9945] Report data channel progress via a designated counter.

2020-05-09 Thread GitBox


robertwb commented on pull request #11652:
URL: https://github.com/apache/beam/pull/11652#issuecomment-626128559


   R: @lukecwik 







[GitHub] [beam] robertwb opened a new pull request #11652: [BEAM-9945] Report data channel progress via a designated counter.

2020-05-09 Thread GitBox


robertwb opened a new pull request #11652:
URL: https://github.com/apache/beam/pull/11652


   IMHO this turned out to make things a lot cleaner than depending on the 
(more distant) output PCollection element counts. Also, the index here is now 
in the same space as the one used for splitting.
   
   
   

[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465074



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
   Yes. I need to check this, because I can't set the coder with `setCoder` 
if there is only one element in the list.
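
The single-input shortcut discussed here can be sketched outside Beam. This is a minimal illustration, not the PR's code: plain `java.util.List` values stand in for `PCollection`s, and `foldAll` and `intersectDistinct` are hypothetical names. It shows the same shape as `performSetOperationCollectionList`: a list with one input is returned unchanged (in the PR, that input already carries its coder), and longer lists are folded pairwise from left to right.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

public class FoldSetOperationSketch {

  /** Applies op pairwise, left to right; a single input is returned as-is. */
  public static <T> List<T> foldAll(List<List<T>> inputs, BinaryOperator<List<T>> op) {
    if (inputs.isEmpty()) {
      throw new IllegalArgumentException("at least one input is required");
    }
    if (inputs.size() == 1) {
      // Nothing to combine: hand back the only input untouched.
      return inputs.get(0);
    }
    List<T> accum = inputs.get(0);
    for (List<T> next : inputs.subList(1, inputs.size())) {
      accum = op.apply(accum, next);
    }
    return accum;
  }

  /** Distinct intersection, used here as the stand-in pairwise operation. */
  public static <T> List<T> intersectDistinct(List<T> left, List<T> right) {
    List<T> out = new ArrayList<>();
    for (T element : left) {
      if (right.contains(element) && !out.contains(element)) {
        out.add(element);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> a = Arrays.asList("1", "2", "3", "4", "5");
    List<String> b = Arrays.asList("1", "3", "4", "6");
    List<String> c = Arrays.asList("3", "4", "7");

    List<List<String>> three = Arrays.asList(a, b, c);
    List<String> folded = foldAll(three, FoldSetOperationSketch::intersectDistinct);
    System.out.println(folded); // prints [3, 4]

    List<List<String>> one = Collections.singletonList(a);
    List<String> same = foldAll(one, FoldSetOperationSketch::intersectDistinct);
    System.out.println(same == a); // prints true
  }
}
```

Returning the lone input by reference is what makes a later unconditional `setCoder` call unsafe in the real transform, which is why the size check comes first.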









[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465290



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified
+  } else {
+PCollection accum = inputs.get(0);
+List> tail = all.subList(1, size);
 
-private SetUnionAllImpl(PCollection rightCollection) {
-  this.rightCollection = rightCollection;
+for (PCollection second : tail) {
+  accum = performSetOperation(accum, second, fn);

Review comment:
   Renamed all methods to use an explicit `Distinct` or `All` prefix.
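
The difference between the `Distinct` and `All` variants comes down to how many copies of an element are emitted, given how many copies appear on each side. The sketch below restates the count functions from the `SetFns` diff as standalone `BiFunction`s. The constant names are hypothetical, and `EXCEPT_ALL` is an assumption: its body is not visible in the quoted diff, so it follows the usual SQL `EXCEPT ALL` rule of `max(m - n, 0)`.

```java
import java.util.function.BiFunction;

public class SetMultiplicitySketch {

  // Each function maps (copies in left, copies in right) to copies emitted.

  /** Distinct intersection: one copy if the element is on both sides. */
  public static final BiFunction<Long, Long, Long> INTERSECT_DISTINCT =
      (left, right) -> (left > 0 && right > 0) ? 1L : 0L;

  /** Intersection with duplicates: the smaller of the two counts. */
  public static final BiFunction<Long, Long, Long> INTERSECT_ALL =
      (left, right) -> Math.min(left, right);

  /** Distinct difference: one copy if the element is only on the left. */
  public static final BiFunction<Long, Long, Long> EXCEPT_DISTINCT =
      (left, right) -> (left > 0 && right == 0) ? 1L : 0L;

  /** Assumed SQL-style EXCEPT ALL: left copies minus right copies, floored at 0. */
  public static final BiFunction<Long, Long, Long> EXCEPT_ALL =
      (left, right) -> Math.max(left - right, 0L);

  public static void main(String[] args) {
    // An element that appears 3 times on the left and 2 times on the right.
    System.out.println(INTERSECT_DISTINCT.apply(3L, 2L)); // prints 1
    System.out.println(INTERSECT_ALL.apply(3L, 2L));      // prints 2
    System.out.println(EXCEPT_DISTINCT.apply(3L, 2L));    // prints 0
    System.out.println(EXCEPT_ALL.apply(3L, 2L));         // prints 1
  }
}
```

In the PR, a function of this shape is applied per key after the `CoGroupByKey`, and the element is output that many times.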









[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465224



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+
+public class SetFns {
+
+  /**
+   * Returns a new {@code SetFns.SetImpl} transform that compute the 
intersection with provided
+   * {@code PCollection}.
+   *
+   * The argument should not be modified after this is called.
+   *
+   * The elements of the output {@link PCollection} will all distinct 
elements that present in
+   * both pipeline is constructed and provided {@link PCollection}.
+   *
+   * {@code
+   * Pipeline p = ...;
+   *
+   * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5"));
+   * PCollection right = p.apply(Create.of("1", "3", "4", "6"));
+   *
+   * PCollection results =
+   * left.apply(SetFns.intersect(right));
+   * }
+   */
+  public static  SetImpl intersect(PCollection rightCollection) {
+checkNotNull(rightCollection, "rightCollection argument is null");
+SerializableBiFunction intersectFn =
+(numberOfElementsinLeft, numberOfElementsinRight) -> 
(numberOfElementsinLeft > 0 && numberOfElementsinRight > 0) ? 1L : 0L;
+return new SetImpl<>(rightCollection, intersectFn);
+  }
+
+  /**
+   * Returns a new {@code SetFns.SetImpl} transform that compute the 
intersection all with
+   * provided {@code PCollection}.
+   *
+   * The argument should not be modified after this is called.
+   *
+   * The elements of the output {@link PCollection} which will follow 
EXCEPT_ALL Semantics as
+   * follows: Given there are m elements on pipeline which is constructed 
{@link PCollection}
+   * (left) and n elements on in provided {@link PCollection} (right): - it 
will output MIN(m -
+   * n, 0) elements of left for all elements which are present in both left 
and right.
+   *
+   * {@code
+   * Pipeline p = ...;
+   *
+   * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5"));
+   * PCollection right = p.apply(Create.of("1", "3", "4", "6"));
+   *
+   * PCollection results =
+   * left.apply(SetFns.intersectAll(right));
+   * }
+   */
+  public static  SetImpl intersectAll(PCollection rightCollection) {
+checkNotNull(rightCollection, "rightCollection argument is null");
+SerializableBiFunction intersectFn =
+(numberOfElementsinLeft, numberOfElementsinRight) -> 
(numberOfElementsinLeft > 0 && numberOfElementsinRight > 0) ? 
Math.min(numberOfElementsinLeft, numberOfElementsinRight) : 0L;
+return new SetImpl<>(rightCollection, intersectFn);
+  }
+
+  /**
+   * Returns a new {@code SetFns.SetImpl} transform that compute the 
difference (except) with
+   * provided {@code PCollection}.
+   *
+   * The argument should not be modified after this is called.
+   *
+   * The elements of the output {@link PCollection} will all distinct 
elements that present in
+   * pipeline is constructed {@link PCollection} but not present in 
provided {@link
+   * PCollection}.
+   *
+   * {@code
+   * Pipeline p = ...;
+   *
+   * PCollection left = p.apply(Create.of("1", "2", "3", "4", "5"));
+   * PCollection right = p.apply(Create.of("1", "3", "4", "6"));
+   *
+   * PCollection results =
+   * left.apply(SetFns.except(right));
+   * }
+   */
+  public static  SetImpl except(PCollection rightCollection) {
+checkNotNull(rightCollection, "rightCollection argument is null");
+SerializableBiFunction exceptFn =
+(numberOfElementsinLeft, numberOfElementsinRight) -> 
numberOfElementsinLeft > 0 && numberOfElementsinRight == 0 ? 1L 

[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422465074



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified

Review comment:
   Yes. I need to check this, because I can't blindly call `setCoder` if 
there is only one element in the list.









[GitHub] [beam] darshanj commented on a change in pull request #11610: [BEAM-9825] | Implement Intersect,Union,Except transforms

2020-05-09 Thread GitBox


darshanj commented on a change in pull request #11610:
URL: https://github.com/apache/beam/pull/11610#discussion_r422464890



##
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SetFns.java
##
@@ -187,83 +237,112 @@
* left.apply(SetFns.unionAll(right)); // results will be 
PCollection containing: "1","1","1","2","3","4","4"
* }
*/
-  public static  SetUnionAllImpl unionAll(PCollection 
rightCollection) {
+  public static  SetImpl unionAll(PCollection rightCollection) {
 checkNotNull(rightCollection, "rightCollection argument is null");
-
-return new SetUnionAllImpl(rightCollection);
+return new SetImpl<>(rightCollection, unionAll());
   }
 
-  private static  PCollection performSetOperation(
-  PCollection leftCollection,
-  PCollection rightCollection,
-  SerializableBiFunction fn) {
-
-TupleTag leftCollectionTag = new TupleTag<>();
-TupleTag rightCollectionTag = new TupleTag<>();
-
-MapElements> elementToVoid =
-MapElements.via(
-new SimpleFunction>() {
-  @Override
-  public KV apply(T element) {
-return KV.of(element, null);
-  }
-});
-
-PCollection> left = leftCollection.apply("PrepareLeftKV", 
elementToVoid);
-PCollection> right = rightCollection.apply("PrepareRightKV", 
elementToVoid);
-
-PCollection> coGbkResults =
-KeyedPCollectionTuple.of(leftCollectionTag, left)
-.and(rightCollectionTag, right)
-.apply(CoGroupByKey.create());
-// TODO: lift combiners through the CoGBK.
-return coGbkResults.apply(
-ParDo.of(
-new DoFn, T>() {
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-KV elementGroups = c.element();
-
-CoGbkResult value = elementGroups.getValue();
-long inFirstSize = 
Iterables.size(value.getAll(leftCollectionTag));
-long inSecondSize = 
Iterables.size(value.getAll(rightCollectionTag));
-
-T element = elementGroups.getKey();
-for (long i = 0L; i < fn.apply(inFirstSize, inSecondSize); 
i++) {
-  c.output(element);
-}
-  }
-}));
+  public static  Flatten.PCollections unionAll() {
+return Flatten.pCollections();
   }
 
   public static class SetImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
-private final SerializableBiFunction fn;
 
-private SetImpl(PCollection rightCollection, 
SerializableBiFunction fn) {
-  this.rightCollection = rightCollection;
-  this.fn = fn;
+private final transient PCollection right;
+private final PTransform, PCollection> 
listTransformFn;
+
+private SetImpl(
+PCollection rightCollection,
+PTransform, PCollection> listTransformFn) {
+  this.right = rightCollection;
+  this.listTransformFn = listTransformFn;
 }
 
 @Override
 public PCollection expand(PCollection leftCollection) {
-  return performSetOperation(leftCollection, rightCollection, fn)
-  .setCoder(leftCollection.getCoder());
+  return 
PCollectionList.of(leftCollection).and(right).apply(listTransformFn);
 }
   }
 
-  public static class SetUnionAllImpl extends PTransform, 
PCollection> {
-private final PCollection rightCollection;
+  public static class SetImplCollections extends 
PTransform, PCollection> {
+
+private final transient SerializableBiFunction fn;
+
+private SetImplCollections(SerializableBiFunction fn) {
+  this.fn = fn;
+}
+
+private static  PCollection performSetOperationCollectionList(
+PCollectionList inputs, SerializableBiFunction fn) {
+  List> all = inputs.getAll();
+  int size = all.size();
+  if (size == 1) {
+return inputs.get(0); // Handle only one PCollection in list. Coder is 
already specified
+  } else {
+PCollection accum = inputs.get(0);
+List> tail = all.subList(1, size);
 
-private SetUnionAllImpl(PCollection rightCollection) {
-  this.rightCollection = rightCollection;
+for (PCollection second : tail) {
+  accum = performSetOperation(accum, second, fn);

Review comment:
   Thanks. That's a very good idea. I have done it.




