This is an automated email from the ASF dual-hosted git repository. apilloud pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 88875bc Add null check to fieldToAvatica new 1075ff3 Merge pull request #8375: [BEAM-7129] BeamEnumerableConverter does not handle null values for all types 88875bc is described below commit 88875bcf0407713ebf22d11761556549fd9da970 Author: Brian Hulette <bhule...@google.com> AuthorDate: Mon Apr 22 10:18:38 2019 -0700 Add null check to fieldToAvatica Also parameterizes testToEnumerable_collectNullValue to test null values of all primitive types. --- .../sql/impl/rel/BeamEnumerableConverter.java | 4 + .../sql/impl/rel/BeamEnumerableConverterTest.java | 287 ++++++++++++--------- 2 files changed, 169 insertions(+), 122 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java index 755d589..3b0e94c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java @@ -303,6 +303,10 @@ public class BeamEnumerableConverter extends ConverterImpl implements Enumerable } private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) { + if (beamValue == null) { + return null; + } + switch (type.getTypeName()) { case LOGICAL_TYPE: String logicalId = type.getLogicalType().getIdentifier(); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java index 76e952e..5dfa38d 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverterTest.java @@ -30,6 +30,8 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PBegin; @@ -49,6 +51,10 @@ import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexLiteral; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Test for {@code BeamEnumerableConverter}. */ public class BeamEnumerableConverterTest { @@ -57,137 +63,174 @@ public class BeamEnumerableConverterTest { static PipelineOptions options = PipelineOptionsFactory.create(); static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder); - @Test - public void testToEnumerable_collectSingle() { - Schema schema = Schema.builder().addInt64Field("id").build(); - RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); - ImmutableList<ImmutableList<RexLiteral>> tuples = - ImmutableList.of(ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO))); - BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - - Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); - Enumerator<Object> enumerator = enumerable.enumerator(); - - assertTrue(enumerator.moveNext()); - assertEquals(0L, enumerator.current()); - assertFalse(enumerator.moveNext()); - enumerator.close(); - } - - @Test - public void testToEnumerable_collectMultiple() { - Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build(); - RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); - ImmutableList<ImmutableList<RexLiteral>> tuples = - ImmutableList.of( - ImmutableList.of( - rexBuilder.makeBigintLiteral(BigDecimal.ZERO), - rexBuilder.makeBigintLiteral(BigDecimal.ONE))); - BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - - Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); - Enumerator<Object> enumerator = enumerable.enumerator(); - - assertTrue(enumerator.moveNext()); - Object[] row = (Object[]) enumerator.current(); - assertEquals(2, row.length); - assertEquals(0L, row[0]); - assertEquals(1L, row[1]); - assertFalse(enumerator.moveNext()); - enumerator.close(); - } - - @Test - public void testToListRow_collectMultiple() { - Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build(); - RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); - ImmutableList<ImmutableList<RexLiteral>> tuples = - ImmutableList.of( - ImmutableList.of( - rexBuilder.makeBigintLiteral(BigDecimal.ZERO), - rexBuilder.makeBigintLiteral(BigDecimal.ONE))); - BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - - List<Row> rowList = BeamEnumerableConverter.toRowList(options, node); - assertTrue(rowList.size() == 1); - assertEquals(Row.withSchema(schema).addValues(0L, 1L).build(), rowList.get(0)); - } - - @Test - public void testToEnumerable_collectNullValue() { - Schema schema = Schema.builder().addNullableField("id", FieldType.INT64).build(); - RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); - ImmutableList<ImmutableList<RexLiteral>> tuples = - ImmutableList.of( - ImmutableList.of( - rexBuilder.makeNullLiteral( - CalciteUtils.toRelDataType(TYPE_FACTORY, FieldType.INT64)))); - BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); - - Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); - Enumerator<Object> enumerator = enumerable.enumerator(); - - assertTrue(enumerator.moveNext()); - Object row = enumerator.current(); - assertEquals(null, row); - assertFalse(enumerator.moveNext()); - enumerator.close(); - } + /** Non-parameterized tests for BeamEnumerableConverter. */ + @RunWith(JUnit4.class) + public static class SpecificTypeTests { + + @Test + public void testToEnumerable_collectSingle() { + Schema schema = Schema.builder().addInt64Field("id").build(); + RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); + ImmutableList<ImmutableList<RexLiteral>> tuples = + ImmutableList.of(ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO))); + BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); + + Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); + Enumerator<Object> enumerator = enumerable.enumerator(); + + assertTrue(enumerator.moveNext()); + assertEquals(0L, enumerator.current()); + assertFalse(enumerator.moveNext()); + enumerator.close(); + } - private static class FakeTable extends BaseBeamTable { - public FakeTable() { - super(null); + @Test + public void testToEnumerable_collectMultiple() { + Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build(); + RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); + ImmutableList<ImmutableList<RexLiteral>> tuples = + ImmutableList.of( + ImmutableList.of( + rexBuilder.makeBigintLiteral(BigDecimal.ZERO), + rexBuilder.makeBigintLiteral(BigDecimal.ONE))); + BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); + + Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); + Enumerator<Object> enumerator = enumerable.enumerator(); + + assertTrue(enumerator.moveNext()); + Object[] row = (Object[]) enumerator.current(); + assertEquals(2, row.length); + assertEquals(0L, row[0]); + assertEquals(1L, row[1]); + assertFalse(enumerator.moveNext()); + enumerator.close(); } - @Override - public PCollection.IsBounded isBounded() { - return null; + @Test + public void testToListRow_collectMultiple() { + Schema schema = Schema.builder().addInt64Field("id").addInt64Field("otherid").build(); + RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); + ImmutableList<ImmutableList<RexLiteral>> tuples = + ImmutableList.of( + ImmutableList.of( + rexBuilder.makeBigintLiteral(BigDecimal.ZERO), + rexBuilder.makeBigintLiteral(BigDecimal.ONE))); + BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); + + List<Row> rowList = BeamEnumerableConverter.toRowList(options, node); + assertTrue(rowList.size() == 1); + assertEquals(Row.withSchema(schema).addValues(0L, 1L).build(), rowList.get(0)); } - @Override - public PCollection<Row> buildIOReader(PBegin begin) { - return null; + private static class FakeTable extends BaseBeamTable { + public FakeTable() { + super(null); + } + + @Override + public PCollection.IsBounded isBounded() { + return null; + } + + @Override + public PCollection<Row> buildIOReader(PBegin begin) { + return null; + } + + @Override + public POutput buildIOWriter(PCollection<Row> input) { + input.apply( + ParDo.of( + new DoFn<Row, Void>() { + @ProcessElement + public void processElement(ProcessContext context) {} + })); + return PDone.in(input.getPipeline()); + } } - @Override - public POutput buildIOWriter(PCollection<Row> input) { - input.apply( - ParDo.of( - new DoFn<Row, Void>() { - @ProcessElement - public void processElement(ProcessContext context) {} - })); - return PDone.in(input.getPipeline()); + @Test + public void testToEnumerable_count() { + Schema schema = Schema.builder().addInt64Field("id").build(); + RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); + ImmutableList<ImmutableList<RexLiteral>> tuples = + ImmutableList.of( + ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO)), + ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ONE))); + BeamRelNode node = + new BeamIOSinkRel( + cluster, + RelOptTableImpl.create(null, type, ImmutableList.of(), null), + null, + new BeamValuesRel(cluster, type, tuples, null), + null, + null, + null, + false, + new FakeTable(), + null); + + Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); + Enumerator<Object> enumerator = enumerable.enumerator(); + + assertTrue(enumerator.moveNext()); + assertEquals(2L, enumerator.current()); + assertFalse(enumerator.moveNext()); + enumerator.close(); } } - @Test - public void testToEnumerable_count() { - Schema schema = Schema.builder().addInt64Field("id").build(); - RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); - ImmutableList<ImmutableList<RexLiteral>> tuples = - ImmutableList.of( - ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ZERO)), - ImmutableList.of(rexBuilder.makeBigintLiteral(BigDecimal.ONE))); - BeamRelNode node = - new BeamIOSinkRel( - cluster, - RelOptTableImpl.create(null, type, ImmutableList.of(), null), - null, - new BeamValuesRel(cluster, type, tuples, null), - null, - null, - null, - false, - new FakeTable(), - null); - - Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); - Enumerator<Object> enumerator = enumerable.enumerator(); - - assertTrue(enumerator.moveNext()); - assertEquals(2L, enumerator.current()); - assertFalse(enumerator.moveNext()); - enumerator.close(); + /** + * Tests for BeamEnumerableConverter parameterized by field type. Only primitive field types and + * BeamSQL logical types are included. + */ + @RunWith(Parameterized.class) + public static class PrimitiveTypeTests { + @Parameterized.Parameter public FieldType fieldType; + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> data() { + ImmutableList.Builder<Object[]> builder = ImmutableList.builder(); + + // Add all primitive types + for (TypeName typeName : TypeName.values()) { + if (typeName.isPrimitiveType()) { + builder.add(new Object[] {FieldType.of(typeName)}); + } + } + + // Add all BeamSQL logical types + builder + .add(new Object[] {CalciteUtils.DATE}) + .add(new Object[] {CalciteUtils.CHAR}) + .add(new Object[] {CalciteUtils.TIME}) + .add(new Object[] {CalciteUtils.TIME_WITH_LOCAL_TZ}) + .add(new Object[] {CalciteUtils.TIMESTAMP}) + .add(new Object[] {CalciteUtils.TIMESTAMP_WITH_LOCAL_TZ}); + + return builder.build(); + } + + @Test + @Category(NeedsRunner.class) + public void testToEnumerable_collectNullValue() { + Schema schema = Schema.builder().addNullableField("id", fieldType).build(); + RelDataType type = CalciteUtils.toCalciteRowType(schema, TYPE_FACTORY); + ImmutableList<ImmutableList<RexLiteral>> tuples = + ImmutableList.of( + ImmutableList.of( + rexBuilder.makeNullLiteral(CalciteUtils.toRelDataType(TYPE_FACTORY, fieldType)))); + BeamRelNode node = new BeamValuesRel(cluster, type, tuples, null); + + Enumerable<Object> enumerable = BeamEnumerableConverter.toEnumerable(options, node); + Enumerator<Object> enumerator = enumerable.enumerator(); + + assertTrue(enumerator.moveNext()); + Object row = enumerator.current(); + assertEquals(null, row); + assertFalse(enumerator.moveNext()); + enumerator.close(); + } } }