This is an automated email from the ASF dual-hosted git repository. kenn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 761240f [BEAM-6430] Fix EXCEPT. 761240f is described below commit 761240fb526f1b0a770abe1a11a6eedd9cd2c42e Author: amaliujia <amaliu...@gmail.com> AuthorDate: Mon Jan 14 15:42:06 2019 -0800 [BEAM-6430] Fix EXCEPT. --- .../sql/impl/transform/BeamSetOperatorsTransforms.java | 18 ++++++++++++++++++ .../sdk/extensions/sql/impl/rel/BeamMinusRelTest.java | 15 ++++++++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java index 581fb08..4827e2d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamSetOperatorsTransforms.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators; /** Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. */ public abstract class BeamSetOperatorsTransforms { @@ -89,6 +90,8 @@ public abstract class BeamSetOperatorsTransforms { } break; case MINUS: + // Say for Row R, there are m instances on left and n instances on right, + // EXCEPT ALL outputs MAX(m - n, 0) instances of R. if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) { Iterator<Row> iter = leftRows.iterator(); if (all) { @@ -100,6 +103,21 @@ public abstract class BeamSetOperatorsTransforms { // only output one ctx.output(iter.next()); } + } else if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) { + int leftCount = Iterators.size(leftRows.iterator()); + int rightCount = Iterators.size(rightRows.iterator()); + + int outputCount = leftCount - rightCount; + if (outputCount > 0) { + if (all) { + while (outputCount > 0) { + outputCount--; + ctx.output(ctx.element().getKey()); + } + } else { + ctx.output(ctx.element().getKey()); + } + } } } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java index 322f8fb..5ac5bce 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMinusRelTest.java @@ -93,7 +93,7 @@ public class BeamMinusRelTest extends BaseRelTest { Schema.FieldType.INT64, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.DECIMAL, "price") - .addRows(4L, 4, new BigDecimal(4.0)) + .addRows(1L, 1, new BigDecimal(1.0), 4L, 4, new BigDecimal(4.0)) .getRows()); pipeline.run(); @@ -110,7 +110,7 @@ public class BeamMinusRelTest extends BaseRelTest { + "FROM ORDER_DETAILS2 "; PCollection<Row> rows = compilePipeline(sql, pipeline); - PAssert.that(rows).satisfies(new CheckSize(2)); + PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows) .containsInAnyOrder( @@ -118,7 +118,16 @@ public class BeamMinusRelTest extends BaseRelTest { Schema.FieldType.INT64, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.DECIMAL, "price") - .addRows(4L, 4, new BigDecimal(4.0), 4L, 4, new BigDecimal(4.0)) + .addRows( + 1L, + 1, + new BigDecimal(1.0), + 4L, + 4, + new BigDecimal(4.0), + 4L, + 4, + new BigDecimal(4.0)) .getRows()); pipeline.run();