http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java deleted file mode 100644 index 5d5d4fc..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ /dev/null @@ -1,453 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema.transform; - -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.fun.SqlAvgAggFunction; -import org.apache.calcite.sql.fun.SqlCountAggFunction; -import org.apache.calcite.sql.fun.SqlMinMaxAggFunction; -import org.apache.calcite.sql.fun.SqlSumAggFunction; -import org.apache.calcite.sql.type.BasicSqlType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.ImmutableBitSet; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unit tests for {@link BeamAggregationTransforms}. 
- * - */ -public class BeamAggregationTransformTest extends BeamTransformBaseTest{ - - @Rule - public TestPipeline p = TestPipeline.create(); - - private List<AggregateCall> aggCalls; - - private BeamSqlRowType keyType; - private BeamSqlRowType aggPartType; - private BeamSqlRowType outputType; - - private BeamSqlRowCoder inRecordCoder; - private BeamSqlRowCoder keyCoder; - private BeamSqlRowCoder aggCoder; - private BeamSqlRowCoder outRecordCoder; - - /** - * This step equals to below query. - * <pre> - * SELECT `f_int` - * , COUNT(*) AS `size` - * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1` - * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1` - * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2` - * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2` - * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3` - * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3` - * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4` - * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4` - * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5` - * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5` - * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7` - * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8` - * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8` - * FROM TABLE_NAME - * GROUP BY `f_int` - * </pre> - * @throws ParseException - */ - @Test - public void testCountPerElementBasic() throws ParseException { - setupEnvironment(); - - PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows)); - - //1. extract fields in group-by key part - PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy", - WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder)); - - //2. apply a GroupByKey. - PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) - .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, - IterableCoder.<BeamSqlRow>of(inRecordCoder))); - - //3. run aggregation functions - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( - new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); - - //4. flat KV to a single record - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); - mergedStream.setCoder(outRecordCoder); - - //assert function BeamAggregationTransform.AggregationGroupByKeyFn - PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); - - //assert BeamAggregationTransform.AggregationCombineFn - PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn()); - - //assert BeamAggregationTransform.MergeAggregationRecord - PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord()); - - p.run(); -} - - private void setupEnvironment() { - prepareAggregationCalls(); - prepareTypeAndCoder(); - } - - /** - * create list of all {@link AggregateCall}. 
- */ - @SuppressWarnings("deprecation") - private void prepareAggregationCalls() { - //aggregations for all data type - aggCalls = new ArrayList<>(); - aggCalls.add( - new AggregateCall(new SqlCountAggFunction(), false, - Arrays.<Integer>asList(), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "count") - ); - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "sum1") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "avg1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "max1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "min1") - ); - - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "sum2") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "avg2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "max2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "min2") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)), - false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "sum3") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "avg3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "max3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "min3") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)), - false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "sum4") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "avg4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "max4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(4), 
- new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "min4") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)), - false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "sum5") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "avg5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "max5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "min5") - ); - - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "max7") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "min7") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)), - false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "sum8") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "avg8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "max8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "min8") - ); - } - - /** - * Coders used in aggregation steps. 
- */ - private void prepareTypeAndCoder() { - inRecordCoder = new BeamSqlRowCoder(inputRowType); - - keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); - keyCoder = new BeamSqlRowCoder(keyType); - - aggPartType = initTypeOfSqlRow( - Arrays.asList(KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - )); - aggCoder = new BeamSqlRowCoder(aggPartType); - - outputType = prepareFinalRowType(); - outRecordCoder = new BeamSqlRowCoder(outputType); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. - */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() { - return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - inputRows.get(0)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), - inputRows.get(1)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), - inputRows.get(2)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), - inputRows.get(3))); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. - */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn() - throws ParseException { - return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - new BeamSqlRow(aggPartType, Arrays.<Object>asList( - 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - ))) - ); - } - - /** - * Row type of final output row. 
- */ - private BeamSqlRowType prepareFinalRowType() { - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - List<KV<String, SqlTypeName>> columnMetadata = - Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - ); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. - */ - private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamSqlRow(outputType, Arrays.<Object>asList( - 1, 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - )); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java deleted file mode 100644 index 4045bc8..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.schema.transform; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.BeforeClass; - -/** - * shared methods to test PTransforms which execute Beam SQL steps. 
- * - */ -public class BeamTransformBaseTest { - public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - public static BeamSqlRowType inputRowType; - public static List<BeamSqlRow> inputRows; - - @BeforeClass - public static void prepareInput() throws NumberFormatException, ParseException{ - List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList( - KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT), - KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT), - KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE), - KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP), - KV.of("f_int2", SqlTypeName.INTEGER) - ); - inputRowType = initTypeOfSqlRow(columnMetadata); - inputRows = Arrays.asList( - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0, - "string_row1", format.parse("2017-01-01 01:01:03"), 1)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0, - "string_row2", format.parse("2017-01-01 01:02:03"), 2)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0, - "string_row3", format.parse("2017-01-01 01:03:03"), 3)), - initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"), - Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4))); - } - - /** - * create a {@code BeamSqlRowType} for given column metadata. - */ - public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * Create an empty row with given column metadata. - */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { - return initBeamSqlRow(columnMetadata, Arrays.asList()); - } - - /** - * Create a row with given column metadata, and values for each column. - * - */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, - List<Object> rowValues){ - BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata); - - return new BeamSqlRow(rowType, rowValues); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java new file mode 100644 index 0000000..08678d1 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql; + +import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.beam.sdk.util.ApiSurface; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Surface test for BeamSql api. + */ +@RunWith(JUnit4.class) +public class BeamSqlApiSurfaceTest { + @Test + public void testSdkApiSurface() throws Exception { + + @SuppressWarnings("unchecked") + final Set<String> allowed = + ImmutableSet.of( + "org.apache.beam", + "org.joda.time", + "org.apache.commons.csv"); + + ApiSurface surface = ApiSurface + .ofClass(BeamSqlCli.class) + .includingClass(BeamSql.class) + .includingClass(BeamSqlEnv.class) + .includingPackage("org.apache.beam.sdk.extensions.sql.schema", + getClass().getClassLoader()) + .pruningPrefix("java") + .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test") + .pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase"); + + assertThat(surface, containsOnlyPackages(allowed)); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java new file mode 100644 index 0000000..e6ca18f --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.junit.Test; + +/** + * Tests for GROUP-BY/aggregation, with global_window/fix_time_window/sliding_window/session_window + * with BOUNDED PCollection. + */ +public class BeamSqlDslAggregationTest extends BeamSqlDslBase { + /** + * GROUP-BY with single aggregation function with bounded PCollection. + */ + @Test + public void testAggregationWithoutWindowWithBounded() throws Exception { + runAggregationWithoutWindow(boundedInput1); + } + + /** + * GROUP-BY with single aggregation function with unbounded PCollection. + */ + @Test + public void testAggregationWithoutWindowWithUnbounded() throws Exception { + runAggregationWithoutWindow(unboundedInput1); + } + + private void runAggregationWithoutWindow(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; + + PCollection<BeamSqlRow> result = + input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int2", 0); + record.addField("size", 4L); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with multiple aggregation functions with bounded PCollection. + */ + @Test + public void testAggregationFunctionsWithBounded() throws Exception{ + runAggregationFunctions(boundedInput1); + } + + /** + * GROUP-BY with multiple aggregation functions with unbounded PCollection. 
+ */ + @Test + public void testAggregationFunctionsWithUnbounded() throws Exception{ + runAggregationFunctions(unboundedInput1); + } + + private void runAggregationFunctions(PCollection<BeamSqlRow> input) throws Exception{ + String sql = "select f_int2, count(*) as size, " + + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," + + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," + + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3," + + "sum(f_float) as sum4, avg(f_float) as avg4, max(f_float) as max4, min(f_float) as min4," + + "sum(f_double) as sum5, avg(f_double) as avg5, " + + "max(f_double) as max5, min(f_double) as min5," + + "max(f_timestamp) as max6, min(f_timestamp) as min6 " + + "FROM TABLE_A group by f_int2"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testAggregationFunctions", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create( + Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", + "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", + "max5", "min5", "max6", "min6"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, + Types.BIGINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, + Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.TINYINT, Types.FLOAT, Types.FLOAT, + Types.FLOAT, Types.FLOAT, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, + Types.TIMESTAMP, Types.TIMESTAMP)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int2", 0); + record.addField("size", 4L); + + record.addField("sum1", 10000L); + record.addField("avg1", 2500L); + record.addField("max1", 4000L); + record.addField("min1", 1000L); + + record.addField("sum2", (short) 10); + record.addField("avg2", (short) 2); + record.addField("max2", (short) 4); + record.addField("min2", (short) 1); + + record.addField("sum3", (byte) 10); + record.addField("avg3", (byte) 2); + record.addField("max3", (byte) 4); + record.addField("min3", (byte) 1); + + record.addField("sum4", 10.0F); + record.addField("avg4", 2.5F); + record.addField("max4", 4.0F); + record.addField("min4", 1.0F); + + record.addField("sum5", 10.0); + record.addField("avg5", 2.5); + record.addField("max5", 4.0); + record.addField("min5", 1.0); + + record.addField("max6", FORMAT.parse("2017-01-01 02:04:03")); + record.addField("min6", FORMAT.parse("2017-01-01 01:01:03")); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * Implicit GROUP-BY with DISTINCT with bounded PCollection. + */ + @Test + public void testDistinctWithBounded() throws Exception { + runDistinct(boundedInput1); + } + + /** + * Implicit GROUP-BY with DISTINCT with unbounded PCollection. 
+ */ + @Test + public void testDistinctWithUnbounded() throws Exception { + runDistinct(unboundedInput1); + } + + private void runDistinct(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT distinct f_int, f_long FROM PCOLLECTION "; + + PCollection<BeamSqlRow> result = + input.apply("testDistinct", BeamSql.simpleQuery(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", 1); + record1.addField("f_long", 1000L); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", 2); + record2.addField("f_long", 2000L); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", 3); + record3.addField("f_long", 3000L); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", 4); + record4.addField("f_long", 4000L); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with TUMBLE window(aka fix_time_window) with bounded PCollection. + */ + @Test + public void testTumbleWindowWithBounded() throws Exception { + runTumbleWindow(boundedInput1); + } + + /** + * GROUP-BY with TUMBLE window(aka fix_time_window) with unbounded PCollection. + */ + @Test + public void testTumbleWindowWithUnbounded() throws Exception { + runTumbleWindow(unboundedInput1); + } + + private void runTumbleWindow(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" + + " FROM TABLE_A" + + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testTumbleWindow", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create( + Arrays.asList("f_int2", "size", "window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 1L); + record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with HOP window(aka sliding_window) with bounded PCollection. + */ + @Test + public void testHopWindowWithBounded() throws Exception { + runHopWindow(boundedInput1); + } + + /** + * GROUP-BY with HOP window(aka sliding_window) with unbounded PCollection. 
+ */ + @Test + public void testHopWindowWithUnbounded() throws Exception { + runHopWindow(unboundedInput1); + } + + private void runHopWindow(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" + + " FROM PCOLLECTION" + + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; + PCollection<BeamSqlRow> result = + input.apply("testHopWindow", BeamSql.simpleQuery(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create( + Arrays.asList("f_int2", "size", "window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00")); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 3L); + record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int2", 0); + record3.addField("size", 1L); + record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00")); + record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); + record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int2", 0); + record4.addField("size", 1L); + record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); + record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); + record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * GROUP-BY with SESSION window with bounded PCollection. + */ + @Test + public void testSessionWindowWithBounded() throws Exception { + runSessionWindow(boundedInput1); + } + + /** + * GROUP-BY with SESSION window with unbounded PCollection. 
+ */ + @Test + public void testSessionWindowWithUnbounded() throws Exception { + runSessionWindow(unboundedInput1); + } + + private void runSessionWindow(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" + + " FROM TABLE_A" + + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testSessionWindow", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create( + Arrays.asList("f_int2", "size", "window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int2", 0); + record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03")); + record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); + record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int2", 0); + record2.addField("size", 1L); + record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03")); + record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime())); + record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime())); + + PAssert.that(result).containsInAnyOrder(record1, record2); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testWindowOnNonTimestampField() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage( + "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) + .apply("testWindowOnNonTimestampField", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testUnsupportedDistinct() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Encountered \"*\""); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; + + PCollection<BeamSqlRow> result = + boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java new file mode 100644 index 0000000..0c1ce1c --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslBase.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.math.BigDecimal; +import java.sql.Types; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.ExpectedException; + +/** + * prepare input records to test {@link BeamSql}. + * + * <p>Note that, any change in these records would impact tests in this package. + * + */ +public class BeamSqlDslBase { + public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + @Rule + public ExpectedException exceptions = ExpectedException.none(); + + public static BeamSqlRowType rowTypeInTableA; + public static List<BeamSqlRow> recordsInTableA; + + //bounded PCollections + public PCollection<BeamSqlRow> boundedInput1; + public PCollection<BeamSqlRow> boundedInput2; + + //unbounded PCollections + public PCollection<BeamSqlRow> unboundedInput1; + public PCollection<BeamSqlRow> unboundedInput2; + + @BeforeClass + public static void prepareClass() throws ParseException { + rowTypeInTableA = BeamSqlRowType.create( + Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", + "f_timestamp", "f_int2", "f_decimal"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, + Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL)); + + recordsInTableA = prepareInputRowsInTableA(); + } + + @Before + public void preparePCollections(){ + boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", + Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); + + boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", + Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); + + unboundedInput1 = prepareUnboundedPCollection1(); + unboundedInput2 = prepareUnboundedPCollection2(); + } + + private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { + TestStream.Builder<BeamSqlRow> values = TestStream + .create(new BeamSqlRowCoder(rowTypeInTableA)); + + for (BeamSqlRow row : recordsInTableA) { + values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); + values = values.addElements(row); + } + + return PBegin.in(pipeline).apply("unboundedInput1", 
values.advanceWatermarkToInfinity()); + } + + private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { + TestStream.Builder<BeamSqlRow> values = TestStream + .create(new BeamSqlRowCoder(rowTypeInTableA)); + + BeamSqlRow row = recordsInTableA.get(0); + values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); + values = values.addElements(row); + + return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); + } + + private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{ + List<BeamSqlRow> rows = new ArrayList<>(); + + BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA); + row1.addField(0, 1); + row1.addField(1, 1000L); + row1.addField(2, Short.valueOf("1")); + row1.addField(3, Byte.valueOf("1")); + row1.addField(4, 1.0f); + row1.addField(5, 1.0); + row1.addField(6, "string_row1"); + row1.addField(7, FORMAT.parse("2017-01-01 01:01:03")); + row1.addField(8, 0); + row1.addField(9, new BigDecimal(1)); + rows.add(row1); + + BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA); + row2.addField(0, 2); + row2.addField(1, 2000L); + row2.addField(2, Short.valueOf("2")); + row2.addField(3, Byte.valueOf("2")); + row2.addField(4, 2.0f); + row2.addField(5, 2.0); + row2.addField(6, "string_row2"); + row2.addField(7, FORMAT.parse("2017-01-01 01:02:03")); + row2.addField(8, 0); + row2.addField(9, new BigDecimal(2)); + rows.add(row2); + + BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA); + row3.addField(0, 3); + row3.addField(1, 3000L); + row3.addField(2, Short.valueOf("3")); + row3.addField(3, Byte.valueOf("3")); + row3.addField(4, 3.0f); + row3.addField(5, 3.0); + row3.addField(6, "string_row3"); + row3.addField(7, FORMAT.parse("2017-01-01 01:06:03")); + row3.addField(8, 0); + row3.addField(9, new BigDecimal(3)); + rows.add(row3); + + BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA); + row4.addField(0, 4); + row4.addField(1, 4000L); + row4.addField(2, Short.valueOf("4")); + row4.addField(3, Byte.valueOf("4")); + row4.addField(4, 4.0f); + row4.addField(5, 4.0); + row4.addField(6, "string_row4"); + row4.addField(7, FORMAT.parse("2017-01-01 02:04:03")); + row4.addField(8, 0); + row4.addField(9, new BigDecimal(4)); + rows.add(row4); + + return rows; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java new file mode 100644 index 0000000..16b6426 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslFilterTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; + +/** + * Tests for WHERE queries with BOUNDED PCollection. + */ +public class BeamSqlDslFilterTest extends BeamSqlDslBase { + /** + * single filter with bounded PCollection. + */ + @Test + public void testSingleFilterWithBounded() throws Exception { + runSingleFilter(boundedInput1); + } + + /** + * single filter with unbounded PCollection. + */ + @Test + public void testSingleFilterWithUnbounded() throws Exception { + runSingleFilter(unboundedInput1); + } + + private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; + + PCollection<BeamSqlRow> result = + input.apply("testSingleFilter", BeamSql.simpleQuery(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); + + pipeline.run().waitUntilFinish(); + } + + /** + * composite filters with bounded PCollection. + */ + @Test + public void testCompositeFilterWithBounded() throws Exception { + runCompositeFilter(boundedInput1); + } + + /** + * composite filters with unbounded PCollection. + */ + @Test + public void testCompositeFilterWithUnbounded() throws Exception { + runCompositeFilter(unboundedInput1); + } + + private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT * FROM TABLE_A" + + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testCompositeFilter", BeamSql.query(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); + + pipeline.run().waitUntilFinish(); + } + + /** + * nothing return with filters in bounded PCollection. + */ + @Test + public void testNoReturnFilterWithBounded() throws Exception { + runNoReturnFilter(boundedInput1); + } + + /** + * nothing return with filters in unbounded PCollection. 
+ */ + @Test + public void testNoReturnFilterWithUnbounded() throws Exception { + runNoReturnFilter(unboundedInput1); + } + + private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT * FROM TABLE_A WHERE f_int < 1"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testNoReturnFilter", BeamSql.query(sql)); + + PAssert.that(result).empty(); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testFromInvalidTableName1() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Object 'TABLE_B' not found"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM TABLE_B WHERE f_int < 1"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) + .apply("testFromInvalidTableName1", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testFromInvalidTableName2() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Use fixed table name PCOLLECTION"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM PCOLLECTION_NA"; + + PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testInvalidFilter() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Column 'f_int_na' not found in any table"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; + + PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java new file mode 100644 index 0000000..363ab8f --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.sql; + +import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; +import static org.apache.beam.sdk.extensions.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Rule; +import org.junit.Test; + +/** + * Tests for joins in queries. + */ +public class BeamSqlDslJoinTest { + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + private static final BeamSqlRowType SOURCE_RECORD_TYPE = + BeamSqlRowType.create( + Arrays.asList( + "order_id", "site_id", "price" + ), + Arrays.asList( + Types.INTEGER, Types.INTEGER, Types.INTEGER + ) + ); + + private static final BeamSqlRowCoder SOURCE_CODER = + new BeamSqlRowCoder(SOURCE_RECORD_TYPE); + + private static final BeamSqlRowType RESULT_RECORD_TYPE = + BeamSqlRowType.create( + Arrays.asList( + "order_id", "site_id", "price", "order_id0", "site_id0", "price0" + ), + Arrays.asList( + Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER + , Types.INTEGER, Types.INTEGER + ) + ); + + private static final BeamSqlRowCoder RESULT_CODER = + new BeamSqlRowCoder(RESULT_RECORD_TYPE); + + @Test + public void testInnerJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3 + ).getRows()); + pipeline.run(); + } + + @Test + public void testLeftOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " LEFT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getRows()); + pipeline.run(); + } + + @Test + public void testRightOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test + public void testFullOuterJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " FULL OUTER JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id=o2.site_id AND o2.price=o1.site_id" + ; + + PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( + TestUtils.RowsBuilder.of( + RESULT_RECORD_TYPE + ).addRows( + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); + pipeline.run(); + } + + @Test(expected = 
IllegalStateException.class) + public void testException_nonEqualJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1" + + " JOIN ORDER_DETAILS2 o2" + + " on " + + " o1.order_id>o2.site_id" + ; + + pipeline.enableAbandonedNodeEnforcement(false); + queryFromOrderTables(sql); + pipeline.run(); + } + + @Test(expected = IllegalStateException.class) + public void testException_crossJoin() throws Exception { + String sql = + "SELECT * " + + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; + + pipeline.enableAbandonedNodeEnforcement(false); + queryFromOrderTables(sql); + pipeline.run(); + } + + private PCollection<BeamSqlRow> queryFromOrderTables(String sql) { + return PCollectionTuple + .of( + new TupleTag<BeamSqlRow>("ORDER_DETAILS1"), + ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER) + ) + .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"), + ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) + ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java new file mode 100644 index 0000000..6468011 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslProjectTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.sql.Types; +import java.util.Arrays; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; + +/** + * Tests for field-project in queries with BOUNDED PCollection. + */ +public class BeamSqlDslProjectTest extends BeamSqlDslBase { + /** + * select all fields with bounded PCollection. + */ + @Test + public void testSelectAllWithBounded() throws Exception { + runSelectAll(boundedInput2); + } + + /** + * select all fields with unbounded PCollection. 
+ */ + @Test + public void testSelectAllWithUnbounded() throws Exception { + runSelectAll(unboundedInput2); + } + + private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT * FROM PCOLLECTION"; + + PCollection<BeamSqlRow> result = + input.apply("testSelectAll", BeamSql.simpleQuery(sql)); + + PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields with bounded PCollection. + */ + @Test + public void testPartialFieldsWithBounded() throws Exception { + runPartialFields(boundedInput2); + } + + /** + * select partial fields with unbounded PCollection. + */ + @Test + public void testPartialFieldsWithUnbounded() throws Exception { + runPartialFields(unboundedInput2); + } + + private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testPartialFields", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields for multiple rows with bounded PCollection. + */ + @Test + public void testPartialFieldsInMultipleRowWithBounded() throws Exception { + runPartialFieldsInMultipleRow(boundedInput1); + } + + /** + * select partial fields for multiple rows with unbounded PCollection. + */ + @Test + public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception { + runPartialFieldsInMultipleRow(unboundedInput1); + } + + private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); + record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); + record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); + record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * select partial fields with bounded PCollection. 
+ */ + @Test + public void testPartialFieldsInRowsWithBounded() throws Exception { + runPartialFieldsInRows(boundedInput1); + } + + /** + * select partial fields with unbounded PCollection. + */ + @Test + public void testPartialFieldsInRowsWithUnbounded() throws Exception { + runPartialFieldsInRows(unboundedInput1); + } + + private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT f_int, f_long FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testPartialFieldsInRows", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), + Arrays.asList(Types.INTEGER, Types.BIGINT)); + + BeamSqlRow record1 = new BeamSqlRow(resultType); + record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); + record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); + + BeamSqlRow record2 = new BeamSqlRow(resultType); + record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); + record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); + + BeamSqlRow record3 = new BeamSqlRow(resultType); + record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); + record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); + + BeamSqlRow record4 = new BeamSqlRow(resultType); + record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); + record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); + + PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); + + pipeline.run().waitUntilFinish(); + } + + /** + * select literal field with bounded PCollection. + */ + @Test + public void testLiteralFieldWithBounded() throws Exception { + runLiteralField(boundedInput2); + } + + /** + * select literal field with unbounded PCollection. 
+ */ + @Test + public void testLiteralFieldWithUnbounded() throws Exception { + runLiteralField(unboundedInput2); + } + + public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception { + String sql = "SELECT 1 as literal_field FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) + .apply("testLiteralField", BeamSql.query(sql)); + + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"), + Arrays.asList(Types.INTEGER)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("literal_field", 1); + + PAssert.that(result).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + @Test + public void testProjectUnknownField() throws Exception { + exceptions.expect(IllegalStateException.class); + exceptions.expectMessage("Column 'f_int_na' not found in any table"); + pipeline.enableAbandonedNodeEnforcement(false); + + String sql = "SELECT f_int_na FROM TABLE_A"; + + PCollection<BeamSqlRow> result = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) + .apply("testProjectUnknownField", BeamSql.query(sql)); + + pipeline.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java new file mode 100644 index 0000000..46cab09 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.sql.Types; +import java.util.Arrays; +import java.util.Iterator; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.junit.Test; + +/** + * Tests for UDF/UDAF. + */ +public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { + /** + * GROUP-BY with UDAF. 
+ */ + @Test + public void testUdaf() throws Exception { + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"), + Arrays.asList(Types.INTEGER, Types.INTEGER)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int2", 0); + record.addField("squaresum", 30); + + String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`" + + " FROM PCOLLECTION GROUP BY f_int2"; + PCollection<BeamSqlRow> result1 = + boundedInput1.apply("testUdaf1", + BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class)); + PAssert.that(result1).containsInAnyOrder(record); + + String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`" + + " FROM PCOLLECTION GROUP BY f_int2"; + PCollection<BeamSqlRow> result2 = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) + .apply("testUdaf2", + BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class)); + PAssert.that(result2).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * Test for UDF. + */ + @Test + public void testUdf() throws Exception{ + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"), + Arrays.asList(Types.INTEGER, Types.INTEGER)); + + BeamSqlRow record = new BeamSqlRow(resultType); + record.addField("f_int", 2); + record.addField("cubicvalue", 8); + + String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; + PCollection<BeamSqlRow> result1 = + boundedInput1.apply("testUdf1", + BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); + PAssert.that(result1).containsInAnyOrder(record); + + String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; + PCollection<BeamSqlRow> result2 = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) + .apply("testUdf2", + BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); + PAssert.that(result2).containsInAnyOrder(record); + + pipeline.run().waitUntilFinish(); + } + + /** + * UDAF for test, which returns the sum of squares. + */ + public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> { + + public SquareSum() { + } + + @Override + public Integer init() { + return 0; + } + + @Override + public Integer add(Integer accumulator, Integer input) { + return accumulator + input * input; + } + + @Override + public Integer merge(Iterable<Integer> accumulators) { + int v = 0; + Iterator<Integer> ite = accumulators.iterator(); + while (ite.hasNext()) { + v += ite.next(); + } + return v; + } + + @Override + public Integer result(Integer accumulator) { + return accumulator; + } + + } + + /** + * An example UDF for test. + */ + public static class CubicInteger implements BeamSqlUdf { + public static Integer eval(Integer input){ + return input * input * input; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java new file mode 100644 index 0000000..9995b0a --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.transforms.DoFn; + +/** + * Test utilities. + */ +public class TestUtils { + /** + * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. + */ + public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> { + @ProcessElement + public void processElement(ProcessContext ctx) { + ctx.output(ctx.element().valueInString()); + } + } + + /** + * Convert list of {@code BeamSqlRow} to list of {@code String}. + */ + public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) { + List<String> strs = new ArrayList<>(); + for (BeamSqlRow row : rows) { + strs.add(row.valueInString()); + } + + return strs; + } + + /** + * Convenient way to build a list of {@code BeamSqlRow}s. + * + * <p>You can use it like this: + * + * <pre>{@code + * TestUtils.RowsBuilder.of( + * Types.INTEGER, "order_id", + * Types.INTEGER, "sum_site_id", + * Types.VARCHAR, "buyer" + * ).addRows( + * 1, 3, "james", + * 2, 5, "bond" + * ).getStringRows() + * }</pre> + */ + public static class RowsBuilder { + private BeamSqlRowType type; + private List<BeamSqlRow> rows = new ArrayList<>(); + + /** + * Create a RowsBuilder with the specified row type info. + * + * <p>For example: + * <pre>{@code + * TestUtils.RowsBuilder.of( + * Types.INTEGER, "order_id", + * Types.INTEGER, "sum_site_id", + * Types.VARCHAR, "buyer" + * )}</pre> + * + * @param args pairs of column types and column names. + */ + public static RowsBuilder of(final Object... args) { + BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args); + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLRowType; + + return builder; + } + + /** + * Create a RowsBuilder with the specified row type info. + * + * <p>For example: + * <pre>{@code + * TestUtils.RowsBuilder.of( + * beamSqlRowType + * )}</pre> + * @param beamSQLRowType the record type. + */ + public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) { + RowsBuilder builder = new RowsBuilder(); + builder.type = beamSQLRowType; + + return builder; + } + + /** + * Add rows to the builder. + * + * <p>Note: check the class javadoc for a detailed example. + */ + public RowsBuilder addRows(final Object... args) { + this.rows.addAll(buildRows(type, Arrays.asList(args))); + return this; + } + + /** + * Add rows to the builder. + * + * <p>Note: check the class javadoc for a detailed example.
+ */ + public RowsBuilder addRows(final List args) { + this.rows.addAll(buildRows(type, args)); + return this; + } + + public List<BeamSqlRow> getRows() { + return rows; + } + + public List<String> getStringRows() { + return beamSqlRows2Strings(rows); + } + } + + /** + * Convenient way to build a {@code BeamSqlRowType}. + * + * <p>e.g. + * + * <pre>{@code + * buildBeamSqlRowType( + * Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time" + * ) + * }</pre> + */ + public static BeamSqlRowType buildBeamSqlRowType(Object... args) { + List<Integer> types = new ArrayList<>(); + List<String> names = new ArrayList<>(); + + for (int i = 0; i < args.length - 1; i += 2) { + types.add((int) args[i]); + names.add((String) args[i + 1]); + } + + return BeamSqlRowType.create(names, types); + } + + /** + * Convenient way to build a list of {@code BeamSqlRow}s. + * + * <p>e.g. + * + * <pre>{@code + * buildRows( + * rowType, + * 1, 1, 1, // the first row + * 2, 2, 2, // the second row + * ... + * ) + * }</pre> + */ + public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) { + List<BeamSqlRow> rows = new ArrayList<>(); + int fieldCount = type.size(); + + for (int i = 0; i < args.size(); i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(type); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args.get(i + j)); + } + rows.add(row); + } + return rows; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java new file mode 100644 index 0000000..5e626a2 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.integrationtest; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import org.junit.Test; + +/** + * Integration test for arithmetic operators.
+ */ +public class BeamSqlArithmeticOperatorsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + + private static final BigDecimal ZERO = BigDecimal.valueOf(0.0); + private static final BigDecimal ONE0 = BigDecimal.valueOf(1); + private static final BigDecimal ONE = BigDecimal.valueOf(1.0); + private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0)); + private static final BigDecimal ONE10 = BigDecimal.ONE.divide( + BigDecimal.ONE, 10, RoundingMode.HALF_EVEN); + private static final BigDecimal TWO = BigDecimal.valueOf(2.0); + + @Test + public void testPlus() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 + 1", 2) + .addExpr("1.0 + 1", TWO) + .addExpr("1 + 1.0", TWO) + .addExpr("1.0 + 1.0", TWO) + .addExpr("c_tinyint + c_tinyint", (byte) 2) + .addExpr("c_smallint + c_smallint", (short) 2) + .addExpr("c_bigint + c_bigint", 2L) + .addExpr("c_decimal + c_decimal", TWO) + .addExpr("c_tinyint + c_decimal", TWO) + .addExpr("c_float + c_decimal", 2.0) + .addExpr("c_double + c_decimal", 2.0) + .addExpr("c_float + c_float", 2.0f) + .addExpr("c_double + c_float", 2.0) + .addExpr("c_double + c_double", 2.0) + .addExpr("c_float + c_bigint", 2.0f) + .addExpr("c_double + c_bigint", 2.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testPlus_overflow() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2) + .addExpr("c_smallint_max + c_smallint_max", (short) -2) + .addExpr("c_integer_max + c_integer_max", -2) + // 384L looks odd, but the addition has already overflowed; + // the exact result does not matter much, it is wrong either way. + .addExpr("c_bigint_max + c_bigint_max", 384L) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMinus() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 - 1", 0) + .addExpr("1.0 - 1", ZERO) + .addExpr("1 - 0.0", ONE) + .addExpr("1.0 - 1.0", ZERO) + .addExpr("c_tinyint - c_tinyint", (byte) 0) + .addExpr("c_smallint - c_smallint", (short) 0) + .addExpr("c_bigint - c_bigint", 0L) + .addExpr("c_decimal - c_decimal", ZERO) + .addExpr("c_tinyint - c_decimal", ZERO) + .addExpr("c_float - c_decimal", 0.0) + .addExpr("c_double - c_decimal", 0.0) + .addExpr("c_float - c_float", 0.0f) + .addExpr("c_double - c_float", 0.0) + .addExpr("c_double - c_double", 0.0) + .addExpr("c_float - c_bigint", 0.0f) + .addExpr("c_double - c_bigint", 0.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMultiply() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 * 1", 1) + .addExpr("1.0 * 1", ONE2) + .addExpr("1 * 1.0", ONE2) + .addExpr("1.0 * 1.0", ONE2) + .addExpr("c_tinyint * c_tinyint", (byte) 1) + .addExpr("c_smallint * c_smallint", (short) 1) + .addExpr("c_bigint * c_bigint", 1L) + .addExpr("c_decimal * c_decimal", ONE2) + .addExpr("c_tinyint * c_decimal", ONE2) + .addExpr("c_float * c_decimal", 1.0) + .addExpr("c_double * c_decimal", 1.0) + .addExpr("c_float * c_float", 1.0f) + .addExpr("c_double * c_float", 1.0) + .addExpr("c_double * c_double", 1.0) + .addExpr("c_float * c_bigint", 1.0f) + .addExpr("c_double * c_bigint", 1.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testDivide() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 / 1", 1) + .addExpr("1.0 / 1", ONE10) + .addExpr("1 / 1.0", ONE10) + .addExpr("1.0 / 1.0",
ONE10) + .addExpr("c_tinyint / c_tinyint", (byte) 1) + .addExpr("c_smallint / c_smallint", (short) 1) + .addExpr("c_bigint / c_bigint", 1L) + .addExpr("c_decimal / c_decimal", ONE10) + .addExpr("c_tinyint / c_decimal", ONE10) + .addExpr("c_float / c_decimal", 1.0) + .addExpr("c_double / c_decimal", 1.0) + .addExpr("c_float / c_float", 1.0f) + .addExpr("c_double / c_float", 1.0) + .addExpr("c_double / c_double", 1.0) + .addExpr("c_float / c_bigint", 1.0f) + .addExpr("c_double / c_bigint", 1.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMod() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("mod(1, 1)", 0) + .addExpr("mod(1.0, 1)", 0) + .addExpr("mod(1, 1.0)", ZERO) + .addExpr("mod(1.0, 1.0)", ZERO) + .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0) + .addExpr("mod(c_smallint, c_smallint)", (short) 0) + .addExpr("mod(c_bigint, c_bigint)", 0L) + .addExpr("mod(c_decimal, c_decimal)", ZERO) + .addExpr("mod(c_tinyint, c_decimal)", ZERO) + ; + + checker.buildRunAndCheck(); + } +}
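
The init/add/merge/result lifecycle of the SquareSum UDAF in BeamSqlDslUdfUdafTest above maps one-to-one onto the core SDK's Combine.CombineFn (createAccumulator/addInput/mergeAccumulators/extractOutput). A minimal sketch of the same sum-of-squares aggregation written directly against Combine.CombineFn, shown only for comparison and not part of this commit:

    import org.apache.beam.sdk.transforms.Combine;

    /** Sum-of-squares as a plain CombineFn, equivalent to the SquareSum UDAF above. */
    public class SquareSumFn extends Combine.CombineFn<Integer, Integer, Integer> {
      @Override
      public Integer createAccumulator() {
        return 0;                            // plays the role of BeamSqlUdaf.init()
      }

      @Override
      public Integer addInput(Integer accumulator, Integer input) {
        return accumulator + input * input;  // plays the role of BeamSqlUdaf.add()
      }

      @Override
      public Integer mergeAccumulators(Iterable<Integer> accumulators) {
        int sum = 0;                         // plays the role of BeamSqlUdaf.merge()
        for (Integer accumulator : accumulators) {
          sum += accumulator;
        }
        return sum;
      }

      @Override
      public Integer extractOutput(Integer accumulator) {
        return accumulator;                  // plays the role of BeamSqlUdaf.result()
      }
    }

Such a CombineFn could be applied with, for example, Combine.globally(new SquareSumFn()) on a PCollection<Integer>, whereas the UDAF variant is what BeamSql registers through withUdaf(...) as shown in the test above.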