http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java deleted file mode 100644 index a5d92e7..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.math.BigDecimal; -import java.sql.Types; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.rules.ExpectedException; - -/** - * prepare input records to test {@link BeamSql}. - * - * <p>Note that, any change in these records would impact tests in this package. - * - */ -public class BeamSqlDslBase { - public static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - @Rule - public ExpectedException exceptions = ExpectedException.none(); - - public static BeamSqlRowType rowTypeInTableA; - public static List<BeamSqlRow> recordsInTableA; - - //bounded PCollections - public PCollection<BeamSqlRow> boundedInput1; - public PCollection<BeamSqlRow> boundedInput2; - - //unbounded PCollections - public PCollection<BeamSqlRow> unboundedInput1; - public PCollection<BeamSqlRow> unboundedInput2; - - @BeforeClass - public static void prepareClass() throws ParseException { - rowTypeInTableA = BeamSqlRowType.create( - Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", - "f_timestamp", "f_int2", "f_decimal"), - Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, - Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER, Types.DECIMAL)); - - recordsInTableA = prepareInputRowsInTableA(); - } - - @Before - public void preparePCollections(){ - boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", - Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); - - boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", - Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); - - unboundedInput1 = prepareUnboundedPCollection1(); - unboundedInput2 = prepareUnboundedPCollection2(); - } - - private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); - - for (BeamSqlRow row : recordsInTableA) { - values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); - values = values.addElements(row); - } - - return PBegin.in(pipeline).apply("unboundedInput1", values.advanceWatermarkToInfinity()); - } - - private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { - TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(rowTypeInTableA)); - - BeamSqlRow row = recordsInTableA.get(0); - values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); - values = values.addElements(row); - - return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); - } - - private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{ - List<BeamSqlRow> rows = new ArrayList<>(); - - BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA); - row1.addField(0, 1); - row1.addField(1, 1000L); - row1.addField(2, Short.valueOf("1")); - row1.addField(3, Byte.valueOf("1")); - row1.addField(4, 1.0f); - row1.addField(5, 1.0); - row1.addField(6, "string_row1"); - row1.addField(7, FORMAT.parse("2017-01-01 01:01:03")); - row1.addField(8, 0); - row1.addField(9, new BigDecimal(1)); - rows.add(row1); - - BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA); - row2.addField(0, 2); - row2.addField(1, 2000L); - row2.addField(2, Short.valueOf("2")); - row2.addField(3, Byte.valueOf("2")); - row2.addField(4, 2.0f); - row2.addField(5, 2.0); - row2.addField(6, "string_row2"); - row2.addField(7, FORMAT.parse("2017-01-01 01:02:03")); - row2.addField(8, 0); - row2.addField(9, new BigDecimal(2)); - rows.add(row2); - - BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA); - row3.addField(0, 3); - row3.addField(1, 3000L); - row3.addField(2, Short.valueOf("3")); - row3.addField(3, Byte.valueOf("3")); - row3.addField(4, 3.0f); - row3.addField(5, 3.0); - row3.addField(6, "string_row3"); - row3.addField(7, FORMAT.parse("2017-01-01 01:06:03")); - row3.addField(8, 0); - row3.addField(9, new BigDecimal(3)); - rows.add(row3); - - BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA); - row4.addField(0, 4); - row4.addField(1, 4000L); - row4.addField(2, Short.valueOf("4")); - row4.addField(3, Byte.valueOf("4")); - row4.addField(4, 4.0f); - row4.addField(5, 4.0); - row4.addField(6, "string_row4"); - row4.addField(7, FORMAT.parse("2017-01-01 02:04:03")); - row4.addField(8, 0); - row4.addField(9, new BigDecimal(4)); - rows.add(row4); - - return rows; - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java deleted file mode 100644 index b4b50c1..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslFilterTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for WHERE queries with BOUNDED PCollection. - */ -public class BeamSqlDslFilterTest extends BeamSqlDslBase { - /** - * single filter with bounded PCollection. - */ - @Test - public void testSingleFilterWithBounded() throws Exception { - runSingleFilter(boundedInput1); - } - - /** - * single filter with unbounded PCollection. - */ - @Test - public void testSingleFilterWithUnbounded() throws Exception { - runSingleFilter(unboundedInput1); - } - - private void runSingleFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM PCOLLECTION WHERE f_int = 1"; - - PCollection<BeamSqlRow> result = - input.apply("testSingleFilter", BeamSql.simpleQuery(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); - - pipeline.run().waitUntilFinish(); - } - - /** - * composite filters with bounded PCollection. - */ - @Test - public void testCompositeFilterWithBounded() throws Exception { - runCompositeFilter(boundedInput1); - } - - /** - * composite filters with unbounded PCollection. - */ - @Test - public void testCompositeFilterWithUnbounded() throws Exception { - runCompositeFilter(unboundedInput1); - } - - private void runCompositeFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM TABLE_A" - + " WHERE f_int > 1 AND (f_long < 3000 OR f_string = 'string_row3')"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testCompositeFilter", BeamSql.query(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(1), recordsInTableA.get(2)); - - pipeline.run().waitUntilFinish(); - } - - /** - * nothing return with filters in bounded PCollection. - */ - @Test - public void testNoReturnFilterWithBounded() throws Exception { - runNoReturnFilter(boundedInput1); - } - - /** - * nothing return with filters in unbounded PCollection. - */ - @Test - public void testNoReturnFilterWithUnbounded() throws Exception { - runNoReturnFilter(unboundedInput1); - } - - private void runNoReturnFilter(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM TABLE_A WHERE f_int < 1"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testNoReturnFilter", BeamSql.query(sql)); - - PAssert.that(result).empty(); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testFromInvalidTableName1() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Object 'TABLE_B' not found"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM TABLE_B WHERE f_int < 1"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testFromInvalidTableName1", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testFromInvalidTableName2() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Use fixed table name PCOLLECTION"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM PCOLLECTION_NA"; - - PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testInvalidFilter() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Column 'f_int_na' not found in any table"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT * FROM PCOLLECTION WHERE f_int_na = 0"; - - PCollection<BeamSqlRow> result = boundedInput1.apply(BeamSql.simpleQuery(sql)); - - pipeline.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java deleted file mode 100644 index e010915..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS1; -import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER_DETAILS2; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Rule; -import org.junit.Test; - -/** - * Tests for joins in queries. - */ -public class BeamSqlDslJoinTest { - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - private static final BeamSqlRowType SOURCE_RECORD_TYPE = - BeamSqlRowType.create( - Arrays.asList( - "order_id", "site_id", "price" - ), - Arrays.asList( - Types.INTEGER, Types.INTEGER, Types.INTEGER - ) - ); - - private static final BeamSqlRowCoder SOURCE_CODER = - new BeamSqlRowCoder(SOURCE_RECORD_TYPE); - - private static final BeamSqlRowType RESULT_RECORD_TYPE = - BeamSqlRowType.create( - Arrays.asList( - "order_id", "site_id", "price", "order_id0", "site_id0", "price0" - ), - Arrays.asList( - Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.INTEGER - , Types.INTEGER, Types.INTEGER - ) - ); - - private static final BeamSqlRowCoder RESULT_CODER = - new BeamSqlRowCoder(RESULT_RECORD_TYPE); - - @Test - public void testInnerJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3 - ).getRows()); - pipeline.run(); - } - - @Test - public void testLeftOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " LEFT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 1, 2, 3, null, null, null, - 2, 3, 3, 1, 2, 3, - 3, 4, 5, null, null, null - ).getRows()); - pipeline.run(); - } - - @Test - public void testRightOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " RIGHT OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test - public void testFullOuterJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " FULL OUTER JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id=o2.site_id AND o2.price=o1.site_id" - ; - - PAssert.that(queryFromOrderTables(sql)).containsInAnyOrder( - TestUtils.RowsBuilder.of( - RESULT_RECORD_TYPE - ).addRows( - 2, 3, 3, 1, 2, 3, - 1, 2, 3, null, null, null, - 3, 4, 5, null, null, null, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getRows()); - pipeline.run(); - } - - @Test(expected = IllegalStateException.class) - public void testException_nonEqualJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1" - + " JOIN ORDER_DETAILS2 o2" - + " on " - + " o1.order_id>o2.site_id" - ; - - pipeline.enableAbandonedNodeEnforcement(false); - queryFromOrderTables(sql); - pipeline.run(); - } - - @Test(expected = IllegalStateException.class) - public void testException_crossJoin() throws Exception { - String sql = - "SELECT * " - + "FROM ORDER_DETAILS1 o1, ORDER_DETAILS2 o2"; - - pipeline.enableAbandonedNodeEnforcement(false); - queryFromOrderTables(sql); - pipeline.run(); - } - - private PCollection<BeamSqlRow> queryFromOrderTables(String sql) { - return PCollectionTuple - .of( - new TupleTag<BeamSqlRow>("ORDER_DETAILS1"), - ORDER_DETAILS1.buildIOReader(pipeline).setCoder(SOURCE_CODER) - ) - .and(new TupleTag<BeamSqlRow>("ORDER_DETAILS2"), - ORDER_DETAILS2.buildIOReader(pipeline).setCoder(SOURCE_CODER) - ).apply("join", BeamSql.query(sql)).setCoder(RESULT_CODER); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java deleted file mode 100644 index ab5a639..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for field-project in queries with BOUNDED PCollection. - */ -public class BeamSqlDslProjectTest extends BeamSqlDslBase { - /** - * select all fields with bounded PCollection. - */ - @Test - public void testSelectAllWithBounded() throws Exception { - runSelectAll(boundedInput2); - } - - /** - * select all fields with unbounded PCollection. - */ - @Test - public void testSelectAllWithUnbounded() throws Exception { - runSelectAll(unboundedInput2); - } - - private void runSelectAll(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT * FROM PCOLLECTION"; - - PCollection<BeamSqlRow> result = - input.apply("testSelectAll", BeamSql.simpleQuery(sql)); - - PAssert.that(result).containsInAnyOrder(recordsInTableA.get(0)); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields with bounded PCollection. - */ - @Test - public void testPartialFieldsWithBounded() throws Exception { - runPartialFields(boundedInput2); - } - - /** - * select partial fields with unbounded PCollection. - */ - @Test - public void testPartialFieldsWithUnbounded() throws Exception { - runPartialFields(unboundedInput2); - } - - private void runPartialFields(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFields", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields for multiple rows with bounded PCollection. - */ - @Test - public void testPartialFieldsInMultipleRowWithBounded() throws Exception { - runPartialFieldsInMultipleRow(boundedInput1); - } - - /** - * select partial fields for multiple rows with unbounded PCollection. - */ - @Test - public void testPartialFieldsInMultipleRowWithUnbounded() throws Exception { - runPartialFieldsInMultipleRow(unboundedInput1); - } - - private void runPartialFieldsInMultipleRow(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); - record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); - record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); - record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * select partial fields with bounded PCollection. - */ - @Test - public void testPartialFieldsInRowsWithBounded() throws Exception { - runPartialFieldsInRows(boundedInput1); - } - - /** - * select partial fields with unbounded PCollection. - */ - @Test - public void testPartialFieldsInRowsWithUnbounded() throws Exception { - runPartialFieldsInRows(unboundedInput1); - } - - private void runPartialFieldsInRows(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT f_int, f_long FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testPartialFieldsInRows", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); - - BeamSqlRow record1 = new BeamSqlRow(resultType); - record1.addField("f_int", recordsInTableA.get(0).getFieldValue(0)); - record1.addField("f_long", recordsInTableA.get(0).getFieldValue(1)); - - BeamSqlRow record2 = new BeamSqlRow(resultType); - record2.addField("f_int", recordsInTableA.get(1).getFieldValue(0)); - record2.addField("f_long", recordsInTableA.get(1).getFieldValue(1)); - - BeamSqlRow record3 = new BeamSqlRow(resultType); - record3.addField("f_int", recordsInTableA.get(2).getFieldValue(0)); - record3.addField("f_long", recordsInTableA.get(2).getFieldValue(1)); - - BeamSqlRow record4 = new BeamSqlRow(resultType); - record4.addField("f_int", recordsInTableA.get(3).getFieldValue(0)); - record4.addField("f_long", recordsInTableA.get(3).getFieldValue(1)); - - PAssert.that(result).containsInAnyOrder(record1, record2, record3, record4); - - pipeline.run().waitUntilFinish(); - } - - /** - * select literal field with bounded PCollection. - */ - @Test - public void testLiteralFieldWithBounded() throws Exception { - runLiteralField(boundedInput2); - } - - /** - * select literal field with unbounded PCollection. - */ - @Test - public void testLiteralFieldWithUnbounded() throws Exception { - runLiteralField(unboundedInput2); - } - - public void runLiteralField(PCollection<BeamSqlRow> input) throws Exception { - String sql = "SELECT 1 as literal_field FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) - .apply("testLiteralField", BeamSql.query(sql)); - - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"), - Arrays.asList(Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("literal_field", 1); - - PAssert.that(result).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - @Test - public void testProjectUnknownField() throws Exception { - exceptions.expect(IllegalStateException.class); - exceptions.expectMessage("Column 'f_int_na' not found in any table"); - pipeline.enableAbandonedNodeEnforcement(false); - - String sql = "SELECT f_int_na FROM TABLE_A"; - - PCollection<BeamSqlRow> result = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), boundedInput1) - .apply("testProjectUnknownField", BeamSql.query(sql)); - - pipeline.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java deleted file mode 100644 index 726f658..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.sql.Types; -import java.util.Arrays; -import java.util.Iterator; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.dsls.sql.schema.BeamSqlUdf; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Test; - -/** - * Tests for UDF/UDAF. - */ -public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { - /** - * GROUP-BY with UDAF. - */ - @Test - public void testUdaf() throws Exception { - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"), - Arrays.asList(Types.INTEGER, Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int2", 0); - record.addField("squaresum", 30); - - String sql1 = "SELECT f_int2, squaresum1(f_int) AS `squaresum`" - + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result1 = - boundedInput1.apply("testUdaf1", - BeamSql.simpleQuery(sql1).withUdaf("squaresum1", SquareSum.class)); - PAssert.that(result1).containsInAnyOrder(record); - - String sql2 = "SELECT f_int2, squaresum2(f_int) AS `squaresum`" - + " FROM PCOLLECTION GROUP BY f_int2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) - .apply("testUdaf2", - BeamSql.query(sql2).withUdaf("squaresum2", SquareSum.class)); - PAssert.that(result2).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * test UDF. - */ - @Test - public void testUdf() throws Exception{ - BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"), - Arrays.asList(Types.INTEGER, Types.INTEGER)); - - BeamSqlRow record = new BeamSqlRow(resultType); - record.addField("f_int", 2); - record.addField("cubicvalue", 8); - - String sql1 = "SELECT f_int, cubic1(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result1 = - boundedInput1.apply("testUdf1", - BeamSql.simpleQuery(sql1).withUdf("cubic1", CubicInteger.class)); - PAssert.that(result1).containsInAnyOrder(record); - - String sql2 = "SELECT f_int, cubic2(f_int) as cubicvalue FROM PCOLLECTION WHERE f_int = 2"; - PCollection<BeamSqlRow> result2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("PCOLLECTION"), boundedInput1) - .apply("testUdf2", - BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); - PAssert.that(result2).containsInAnyOrder(record); - - pipeline.run().waitUntilFinish(); - } - - /** - * UDAF for test, which returns the sum of square. - */ - public static class SquareSum extends BeamSqlUdaf<Integer, Integer, Integer> { - - public SquareSum() { - } - - @Override - public Integer init() { - return 0; - } - - @Override - public Integer add(Integer accumulator, Integer input) { - return accumulator + input * input; - } - - @Override - public Integer merge(Iterable<Integer> accumulators) { - int v = 0; - Iterator<Integer> ite = accumulators.iterator(); - while (ite.hasNext()) { - v += ite.next(); - } - return v; - } - - @Override - public Integer result(Integer accumulator) { - return accumulator; - } - - } - - /** - * A example UDF for test. - */ - public static class CubicInteger implements BeamSqlUdf{ - public static Integer eval(Integer input){ - return input * input * input; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java deleted file mode 100644 index a669635..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.transforms.DoFn; - -/** - * Test utilities. - */ -public class TestUtils { - /** - * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. - */ - public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> { - @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().valueInString()); - } - } - - /** - * Convert list of {@code BeamSqlRow} to list of {@code String}. - */ - public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) { - List<String> strs = new ArrayList<>(); - for (BeamSqlRow row : rows) { - strs.add(row.valueInString()); - } - - return strs; - } - - /** - * Convenient way to build a list of {@code BeamSqlRow}s. - * - * <p>You can use it like this: - * - * <pre>{@code - * TestUtils.RowsBuilder.of( - * Types.INTEGER, "order_id", - * Types.INTEGER, "sum_site_id", - * Types.VARCHAR, "buyer" - * ).addRows( - * 1, 3, "james", - * 2, 5, "bond" - * ).getStringRows() - * }</pre> - * {@code} - */ - public static class RowsBuilder { - private BeamSqlRowType type; - private List<BeamSqlRow> rows = new ArrayList<>(); - - /** - * Create a RowsBuilder with the specified row type info. - * - * <p>For example: - * <pre>{@code - * TestUtils.RowsBuilder.of( - * Types.INTEGER, "order_id", - * Types.INTEGER, "sum_site_id", - * Types.VARCHAR, "buyer" - * )}</pre> - * - * @args pairs of column type and column names. - */ - public static RowsBuilder of(final Object... args) { - BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args); - RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRowType; - - return builder; - } - - /** - * Create a RowsBuilder with the specified row type info. - * - * <p>For example: - * <pre>{@code - * TestUtils.RowsBuilder.of( - * beamSqlRowType - * )}</pre> - * @beamSQLRowType the record type. - */ - public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) { - RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRowType; - - return builder; - } - - /** - * Add rows to the builder. - * - * <p>Note: check the class javadoc for for detailed example. - */ - public RowsBuilder addRows(final Object... args) { - this.rows.addAll(buildRows(type, Arrays.asList(args))); - return this; - } - - /** - * Add rows to the builder. - * - * <p>Note: check the class javadoc for for detailed example. - */ - public RowsBuilder addRows(final List args) { - this.rows.addAll(buildRows(type, args)); - return this; - } - - public List<BeamSqlRow> getRows() { - return rows; - } - - public List<String> getStringRows() { - return beamSqlRows2Strings(rows); - } - } - - /** - * Convenient way to build a {@code BeamSqlRowType}. - * - * <p>e.g. - * - * <pre>{@code - * buildBeamSqlRowType( - * Types.BIGINT, "order_id", - * Types.INTEGER, "site_id", - * Types.DOUBLE, "price", - * Types.TIMESTAMP, "order_time" - * ) - * }</pre> - */ - public static BeamSqlRowType buildBeamSqlRowType(Object... args) { - List<Integer> types = new ArrayList<>(); - List<String> names = new ArrayList<>(); - - for (int i = 0; i < args.length - 1; i += 2) { - types.add((int) args[i]); - names.add((String) args[i + 1]); - } - - return BeamSqlRowType.create(names, types); - } - - /** - * Convenient way to build a {@code BeamSqlRow}s. - * - * <p>e.g. - * - * <pre>{@code - * buildRows( - * rowType, - * 1, 1, 1, // the first row - * 2, 2, 2, // the second row - * ... - * ) - * }</pre> - */ - public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) { - List<BeamSqlRow> rows = new ArrayList<>(); - int fieldCount = type.size(); - - for (int i = 0; i < args.size(); i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(type); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args.get(i + j)); - } - rows.add(row); - } - return rows; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java deleted file mode 100644 index 947660a..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import java.math.BigDecimal; -import java.math.RoundingMode; -import org.junit.Test; - -/** - * Integration test for arithmetic operators. - */ -public class BeamSqlArithmeticOperatorsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - - private static final BigDecimal ZERO = BigDecimal.valueOf(0.0); - private static final BigDecimal ONE0 = BigDecimal.valueOf(1); - private static final BigDecimal ONE = BigDecimal.valueOf(1.0); - private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0)); - private static final BigDecimal ONE10 = BigDecimal.ONE.divide( - BigDecimal.ONE, 10, RoundingMode.HALF_EVEN); - private static final BigDecimal TWO = BigDecimal.valueOf(2.0); - - @Test - public void testPlus() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 + 1", 2) - .addExpr("1.0 + 1", TWO) - .addExpr("1 + 1.0", TWO) - .addExpr("1.0 + 1.0", TWO) - .addExpr("c_tinyint + c_tinyint", (byte) 2) - .addExpr("c_smallint + c_smallint", (short) 2) - .addExpr("c_bigint + c_bigint", 2L) - .addExpr("c_decimal + c_decimal", TWO) - .addExpr("c_tinyint + c_decimal", TWO) - .addExpr("c_float + c_decimal", 2.0) - .addExpr("c_double + c_decimal", 2.0) - .addExpr("c_float + c_float", 2.0f) - .addExpr("c_double + c_float", 2.0) - .addExpr("c_double + c_double", 2.0) - .addExpr("c_float + c_bigint", 2.0f) - .addExpr("c_double + c_bigint", 2.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testPlus_overflow() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_max + c_tinyint_max", (byte) -2) - .addExpr("c_smallint_max + c_smallint_max", (short) -2) - .addExpr("c_integer_max + c_integer_max", -2) - // yeah, I know 384L is strange, but since it is already overflowed - // what the actualy result is not so important, it is wrong any way. - .addExpr("c_bigint_max + c_bigint_max", 384L) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMinus() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 - 1", 0) - .addExpr("1.0 - 1", ZERO) - .addExpr("1 - 0.0", ONE) - .addExpr("1.0 - 1.0", ZERO) - .addExpr("c_tinyint - c_tinyint", (byte) 0) - .addExpr("c_smallint - c_smallint", (short) 0) - .addExpr("c_bigint - c_bigint", 0L) - .addExpr("c_decimal - c_decimal", ZERO) - .addExpr("c_tinyint - c_decimal", ZERO) - .addExpr("c_float - c_decimal", 0.0) - .addExpr("c_double - c_decimal", 0.0) - .addExpr("c_float - c_float", 0.0f) - .addExpr("c_double - c_float", 0.0) - .addExpr("c_double - c_double", 0.0) - .addExpr("c_float - c_bigint", 0.0f) - .addExpr("c_double - c_bigint", 0.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMultiply() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 * 1", 1) - .addExpr("1.0 * 1", ONE2) - .addExpr("1 * 1.0", ONE2) - .addExpr("1.0 * 1.0", ONE2) - .addExpr("c_tinyint * c_tinyint", (byte) 1) - .addExpr("c_smallint * c_smallint", (short) 1) - .addExpr("c_bigint * c_bigint", 1L) - .addExpr("c_decimal * c_decimal", ONE2) - .addExpr("c_tinyint * c_decimal", ONE2) - .addExpr("c_float * c_decimal", 1.0) - .addExpr("c_double * c_decimal", 1.0) - .addExpr("c_float * c_float", 1.0f) - .addExpr("c_double * c_float", 1.0) - .addExpr("c_double * c_double", 1.0) - .addExpr("c_float * c_bigint", 1.0f) - .addExpr("c_double * c_bigint", 1.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testDivide() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 / 1", 1) - .addExpr("1.0 / 1", ONE10) - .addExpr("1 / 1.0", ONE10) - .addExpr("1.0 / 1.0", ONE10) - .addExpr("c_tinyint / c_tinyint", (byte) 1) - .addExpr("c_smallint / c_smallint", (short) 1) - .addExpr("c_bigint / c_bigint", 1L) - .addExpr("c_decimal / c_decimal", ONE10) - .addExpr("c_tinyint / c_decimal", ONE10) - .addExpr("c_float / c_decimal", 1.0) - .addExpr("c_double / c_decimal", 1.0) - .addExpr("c_float / c_float", 1.0f) - .addExpr("c_double / c_float", 1.0) - .addExpr("c_double / c_double", 1.0) - .addExpr("c_float / c_bigint", 1.0f) - .addExpr("c_double / c_bigint", 1.0) - ; - - checker.buildRunAndCheck(); - } - - @Test - public void testMod() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("mod(1, 1)", 0) - .addExpr("mod(1.0, 1)", 0) - .addExpr("mod(1, 1.0)", ZERO) - .addExpr("mod(1.0, 1.0)", ZERO) - .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0) - .addExpr("mod(c_smallint, c_smallint)", (short) 0) - .addExpr("mod(c_bigint, c_bigint)", 0L) - .addExpr("mod(c_decimal, c_decimal)", ZERO) - .addExpr("mod(c_tinyint, c_decimal)", ZERO) - ; - - checker.buildRunAndCheck(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java deleted file mode 100644 index b9ce9b4..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import com.google.common.base.Joiner; -import java.math.BigDecimal; -import java.sql.Types; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import org.apache.beam.dsls.sql.BeamSql; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.util.Pair; -import org.junit.Rule; - -/** - * Base class for all built-in functions integration tests. - */ -public class BeamSqlBuiltinFunctionsIntegrationTestBase { - private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>(); - static { - JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT); - JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT); - JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER); - JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT); - JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT); - JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE); - JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL); - JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR); - JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE); - JAVA_CLASS_TO_SQL_TYPE.put(Boolean.class, Types.BOOLEAN); - } - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - - protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( - Arrays.asList("ts", "c_tinyint", "c_smallint", - "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", - "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), - Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT, - Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL, - Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT) - ); - try { - return MockedBoundedTable - .of(type) - .addRows( - parseDate("1986-02-15 11:35:26"), - (byte) 1, - (short) 1, - 1, - 1L, - 1.0f, - 1.0, - BigDecimal.ONE, - (byte) 127, - (short) 32767, - 2147483647, - 9223372036854775807L - ) - .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - protected static Date parseDate(String str) { - try { - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - sdf.setTimeZone(TimeZone.getTimeZone("GMT")); - return sdf.parse(str); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - - /** - * Helper class to make write integration test for built-in functions easier. - * - * <p>example usage: - * <pre>{@code - * ExpressionChecker checker = new ExpressionChecker() - * .addExpr("1 + 1", 2) - * .addExpr("1.0 + 1", 2.0) - * .addExpr("1 + 1.0", 2.0) - * .addExpr("1.0 + 1.0", 2.0) - * .addExpr("c_tinyint + c_tinyint", (byte) 2); - * checker.buildRunAndCheck(inputCollections); - * }</pre> - */ - public class ExpressionChecker { - private transient List<Pair<String, Object>> exps = new ArrayList<>(); - - public ExpressionChecker addExpr(String expression, Object expectedValue) { - exps.add(Pair.of(expression, expectedValue)); - return this; - } - - private String getSql() { - List<String> expStrs = new ArrayList<>(); - for (Pair<String, Object> pair : exps) { - expStrs.add(pair.getKey()); - } - return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION"; - } - - /** - * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result. - */ - public void buildRunAndCheck() { - PCollection<BeamSqlRow> inputCollection = getTestPCollection(); - System.out.println("SQL:>\n" + getSql()); - try { - List<String> names = new ArrayList<>(); - List<Integer> types = new ArrayList<>(); - List<Object> values = new ArrayList<>(); - - for (Pair<String, Object> pair : exps) { - names.add(pair.getKey()); - types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass())); - values.add(pair.getValue()); - } - - PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder - .of(BeamSqlRowType.create(names, types)) - .addRows(values) - .getRows() - ); - inputCollection.getPipeline().run(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java deleted file mode 100644 index 5502ad4..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlComparisonOperatorsIntegrationTest.java +++ /dev/null @@ -1,330 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import java.math.BigDecimal; -import java.sql.Types; -import java.util.Arrays; -import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Test; - -/** - * Integration test for comparison operators. - */ -public class BeamSqlComparisonOperatorsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - - @Test - public void testEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_1 = c_tinyint_1", true) - .addExpr("c_tinyint_1 = c_tinyint_2", false) - .addExpr("c_smallint_1 = c_smallint_1", true) - .addExpr("c_smallint_1 = c_smallint_2", false) - .addExpr("c_integer_1 = c_integer_1", true) - .addExpr("c_integer_1 = c_integer_2", false) - .addExpr("c_bigint_1 = c_bigint_1", true) - .addExpr("c_bigint_1 = c_bigint_2", false) - .addExpr("c_float_1 = c_float_1", true) - .addExpr("c_float_1 = c_float_2", false) - .addExpr("c_double_1 = c_double_1", true) - .addExpr("c_double_1 = c_double_2", false) - .addExpr("c_decimal_1 = c_decimal_1", true) - .addExpr("c_decimal_1 = c_decimal_2", false) - .addExpr("c_varchar_1 = c_varchar_1", true) - .addExpr("c_varchar_1 = c_varchar_2", false) - .addExpr("c_boolean_true = c_boolean_true", true) - .addExpr("c_boolean_true = c_boolean_false", false) - - ; - checker.buildRunAndCheck(); - } - - @Test - public void testNotEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_1 <> c_tinyint_1", false) - .addExpr("c_tinyint_1 <> c_tinyint_2", true) - .addExpr("c_smallint_1 <> c_smallint_1", false) - .addExpr("c_smallint_1 <> c_smallint_2", true) - .addExpr("c_integer_1 <> c_integer_1", false) - .addExpr("c_integer_1 <> c_integer_2", true) - .addExpr("c_bigint_1 <> c_bigint_1", false) - .addExpr("c_bigint_1 <> c_bigint_2", true) - .addExpr("c_float_1 <> c_float_1", false) - .addExpr("c_float_1 <> c_float_2", true) - .addExpr("c_double_1 <> c_double_1", false) - .addExpr("c_double_1 <> c_double_2", true) - .addExpr("c_decimal_1 <> c_decimal_1", false) - .addExpr("c_decimal_1 <> c_decimal_2", true) - .addExpr("c_varchar_1 <> c_varchar_1", false) - .addExpr("c_varchar_1 <> c_varchar_2", true) - .addExpr("c_boolean_true <> c_boolean_true", false) - .addExpr("c_boolean_true <> c_boolean_false", true) - ; - checker.buildRunAndCheck(); - } - - @Test - public void testGreaterThan() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 > c_tinyint_1", true) - .addExpr("c_tinyint_1 > c_tinyint_1", false) - .addExpr("c_tinyint_1 > c_tinyint_2", false) - - .addExpr("c_smallint_2 > c_smallint_1", true) - .addExpr("c_smallint_1 > c_smallint_1", false) - .addExpr("c_smallint_1 > c_smallint_2", false) - - .addExpr("c_integer_2 > c_integer_1", true) - .addExpr("c_integer_1 > c_integer_1", false) - .addExpr("c_integer_1 > c_integer_2", false) - - .addExpr("c_bigint_2 > c_bigint_1", true) - .addExpr("c_bigint_1 > c_bigint_1", false) - .addExpr("c_bigint_1 > c_bigint_2", false) - - .addExpr("c_float_2 > c_float_1", true) - .addExpr("c_float_1 > c_float_1", false) - .addExpr("c_float_1 > c_float_2", false) - - .addExpr("c_double_2 > c_double_1", true) - .addExpr("c_double_1 > c_double_1", false) - .addExpr("c_double_1 > c_double_2", false) - - .addExpr("c_decimal_2 > c_decimal_1", true) - .addExpr("c_decimal_1 > c_decimal_1", false) - .addExpr("c_decimal_1 > c_decimal_2", false) - - .addExpr("c_varchar_2 > c_varchar_1", true) - .addExpr("c_varchar_1 > c_varchar_1", false) - .addExpr("c_varchar_1 > c_varchar_2", false) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testGreaterThanException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false > c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testGreaterThanOrEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 >= c_tinyint_1", true) - .addExpr("c_tinyint_1 >= c_tinyint_1", true) - .addExpr("c_tinyint_1 >= c_tinyint_2", false) - - .addExpr("c_smallint_2 >= c_smallint_1", true) - .addExpr("c_smallint_1 >= c_smallint_1", true) - .addExpr("c_smallint_1 >= c_smallint_2", false) - - .addExpr("c_integer_2 >= c_integer_1", true) - .addExpr("c_integer_1 >= c_integer_1", true) - .addExpr("c_integer_1 >= c_integer_2", false) - - .addExpr("c_bigint_2 >= c_bigint_1", true) - .addExpr("c_bigint_1 >= c_bigint_1", true) - .addExpr("c_bigint_1 >= c_bigint_2", false) - - .addExpr("c_float_2 >= c_float_1", true) - .addExpr("c_float_1 >= c_float_1", true) - .addExpr("c_float_1 >= c_float_2", false) - - .addExpr("c_double_2 >= c_double_1", true) - .addExpr("c_double_1 >= c_double_1", true) - .addExpr("c_double_1 >= c_double_2", false) - - .addExpr("c_decimal_2 >= c_decimal_1", true) - .addExpr("c_decimal_1 >= c_decimal_1", true) - .addExpr("c_decimal_1 >= c_decimal_2", false) - - .addExpr("c_varchar_2 >= c_varchar_1", true) - .addExpr("c_varchar_1 >= c_varchar_1", true) - .addExpr("c_varchar_1 >= c_varchar_2", false) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testGreaterThanOrEqualsException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false >= c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testLessThan() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 < c_tinyint_1", false) - .addExpr("c_tinyint_1 < c_tinyint_1", false) - .addExpr("c_tinyint_1 < c_tinyint_2", true) - - .addExpr("c_smallint_2 < c_smallint_1", false) - .addExpr("c_smallint_1 < c_smallint_1", false) - .addExpr("c_smallint_1 < c_smallint_2", true) - - .addExpr("c_integer_2 < c_integer_1", false) - .addExpr("c_integer_1 < c_integer_1", false) - .addExpr("c_integer_1 < c_integer_2", true) - - .addExpr("c_bigint_2 < c_bigint_1", false) - .addExpr("c_bigint_1 < c_bigint_1", false) - .addExpr("c_bigint_1 < c_bigint_2", true) - - .addExpr("c_float_2 < c_float_1", false) - .addExpr("c_float_1 < c_float_1", false) - .addExpr("c_float_1 < c_float_2", true) - - .addExpr("c_double_2 < c_double_1", false) - .addExpr("c_double_1 < c_double_1", false) - .addExpr("c_double_1 < c_double_2", true) - - .addExpr("c_decimal_2 < c_decimal_1", false) - .addExpr("c_decimal_1 < c_decimal_1", false) - .addExpr("c_decimal_1 < c_decimal_2", true) - - .addExpr("c_varchar_2 < c_varchar_1", false) - .addExpr("c_varchar_1 < c_varchar_1", false) - .addExpr("c_varchar_1 < c_varchar_2", true) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testLessThanException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false < c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testLessThanOrEquals() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_tinyint_2 <= c_tinyint_1", false) - .addExpr("c_tinyint_1 <= c_tinyint_1", true) - .addExpr("c_tinyint_1 <= c_tinyint_2", true) - - .addExpr("c_smallint_2 <= c_smallint_1", false) - .addExpr("c_smallint_1 <= c_smallint_1", true) - .addExpr("c_smallint_1 <= c_smallint_2", true) - - .addExpr("c_integer_2 <= c_integer_1", false) - .addExpr("c_integer_1 <= c_integer_1", true) - .addExpr("c_integer_1 <= c_integer_2", true) - - .addExpr("c_bigint_2 <= c_bigint_1", false) - .addExpr("c_bigint_1 <= c_bigint_1", true) - .addExpr("c_bigint_1 <= c_bigint_2", true) - - .addExpr("c_float_2 <= c_float_1", false) - .addExpr("c_float_1 <= c_float_1", true) - .addExpr("c_float_1 <= c_float_2", true) - - .addExpr("c_double_2 <= c_double_1", false) - .addExpr("c_double_1 <= c_double_1", true) - .addExpr("c_double_1 <= c_double_2", true) - - .addExpr("c_decimal_2 <= c_decimal_1", false) - .addExpr("c_decimal_1 <= c_decimal_1", true) - .addExpr("c_decimal_1 <= c_decimal_2", true) - - .addExpr("c_varchar_2 <= c_varchar_1", false) - .addExpr("c_varchar_1 <= c_varchar_1", true) - .addExpr("c_varchar_1 <= c_varchar_2", true) - ; - - checker.buildRunAndCheck(); - } - - @Test(expected = RuntimeException.class) - public void testLessThanOrEqualsException() { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_boolean_false <= c_boolean_true", false); - checker.buildRunAndCheck(); - } - - @Test - public void testIsNullAndIsNotNull() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("1 IS NOT NULL", true) - .addExpr("NULL IS NOT NULL", false) - - .addExpr("1 IS NULL", false) - .addExpr("NULL IS NULL", true) - ; - - checker.buildRunAndCheck(); - } - - @Override protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRowType type = BeamSqlRowType.create( - Arrays.asList( - "c_tinyint_0", "c_tinyint_1", "c_tinyint_2", - "c_smallint_0", "c_smallint_1", "c_smallint_2", - "c_integer_0", "c_integer_1", "c_integer_2", - "c_bigint_0", "c_bigint_1", "c_bigint_2", - "c_float_0", "c_float_1", "c_float_2", - "c_double_0", "c_double_1", "c_double_2", - "c_decimal_0", "c_decimal_1", "c_decimal_2", - "c_varchar_0", "c_varchar_1", "c_varchar_2", - "c_boolean_false", "c_boolean_true" - ), - Arrays.asList( - Types.TINYINT, Types.TINYINT, Types.TINYINT, - Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, - Types.INTEGER, Types.INTEGER, Types.INTEGER, - Types.BIGINT, Types.BIGINT, Types.BIGINT, - Types.FLOAT, Types.FLOAT, Types.FLOAT, - Types.DOUBLE, Types.DOUBLE, Types.DOUBLE, - Types.DECIMAL, Types.DECIMAL, Types.DECIMAL, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, - Types.BOOLEAN, Types.BOOLEAN - ) - ); - try { - return MockedBoundedTable - .of(type) - .addRows( - (byte) 0, (byte) 1, (byte) 2, - (short) 0, (short) 1, (short) 2, - 0, 1, 2, - 0L, 1L, 2L, - 0.0f, 1.0f, 2.0f, - 0.0, 1.0, 2.0, - BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.ONE.add(BigDecimal.ONE), - "a", "b", "c", - false, true - ) - .buildIOReader(pipeline) - .setCoder(new BeamSqlRowCoder(type)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java deleted file mode 100644 index 6233aeb..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlConditionalFunctionsIntegrationTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.integrationtest; - -import org.junit.Test; - -/** - * Integration test for conditional functions. - */ -public class BeamSqlConditionalFunctionsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - @Test - public void testConditionalFunctions() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr( - "CASE 1 WHEN 1 THEN 'hello' ELSE 'world' END", - "hello" - ) - .addExpr( - "CASE 2 " - + "WHEN 1 THEN 'hello' " - + "WHEN 3 THEN 'bond' " - + "ELSE 'world' END", - "world" - ) - .addExpr( - "CASE " - + "WHEN 1 = 1 THEN 'hello' " - + "ELSE 'world' END", - "hello" - ) - .addExpr( - "CASE " - + "WHEN 1 > 1 THEN 'hello' " - + "ELSE 'world' END", - "world" - ) - .addExpr("NULLIF(5, 4) ", 5) - .addExpr("COALESCE(1, 5) ", 1) - .addExpr("COALESCE(NULL, 5) ", 5) - ; - - checker.buildRunAndCheck(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java deleted file mode 100644 index bd0d3ba..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlDateFunctionsIntegrationTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.integrationtest; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Date; -import java.util.Iterator; -import org.apache.beam.dsls.sql.BeamSql; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Test; - -/** - * Integration test for date functions. - */ -public class BeamSqlDateFunctionsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - @Test public void testDateTimeFunctions() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("EXTRACT(YEAR FROM ts)", 1986L) - .addExpr("YEAR(ts)", 1986L) - .addExpr("QUARTER(ts)", 1L) - .addExpr("MONTH(ts)", 2L) - .addExpr("WEEK(ts)", 7L) - .addExpr("DAYOFMONTH(ts)", 15L) - .addExpr("DAYOFYEAR(ts)", 46L) - .addExpr("DAYOFWEEK(ts)", 7L) - .addExpr("HOUR(ts)", 11L) - .addExpr("MINUTE(ts)", 35L) - .addExpr("SECOND(ts)", 26L) - .addExpr("FLOOR(ts TO YEAR)", parseDate("1986-01-01 00:00:00")) - .addExpr("CEIL(ts TO YEAR)", parseDate("1987-01-01 00:00:00")) - ; - checker.buildRunAndCheck(); - } - - @Test public void testDateTimeFunctions_currentTime() throws Exception { - String sql = "SELECT " - + "LOCALTIME as l," - + "LOCALTIMESTAMP as l1," - + "CURRENT_DATE as c1," - + "CURRENT_TIME as c2," - + "CURRENT_TIMESTAMP as c3" - + " FROM PCOLLECTION" - ; - PCollection<BeamSqlRow> rows = getTestPCollection().apply( - BeamSql.simpleQuery(sql)); - PAssert.that(rows).satisfies(new Checker()); - pipeline.run(); - } - - private static class Checker implements SerializableFunction<Iterable<BeamSqlRow>, Void> { - @Override public Void apply(Iterable<BeamSqlRow> input) { - Iterator<BeamSqlRow> iter = input.iterator(); - assertTrue(iter.hasNext()); - BeamSqlRow row = iter.next(); - // LOCALTIME - Date date = new Date(); - assertTrue(date.getTime() - row.getGregorianCalendar(0).getTime().getTime() < 1000); - assertTrue(date.getTime() - row.getDate(1).getTime() < 1000); - assertTrue(date.getTime() - row.getDate(2).getTime() < 1000); - assertTrue(date.getTime() - row.getGregorianCalendar(3).getTime().getTime() < 1000); - assertTrue(date.getTime() - row.getDate(4).getTime() < 1000); - assertFalse(iter.hasNext()); - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java deleted file mode 100644 index 4ed1f86..0000000 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlLogicalFunctionsIntegrationTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.integrationtest; - -import org.junit.Test; - -/** - * Integration test for logical functions. - */ -public class BeamSqlLogicalFunctionsIntegrationTest - extends BeamSqlBuiltinFunctionsIntegrationTestBase { - @Test - public void testStringFunctions() throws Exception { - ExpressionChecker checker = new ExpressionChecker() - .addExpr("c_integer = 1 AND c_bigint = 1", true) - .addExpr("c_integer = 1 OR c_bigint = 2", true) - .addExpr("NOT c_bigint = 2", true) - .addExpr("(NOT c_bigint = 2) AND (c_integer = 1 OR c_bigint = 3)", true) - .addExpr("c_integer = 2 AND c_bigint = 1", false) - .addExpr("c_integer = 2 OR c_bigint = 2", false) - .addExpr("NOT c_bigint = 1", false) - .addExpr("(NOT c_bigint = 2) AND (c_integer = 2 OR c_bigint = 3)", false) - ; - - checker.buildRunAndCheck(); - } - -}