http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java deleted file mode 100644 index 5d5d4fc..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ /dev/null @@ -1,453 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.schema.transform; - -import java.text.ParseException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.fun.SqlAvgAggFunction; -import org.apache.calcite.sql.fun.SqlCountAggFunction; -import org.apache.calcite.sql.fun.SqlMinMaxAggFunction; -import org.apache.calcite.sql.fun.SqlSumAggFunction; -import org.apache.calcite.sql.type.BasicSqlType; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.util.ImmutableBitSet; -import org.junit.Rule; -import org.junit.Test; - -/** - * Unit tests for {@link BeamAggregationTransforms}. 
- * - */ -public class BeamAggregationTransformTest extends BeamTransformBaseTest{ - - @Rule - public TestPipeline p = TestPipeline.create(); - - private List<AggregateCall> aggCalls; - - private BeamSqlRowType keyType; - private BeamSqlRowType aggPartType; - private BeamSqlRowType outputType; - - private BeamSqlRowCoder inRecordCoder; - private BeamSqlRowCoder keyCoder; - private BeamSqlRowCoder aggCoder; - private BeamSqlRowCoder outRecordCoder; - - /** - * This step equals to below query. - * <pre> - * SELECT `f_int` - * , COUNT(*) AS `size` - * , SUM(`f_long`) AS `sum1`, AVG(`f_long`) AS `avg1` - * , MAX(`f_long`) AS `max1`, MIN(`f_long`) AS `min1` - * , SUM(`f_short`) AS `sum2`, AVG(`f_short`) AS `avg2` - * , MAX(`f_short`) AS `max2`, MIN(`f_short`) AS `min2` - * , SUM(`f_byte`) AS `sum3`, AVG(`f_byte`) AS `avg3` - * , MAX(`f_byte`) AS `max3`, MIN(`f_byte`) AS `min3` - * , SUM(`f_float`) AS `sum4`, AVG(`f_float`) AS `avg4` - * , MAX(`f_float`) AS `max4`, MIN(`f_float`) AS `min4` - * , SUM(`f_double`) AS `sum5`, AVG(`f_double`) AS `avg5` - * , MAX(`f_double`) AS `max5`, MIN(`f_double`) AS `min5` - * , MAX(`f_timestamp`) AS `max7`, MIN(`f_timestamp`) AS `min7` - * ,SUM(`f_int2`) AS `sum8`, AVG(`f_int2`) AS `avg8` - * , MAX(`f_int2`) AS `max8`, MIN(`f_int2`) AS `min8` - * FROM TABLE_NAME - * GROUP BY `f_int` - * </pre> - * @throws ParseException - */ - @Test - public void testCountPerElementBasic() throws ParseException { - setupEnvironment(); - - PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows)); - - //1. extract fields in group-by key part - PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy", - WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder)); - - //2. apply a GroupByKey. 
- PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) - .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, - IterableCoder.<BeamSqlRow>of(inRecordCoder))); - - //3. run aggregation functions - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( - new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputRowType))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); - - //4. flat KV to a single record - PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); - mergedStream.setCoder(outRecordCoder); - - //assert function BeamAggregationTransform.AggregationGroupByKeyFn - PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); - - //assert BeamAggregationTransform.AggregationCombineFn - PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn()); - - //assert BeamAggregationTransform.MergeAggregationRecord - PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRecord()); - - p.run(); -} - - private void setupEnvironment() { - prepareAggregationCalls(); - prepareTypeAndCoder(); - } - - /** - * create list of all {@link AggregateCall}. 
- */ - @SuppressWarnings("deprecation") - private void prepareAggregationCalls() { - //aggregations for all data type - aggCalls = new ArrayList<>(); - aggCalls.add( - new AggregateCall(new SqlCountAggFunction(), false, - Arrays.<Integer>asList(), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "count") - ); - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT)), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "sum1") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "avg1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "max1") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(1), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.BIGINT), - "min1") - ); - - aggCalls.add( - new AggregateCall(new SqlSumAggFunction( - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT)), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "sum2") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "avg2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "max2") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(2), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.SMALLINT), - "min2") - ); - - 
aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT)), - false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "sum3") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "avg3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "max3") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(3), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TINYINT), - "min3") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT)), - false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "sum4") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "avg4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "max4") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(4), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.FLOAT), - "min4") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE)), - false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "sum5") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - 
Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "avg5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "max5") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(5), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.DOUBLE), - "min5") - ); - - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "max7") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(7), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.TIMESTAMP), - "min7") - ); - - aggCalls.add( - new AggregateCall( - new SqlSumAggFunction(new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER)), - false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "sum8") - ); - aggCalls.add( - new AggregateCall(new SqlAvgAggFunction(SqlKind.AVG), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "avg8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MAX), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "max8") - ); - aggCalls.add( - new AggregateCall(new SqlMinMaxAggFunction(SqlKind.MIN), false, - Arrays.<Integer>asList(8), - new BasicSqlType(RelDataTypeSystem.DEFAULT, SqlTypeName.INTEGER), - "min8") - ); - } - - /** - * Coders used in aggregation steps. 
- */ - private void prepareTypeAndCoder() { - inRecordCoder = new BeamSqlRowCoder(inputRowType); - - keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); - keyCoder = new BeamSqlRowCoder(keyType); - - aggPartType = initTypeOfSqlRow( - Arrays.asList(KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - )); - aggCoder = new BeamSqlRowCoder(aggPartType); - - outputType = prepareFinalRowType(); - outRecordCoder = new BeamSqlRowCoder(outputType); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. 
- */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() { - return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - inputRows.get(0)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), - inputRows.get(1)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), - inputRows.get(2)), - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), - inputRows.get(3))); - } - - /** - * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. - */ - private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn() - throws ParseException { - return Arrays.asList( - KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - new BeamSqlRow(aggPartType, Arrays.<Object>asList( - 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - ))) - ); - } - - /** - * Row type of final output row. 
- */ - private BeamSqlRowType prepareFinalRowType() { - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - List<KV<String, SqlTypeName>> columnMetadata = - Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), - - KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), - KV.of("max1", SqlTypeName.BIGINT), KV.of("min1", SqlTypeName.BIGINT), - - KV.of("sum2", SqlTypeName.SMALLINT), KV.of("avg2", SqlTypeName.SMALLINT), - KV.of("max2", SqlTypeName.SMALLINT), KV.of("min2", SqlTypeName.SMALLINT), - - KV.of("sum3", SqlTypeName.TINYINT), KV.of("avg3", SqlTypeName.TINYINT), - KV.of("max3", SqlTypeName.TINYINT), KV.of("min3", SqlTypeName.TINYINT), - - KV.of("sum4", SqlTypeName.FLOAT), KV.of("avg4", SqlTypeName.FLOAT), - KV.of("max4", SqlTypeName.FLOAT), KV.of("min4", SqlTypeName.FLOAT), - - KV.of("sum5", SqlTypeName.DOUBLE), KV.of("avg5", SqlTypeName.DOUBLE), - KV.of("max5", SqlTypeName.DOUBLE), KV.of("min5", SqlTypeName.DOUBLE), - - KV.of("max7", SqlTypeName.TIMESTAMP), KV.of("min7", SqlTypeName.TIMESTAMP), - - KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), - KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) - ); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. - */ - private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamSqlRow(outputType, Arrays.<Object>asList( - 1, 4L, - 10000L, 2500L, 4000L, 1000L, - (short) 10, (short) 2, (short) 4, (short) 1, - (byte) 10, (byte) 2, (byte) 4, (byte) 1, - 10.0F, 2.5F, 4.0F, 1.0F, - 10.0, 2.5, 4.0, 1.0, - format.parse("2017-01-01 02:04:03"), format.parse("2017-01-01 01:01:03"), - 10, 2, 4, 1 - )); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java deleted file mode 100644 index 4045bc8..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.dsls.sql.schema.transform; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.values.KV; -import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.BeforeClass; - -/** - * shared methods to test PTransforms which execute Beam SQL steps. - * - */ -public class BeamTransformBaseTest { - public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - public static BeamSqlRowType inputRowType; - public static List<BeamSqlRow> inputRows; - - @BeforeClass - public static void prepareInput() throws NumberFormatException, ParseException{ - List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList( - KV.of("f_int", SqlTypeName.INTEGER), KV.of("f_long", SqlTypeName.BIGINT), - KV.of("f_short", SqlTypeName.SMALLINT), KV.of("f_byte", SqlTypeName.TINYINT), - KV.of("f_float", SqlTypeName.FLOAT), KV.of("f_double", SqlTypeName.DOUBLE), - KV.of("f_string", SqlTypeName.VARCHAR), KV.of("f_timestamp", SqlTypeName.TIMESTAMP), - KV.of("f_int2", SqlTypeName.INTEGER) - ); - inputRowType = initTypeOfSqlRow(columnMetadata); - inputRows = Arrays.asList( - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 1000L, Short.valueOf("1"), Byte.valueOf("1"), 1.0F, 1.0, - "string_row1", format.parse("2017-01-01 01:01:03"), 1)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 2000L, Short.valueOf("2"), Byte.valueOf("2"), 2.0F, 2.0, - "string_row2", format.parse("2017-01-01 01:02:03"), 2)), - initBeamSqlRow(columnMetadata, - Arrays.<Object>asList(1, 3000L, Short.valueOf("3"), Byte.valueOf("3"), 3.0F, 3.0, - 
"string_row3", format.parse("2017-01-01 01:03:03"), 3)), - initBeamSqlRow(columnMetadata, Arrays.<Object>asList(1, 4000L, Short.valueOf("4"), - Byte.valueOf("4"), 4.0F, 4.0, "string_row4", format.parse("2017-01-01 02:04:03"), 4))); - } - - /** - * create a {@code BeamSqlRowType} for given column metadata. - */ - public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ - FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); - for (KV<String, SqlTypeName> cm : columnMetadata) { - builder.add(cm.getKey(), cm.getValue()); - } - return CalciteUtils.toBeamRowType(builder.build()); - } - - /** - * Create an empty row with given column metadata. - */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { - return initBeamSqlRow(columnMetadata, Arrays.asList()); - } - - /** - * Create a row with given column metadata, and values for each column. - * - */ - public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, - List<Object> rowValues){ - BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata); - - return new BeamSqlRow(rowType, rowValues); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a86a680..803d30c 100644 --- a/pom.xml +++ b/pom.xml @@ -180,7 +180,6 @@ <module>sdks/java/build-tools</module> <module>sdks</module> <module>runners</module> - <module>dsls</module> <module>examples</module> <!-- sdks/java/javadoc builds project-wide Javadoc. It has to run last. 
--> <module>sdks/java/javadoc</module> http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index 1222476..5465cf0 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -37,6 +37,7 @@ <module>join-library</module> <module>protobuf</module> <module>sorter</module> + <module>sql</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml new file mode 100644 index 0000000..b4aa223 --- /dev/null +++ b/sdks/java/extensions/sql/pom.xml @@ -0,0 +1,226 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" + xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>2.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-sql</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: SQL</name> + <description>Beam SQL provides a new interface to generate a Beam pipeline from SQL statement</description> + + <packaging>jar</packaging> + + <properties> + <timestamp>${maven.build.timestamp}</timestamp> + <maven.build.timestamp.format>yyyy-MM-dd HH:mm</maven.build.timestamp.format> + <calcite.version>1.13.0</calcite.version> + <avatica.version>1.10.0</avatica.version> + </properties> + + <profiles> + <!-- + The direct runner is available by default. 
+ You can also include it on the classpath explicitly with -P direct-runner + --> + <profile> + <id>direct-runner</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>runtime</scope> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <resources> + <resource> + <directory>src/main/resources</directory> + <filtering>true</filtering> + </resource> + </resources> + + <pluginManagement> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <!-- Set testSourceDirectory in order to exclude generated-test-sources --> + <testSourceDirectory>${project.basedir}/src/test/</testSourceDirectory> + </configuration> + </plugin> + </plugins> + </pluginManagement> + + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <argLine>-da</argLine> <!-- disable assert in Calcite converter validation --> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>bundle-and-repackage</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadeTestJar>true</shadeTestJar> + <artifactSet> + <includes> + <include>com.google.guava:guava</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. 
--> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-core</artifactId> + <version>${calcite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + <version>${calcite.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.calcite.avatica</groupId> + <artifactId>avatica-core</artifactId> + <version>${avatica.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-lite</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-join-library</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <!-- this is a hack to make it available at compile time but not bundled.--> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-io-kafka</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + 
<scope>provided</scope> + </dependency> + + <!-- for tests --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java new file mode 100644 index 0000000..d902f42 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql; + +import com.google.auto.value.AutoValue; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; +import org.apache.beam.dsls.sql.schema.BeamSqlUdf; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlSelect; +import org.apache.calcite.sql.parser.SqlParseException; +import org.apache.calcite.tools.RelConversionException; +import org.apache.calcite.tools.ValidationException; + +/** + * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query as a + * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline. 
+ * + * <h1>Beam SQL DSL usage:</h1> + * A typical pipeline with Beam SQL DSL is: + * <pre> + *{@code +PipelineOptions options = PipelineOptionsFactory.create(); +Pipeline p = Pipeline.create(options); + +//create table from TextIO; +PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha")) + .apply(...); +PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) + .apply(...); + +//run a simple query, and register the output as a table in BeamSql; +String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; +PCollection<BeamSqlRow> outputTableA = inputTableA.apply( + BeamSql.simpleQuery(sql1) + .withUdf("MY_FUNC", MY_FUNC.class, "FUNC")); + +//run a JOIN with one table from TextIO, and one table from another query +PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( + new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA) + .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB) + .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ...")); + +//output the final result with TextIO +outputTableB.apply(...).apply(TextIO.write().to("/my/output/path")); + +p.run().waitUntilFinish(); + * } + * </pre> + */ +@Experimental +public class BeamSql { + /** + * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. + * + * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing + * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output + * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to + * {@code PCollection<BeamSqlRow>}, each representing an input table. + * + * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names} + * referenced within the query. 
+ */ + public static QueryTransform query(String sqlQuery) { + return QueryTransform.builder() + .setSqlEnv(new BeamSqlEnv()) + .setSqlQuery(sqlQuery) + .build(); + } + + /** + * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. + * + * <p>This is a simplified form of {@link #query(String)} where the query must reference + * a single input table. + * + * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>. + */ + public static SimpleQueryTransform simpleQuery(String sqlQuery) throws Exception { + return SimpleQueryTransform.builder() + .setSqlEnv(new BeamSqlEnv()) + .setSqlQuery(sqlQuery) + .build(); + } + + /** + * A {@link PTransform} representing an execution plan for a SQL query. + */ + @AutoValue + public abstract static class QueryTransform extends + PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { + abstract BeamSqlEnv getSqlEnv(); + abstract String getSqlQuery(); + + static Builder builder() { + return new AutoValue_BeamSql_QueryTransform.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSqlQuery(String sqlQuery); + abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); + abstract QueryTransform build(); + } + + /** + * register a UDF function used in this query. + */ + public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ + getSqlEnv().registerUdf(functionName, clazz); + return this; + } + + /** + * register a UDAF function used in this query. + */ + public QueryTransform withUdaf(String functionName, Class<? 
extends BeamSqlUdaf> clazz){ + getSqlEnv().registerUdaf(functionName, clazz); + return this; + } + + @Override + public PCollection<BeamSqlRow> expand(PCollectionTuple input) { + registerTables(input); + + BeamRelNode beamRelNode = null; + try { + beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery()); + } catch (ValidationException | RelConversionException | SqlParseException e) { + throw new IllegalStateException(e); + } + + try { + return beamRelNode.buildBeamPipeline(input, getSqlEnv()); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + //register tables, related with input PCollections. + private void registerTables(PCollectionTuple input){ + for (TupleTag<?> sourceTag : input.getAll().keySet()) { + PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); + BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); + + getSqlEnv().registerTable(sourceTag.getId(), + new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); + } + } + } + + /** + * A {@link PTransform} representing an execution plan for a SQL query referencing + * a single table. + */ + @AutoValue + public abstract static class SimpleQueryTransform + extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { + private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; + abstract BeamSqlEnv getSqlEnv(); + abstract String getSqlQuery(); + + static Builder builder() { + return new AutoValue_BeamSql_SimpleQueryTransform.Builder(); + } + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSqlQuery(String sqlQuery); + abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); + abstract SimpleQueryTransform build(); + } + + /** + * register a UDF function used in this query. + */ + public SimpleQueryTransform withUdf(String functionName, Class<? 
extends BeamSqlUdf> clazz){ + getSqlEnv().registerUdf(functionName, clazz); + return this; + } + + /** + * register a UDAF function used in this query. + */ + public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){ + getSqlEnv().registerUdaf(functionName, clazz); + return this; + } + + private void validateQuery() { + SqlNode sqlNode; + try { + sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery()); + getSqlEnv().planner.getPlanner().close(); + } catch (SqlParseException e) { + throw new IllegalStateException(e); + } + + if (sqlNode instanceof SqlSelect) { + SqlSelect select = (SqlSelect) sqlNode; + String tableName = select.getFrom().toString(); + if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) { + throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME); + } + } else { + throw new UnsupportedOperationException( + "Sql operation: " + sqlNode.toString() + " is not supported!"); + } + } + + @Override + public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { + validateQuery(); + return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input) + .apply(QueryTransform.builder() + .setSqlEnv(getSqlEnv()) + .setSqlQuery(getSqlQuery()) + .build()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java new file mode 100644 index 0000000..50da244 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.plan.RelOptUtil; + +/** + * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. + */ +@Experimental +public class BeamSqlCli { + /** + * Returns a human readable representation of the query execution plan. + */ + public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { + BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); + String beamPlan = RelOptUtil.toString(exeTree); + return beamPlan; + } + + /** + * compile SQL, and return a {@link Pipeline}. 
+ */ + public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) + throws Exception{ + PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() + .as(PipelineOptions.class); // FlinkPipelineOptions.class + options.setJobName("BeamPlanCreator"); + Pipeline pipeline = Pipeline.create(options); + + return compilePipeline(sqlStatement, pipeline, sqlEnv); + } + + /** + * compile SQL, and return a {@link Pipeline}. + */ + public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline + , BeamSqlEnv sqlEnv) throws Exception{ + PCollection<BeamSqlRow> resultStream = + sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); + return resultStream; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java new file mode 100644 index 0000000..0e1ac98 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql; + +import java.io.Serializable; + +import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; +import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; +import org.apache.beam.dsls.sql.schema.BeamSqlUdf; +import org.apache.beam.dsls.sql.utils.CalciteUtils; +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.schema.impl.AggregateFunctionImpl; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.tools.Frameworks; + +/** + * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}. + * + * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and + * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. 
+ */ +public class BeamSqlEnv implements Serializable{ + transient SchemaPlus schema; + transient BeamQueryPlanner planner; + + public BeamSqlEnv() { + schema = Frameworks.createRootSchema(true); + planner = new BeamQueryPlanner(schema); + } + + /** + * Register a UDF function which can be used in SQL expression. + */ + public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) { + schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD)); + } + + /** + * Register a UDAF function which can be used in GROUP-BY expression. + * See {@link BeamSqlUdaf} on how to implement a UDAF. + */ + public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) { + schema.add(functionName, AggregateFunctionImpl.create(clazz)); + } + + /** + * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. + * + */ + public void registerTable(String tableName, BaseBeamTable table) { + schema.add(tableName, new BeamCalciteTable(table.getRowType())); + planner.getSourceTables().put(tableName, table); + } + + /** + * Find {@link BaseBeamTable} by table name. + */ + public BaseBeamTable findTable(String tableName){ + return planner.getSourceTables().get(tableName); + } + + private static class BeamCalciteTable implements ScannableTable, Serializable { + private BeamSqlRowType beamSqlRowType; + public BeamCalciteTable(BeamSqlRowType beamSqlRowType) { + this.beamSqlRowType = beamSqlRowType; + } + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return CalciteUtils.toCalciteRowType(this.beamSqlRowType) + .apply(BeamQueryPlanner.TYPE_FACTORY); + } + + @Override + public Enumerable<Object[]> scan(DataContext root) { + // not used as Beam SQL uses its own execution engine + return null; + } + + /** + * Not used {@link Statistic} to optimize the plan. 
+ */ + @Override + public Statistic getStatistic() { + return Statistics.UNKNOWN; + } + + /** + * all sources are treated as TABLE in Beam SQL. + */ + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java new file mode 100644 index 0000000..4e364e1 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.example; + +import java.sql.Types; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.BeamSql; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; + +/** + * This is a quick example, which uses Beam SQL DSL to create a data pipeline. + * + * <p>Run the example with + * <pre> + * mvn -pl dsls/sql compile exec:java \ + * -Dexec.mainClass=org.apache.beam.dsls.sql.example.BeamSqlExample \ + * -Dexec.args="--runner=DirectRunner" -Pdirect-runner + * </pre> + * + */ +class BeamSqlExample { + public static void main(String[] args) throws Exception { + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + Pipeline p = Pipeline.create(options); + + //define the input row format + List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); + List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); + BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes); + BeamSqlRow row = new BeamSqlRow(type); + row.addField(0, 1); + row.addField(1, "row"); + row.addField(2, 1.0); + + //create a source PCollection with Create.of(); + PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row) + .withCoder(new BeamSqlRowCoder(type))); + + //Case 1. 
run a simple SQL query over input PCollection with BeamSql.simpleQuery; + PCollection<BeamSqlRow> outputStream = inputTable.apply( + BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); + + //print the output record of case 1; + outputStream.apply("log_result", + MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { + public Void apply(BeamSqlRow input) { + System.out.println("PCOLLECTION: " + input); + return null; + } + })); + + //Case 2. run the query with BeamSql.query over result PCollection of case 1. + PCollection<BeamSqlRow> outputStream2 = + PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream) + .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); + + //print the output record of case 2; + outputStream2.apply("log_result", + MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { + @Override + public Void apply(BeamSqlRow input) { + System.out.println("TABLE_B: " + input); + return null; + } + })); + + p.run().waitUntilFinish(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java new file mode 100644 index 0000000..52a9fce --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * examples on how to use BeamSQL. + * + */ +package org.apache.beam.dsls.sql.example; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java new file mode 100644 index 0000000..3732933 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; + +/** + * {@code BeamSqlExpressionExecutor} fills the gap between relational + * expressions in Calcite SQL and executable code. + * + */ +public interface BeamSqlExpressionExecutor extends Serializable { + + /** + * invoked before data processing. + */ + void prepare(); + + /** + * apply transformation to input record {@link BeamSqlRow}. + * + */ + List<Object> execute(BeamSqlRow inputRow); + + void close(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java new file mode 100644 index 0000000..aee0e4a --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression; +import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression; +import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression; +import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression; +import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; +import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression; +import 
org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; +import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression; +import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression; +import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression; +import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression; +import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAcosExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAsinExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtan2Expression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtanExpression; +import 
org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCeilExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCosExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCotExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlDegreesExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlExpExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlFloorExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLnExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLogExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPiExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPowerExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRadiansExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandIntegerExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSignExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSinExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTanExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTruncateExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression; +import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression; +import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression; +import org.apache.beam.dsls.sql.rel.BeamFilterRel; +import org.apache.beam.dsls.sql.rel.BeamProjectRel; +import org.apache.beam.dsls.sql.rel.BeamRelNode; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.apache.calcite.util.NlsString; + +/** + * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. + * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, + * which can be evaluated against the {@link BeamSqlRow}. 
/**
 * Executor for a {@link BeamRelNode}: compiles the node's {@code RexNode} trees into a
 * flat list of {@link BeamSqlExpression}s once at construction time, then evaluates them
 * row-by-row in {@link #execute(BeamSqlRow)}.
 */
public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
  // One compiled expression per output: a single condition for a filter,
  // one expression per projected column for a project.
  protected List<BeamSqlExpression> exps;

  /**
   * Compiles the expressions of the given relational node.
   *
   * <p>Only {@link BeamFilterRel} (one boolean condition) and {@link BeamProjectRel}
   * (one expression per projection) are supported; any other node type throws
   * {@link UnsupportedOperationException}.
   */
  public BeamSqlFnExecutor(BeamRelNode relNode) {
    this.exps = new ArrayList<>();
    if (relNode instanceof BeamFilterRel) {
      BeamFilterRel filterNode = (BeamFilterRel) relNode;
      RexNode condition = filterNode.getCondition();
      exps.add(buildExpression(condition));
    } else if (relNode instanceof BeamProjectRel) {
      BeamProjectRel projectNode = (BeamProjectRel) relNode;
      List<RexNode> projects = projectNode.getProjects();
      for (RexNode rexNode : projects) {
        exps.add(buildExpression(rexNode));
      }
    } else {
      throw new UnsupportedOperationException(
          String.format("%s is not supported yet!", relNode.getClass().toString()));
    }
  }

  /**
   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
   *
   * <p>Literals become {@link BeamSqlPrimitive}s (with type normalization, see below),
   * input references become {@link BeamSqlInputRefExpression}s, and calls are dispatched
   * by operator name to the matching expression class. Throws
   * {@link UnsupportedOperationException} for unknown node/operator kinds, and
   * {@link IllegalStateException} when the built expression rejects its operands.
   */
  static BeamSqlExpression buildExpression(RexNode rexNode) {
    BeamSqlExpression ret = null;
    if (rexNode instanceof RexLiteral) {
      RexLiteral node = (RexLiteral) rexNode;
      SqlTypeName type = node.getTypeName();
      Object value = node.getValue();

      if (SqlTypeName.CHAR_TYPES.contains(type)
          && node.getValue() instanceof NlsString) {
        // NlsString is not serializable, we need to convert
        // it to string explicitly.
        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
        // does this actually make sense?
        // Calcite actually treat Calendar as the java type of Date Literal
        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
      } else {
        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
        // e.g. sql: "select 1"
        // here the literal 1 will be parsed as a RexLiteral where:
        //   node.getType().getSqlTypeName() = INTEGER (the display type)
        //   node.getSqlTypeName() = DECIMAL (the actual internal storage format)
        // So we need to do a convert here.
        // check RexBuilder#makeLiteral for more information.
        SqlTypeName realType = node.getType().getSqlTypeName();
        Object realValue = value;
        if (type == SqlTypeName.DECIMAL) {
          // Narrow the internal BigDecimal to the literal's declared integral type.
          BigDecimal rawValue = (BigDecimal) value;
          switch (realType) {
            case TINYINT:
              realValue = (byte) rawValue.intValue();
              break;
            case SMALLINT:
              realValue = (short) rawValue.intValue();
              break;
            case INTEGER:
              realValue = rawValue.intValue();
              break;
            case BIGINT:
              realValue = rawValue.longValue();
              break;
            case DECIMAL:
              realValue = rawValue;
              break;
            default:
              throw new IllegalStateException("type/realType mismatch: "
                  + type + " VS " + realType);
          }
        } else if (type == SqlTypeName.DOUBLE) {
          // FLOAT literals are stored internally as DOUBLE; narrow back to float.
          Double rawValue = (Double) value;
          if (realType == SqlTypeName.FLOAT) {
            realValue = rawValue.floatValue();
          }
        }
        return BeamSqlPrimitive.of(realType, realValue);
      }
    } else if (rexNode instanceof RexInputRef) {
      RexInputRef node = (RexInputRef) rexNode;
      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
    } else if (rexNode instanceof RexCall) {
      RexCall node = (RexCall) rexNode;
      String opName = node.op.getName();
      // Build sub-expressions first (depth-first), then dispatch on the operator name.
      List<BeamSqlExpression> subExps = new ArrayList<>();
      for (RexNode subNode : node.getOperands()) {
        subExps.add(buildExpression(subNode));
      }
      switch (opName) {
        // logical operators
        case "AND":
          ret = new BeamSqlAndExpression(subExps);
          break;
        case "OR":
          ret = new BeamSqlOrExpression(subExps);
          break;
        case "NOT":
          ret = new BeamSqlNotExpression(subExps);
          break;
        case "=":
          ret = new BeamSqlEqualsExpression(subExps);
          break;
        case "<>":
          ret = new BeamSqlNotEqualsExpression(subExps);
          break;
        case ">":
          ret = new BeamSqlGreaterThanExpression(subExps);
          break;
        case ">=":
          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
          break;
        case "<":
          ret = new BeamSqlLessThanExpression(subExps);
          break;
        case "<=":
          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
          break;

        // arithmetic operators
        case "+":
          ret = new BeamSqlPlusExpression(subExps);
          break;
        case "-":
          ret = new BeamSqlMinusExpression(subExps);
          break;
        case "*":
          ret = new BeamSqlMultiplyExpression(subExps);
          break;
        case "/":
        case "/INT":
          // Calcite uses "/INT" for integer division; both map to the same expression.
          ret = new BeamSqlDivideExpression(subExps);
          break;
        case "MOD":
          ret = new BeamSqlModExpression(subExps);
          break;

        // math functions
        case "ABS":
          ret = new BeamSqlAbsExpression(subExps);
          break;
        case "ROUND":
          ret = new BeamSqlRoundExpression(subExps);
          break;
        case "LN":
          ret = new BeamSqlLnExpression(subExps);
          break;
        case "LOG10":
          ret = new BeamSqlLogExpression(subExps);
          break;
        case "EXP":
          ret = new BeamSqlExpExpression(subExps);
          break;
        case "ACOS":
          ret = new BeamSqlAcosExpression(subExps);
          break;
        case "ASIN":
          ret = new BeamSqlAsinExpression(subExps);
          break;
        case "ATAN":
          ret = new BeamSqlAtanExpression(subExps);
          break;
        case "COT":
          ret = new BeamSqlCotExpression(subExps);
          break;
        case "DEGREES":
          ret = new BeamSqlDegreesExpression(subExps);
          break;
        case "RADIANS":
          ret = new BeamSqlRadiansExpression(subExps);
          break;
        case "COS":
          ret = new BeamSqlCosExpression(subExps);
          break;
        case "SIN":
          ret = new BeamSqlSinExpression(subExps);
          break;
        case "TAN":
          ret = new BeamSqlTanExpression(subExps);
          break;
        case "SIGN":
          ret = new BeamSqlSignExpression(subExps);
          break;
        case "POWER":
          ret = new BeamSqlPowerExpression(subExps);
          break;
        case "PI":
          ret = new BeamSqlPiExpression();
          break;
        case "ATAN2":
          ret = new BeamSqlAtan2Expression(subExps);
          break;
        case "TRUNCATE":
          ret = new BeamSqlTruncateExpression(subExps);
          break;
        case "RAND":
          ret = new BeamSqlRandExpression(subExps);
          break;
        case "RAND_INTEGER":
          ret = new BeamSqlRandIntegerExpression(subExps);
          break;

        // string operators
        case "||":
          ret = new BeamSqlConcatExpression(subExps);
          break;
        case "POSITION":
          ret = new BeamSqlPositionExpression(subExps);
          break;
        case "CHAR_LENGTH":
        case "CHARACTER_LENGTH":
          ret = new BeamSqlCharLengthExpression(subExps);
          break;
        case "UPPER":
          ret = new BeamSqlUpperExpression(subExps);
          break;
        case "LOWER":
          ret = new BeamSqlLowerExpression(subExps);
          break;
        case "TRIM":
          ret = new BeamSqlTrimExpression(subExps);
          break;
        case "SUBSTRING":
          ret = new BeamSqlSubstringExpression(subExps);
          break;
        case "OVERLAY":
          ret = new BeamSqlOverlayExpression(subExps);
          break;
        case "INITCAP":
          ret = new BeamSqlInitCapExpression(subExps);
          break;

        // date functions
        case "Reinterpret":
          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
        case "CEIL":
          // CEIL/FLOOR are overloaded: numeric argument vs. date/time argument.
          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
            return new BeamSqlCeilExpression(subExps);
          } else {
            return new BeamSqlDateCeilExpression(subExps);
          }
        case "FLOOR":
          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
            return new BeamSqlFloorExpression(subExps);
          } else {
            return new BeamSqlDateFloorExpression(subExps);
          }
        case "EXTRACT_DATE":
        case "EXTRACT":
          return new BeamSqlExtractExpression(subExps);

        case "LOCALTIME":
        case "CURRENT_TIME":
          return new BeamSqlCurrentTimeExpression(subExps);

        case "CURRENT_TIMESTAMP":
        case "LOCALTIMESTAMP":
          return new BeamSqlCurrentTimestampExpression(subExps);

        case "CURRENT_DATE":
          return new BeamSqlCurrentDateExpression();


        case "CASE":
          ret = new BeamSqlCaseExpression(subExps);
          break;
        case "CAST":
          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
          break;

        case "IS NULL":
          ret = new BeamSqlIsNullExpression(subExps.get(0));
          break;
        case "IS NOT NULL":
          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
          break;

        // windowing functions
        case "HOP":
        case "TUMBLE":
        case "SESSION":
          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
          break;
        case "HOP_START":
        case "TUMBLE_START":
        case "SESSION_START":
          ret = new BeamSqlWindowStartExpression();
          break;
        case "HOP_END":
        case "TUMBLE_END":
        case "SESSION_END":
          ret = new BeamSqlWindowEndExpression();
          break;
        default:
          //handle UDF
          if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
            ret = new BeamSqlUdfExpression(fn.method, subExps,
                ((RexCall) rexNode).type.getSqlTypeName());
          } else {
            throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
          }
      }
    } else {
      throw new UnsupportedOperationException(
          String.format("%s is not supported yet!", rexNode.getClass().toString()));
    }

    // Sanity-check operand arity/types; `ret` is null only for the early-return cases above.
    if (ret != null && !ret.accept()) {
      throw new IllegalStateException(ret.getClass().getSimpleName()
          + " does not accept the operands.(" + rexNode + ")");
    }

    return ret;
  }

  /** No per-bundle setup is needed; all compilation happens in the constructor. */
  @Override
  public void prepare() {
  }

  /**
   * Evaluates every compiled expression against {@code inputRow} and returns the raw
   * values, in the same order as the expressions were built.
   */
  @Override
  public List<Object> execute(BeamSqlRow inputRow) {
    List<Object> results = new ArrayList<>();
    for (BeamSqlExpression exp : exps) {
      results.add(exp.evaluate(inputRow).getValue());
    }
    return results;
  }

  /** No resources are held; nothing to release. */
  @Override
  public void close() {
  }

}
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java new file mode 100644 index 0000000..a30916b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL. 
+ */ +public class BeamSqlCaseExpression extends BeamSqlExpression { + public BeamSqlCaseExpression(List<BeamSqlExpression> operands) { + // the return type of CASE is the type of the `else` condition + super(operands, operands.get(operands.size() - 1).getOutputType()); + } + + @Override public boolean accept() { + // `when`-`then` pair + `else` + if (operands.size() % 2 != 1) { + return false; + } + + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opType(i) != SqlTypeName.BOOLEAN) { + return false; + } else if (opType(i + 1) != outputType) { + return false; + } + } + + return true; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opValueEvaluated(i, inputRow)) { + return BeamSqlPrimitive.of( + outputType, + opValueEvaluated(i + 1, inputRow) + ); + } + } + return BeamSqlPrimitive.of(outputType, + opValueEvaluated(operands.size() - 1, inputRow)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java new file mode 100644 index 0000000..524d1df --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.runtime.SqlFunctions; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; + +/** + * Base class to support 'CAST' operations for all {@link SqlTypeName}. + */ +public class BeamSqlCastExpression extends BeamSqlExpression { + + private static final int index = 0; + private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss"; + private static final String outputDateFormat = "yyyy-MM-dd"; + /** + * Date and Timestamp formats used to parse + * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}. 
+ */ + private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() + .append(null/*printer*/, new DateTimeParser[] { + // date formats + DateTimeFormat.forPattern("yy-MM-dd").getParser(), + DateTimeFormat.forPattern("yy/MM/dd").getParser(), + DateTimeFormat.forPattern("yy.MM.dd").getParser(), + DateTimeFormat.forPattern("yyMMdd").getParser(), + DateTimeFormat.forPattern("yyyyMMdd").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd").getParser(), + DateTimeFormat.forPattern("yyyy/MM/dd").getParser(), + DateTimeFormat.forPattern("yyyy.MM.dd").getParser(), + // datetime formats + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter() + .withPivotYear(2020); + + public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) { + super(operands, castType); + } + + @Override + public boolean accept() { + return numberOfOperands() == 1; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + SqlTypeName castOutputType = getOutputType(); + switch (castOutputType) { + case INTEGER: + return BeamSqlPrimitive + .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); + case DOUBLE: + return BeamSqlPrimitive + .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); + case SMALLINT: + return BeamSqlPrimitive + .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); + case TINYINT: + return BeamSqlPrimitive + .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); + case BIGINT: + return BeamSqlPrimitive + .of(SqlTypeName.BIGINT, 
SqlFunctions.toLong(opValueEvaluated(index, inputRow))); + case DECIMAL: + return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, + SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); + case FLOAT: + return BeamSqlPrimitive + .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); + case CHAR: + case VARCHAR: + return BeamSqlPrimitive + .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); + case DATE: + return BeamSqlPrimitive + .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); + case TIMESTAMP: + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); + } + throw new UnsupportedOperationException( + String.format("Cast to type %s not supported", castOutputType)); + } + + private Date toDate(Object inputDate, String outputFormat) { + try { + return Date + .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat)); + } catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Date'"); + } + } + + private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) { + try { + return Timestamp.valueOf( + dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute() + .roundCeilingCopy().toString(outputFormat)); + } catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'"); + } + } +}