MockedBeamSqlTable -> MockedBoundedTable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bc66698e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bc66698e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bc66698e Branch: refs/heads/DSL_SQL Commit: bc66698e6880c7788bcea78006c67bfca66b17ce Parents: 2149719 Author: James Xu <xumingmi...@gmail.com> Authored: Fri Jun 30 14:54:26 2017 +0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Jul 5 09:33:53 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/dsls/sql/TestUtils.java | 81 +++++++--- .../beam/dsls/sql/mock/MockedBoundedTable.java | 126 +++++++++++++++ .../apache/beam/dsls/sql/mock/MockedTable.java | 42 +++++ .../dsls/sql/mock/MockedUnboundedTable.java | 113 +++++++++++++ .../dsls/sql/planner/MockedBeamSqlTable.java | 162 ------------------- .../beam/dsls/sql/planner/MockedTable.java | 33 ---- .../dsls/sql/planner/MockedUnboundedTable.java | 120 -------------- .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 78 ++++----- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 141 ++++++++-------- .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 21 ++- .../BeamJoinRelUnboundedVsUnboundedTest.java | 10 +- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 77 +++++---- .../sql/rel/BeamSetOperatorRelBaseTest.java | 68 +++----- .../beam/dsls/sql/rel/BeamSortRelTest.java | 161 +++++++++--------- .../beam/dsls/sql/rel/BeamUnionRelTest.java | 47 +++--- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 72 +++++---- 16 files changed, 691 insertions(+), 661 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index 375027a..cfad333 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -20,7 +20,6 @@ package org.apache.beam.dsls.sql; import java.util.ArrayList; import java.util.List; - import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.transforms.DoFn; @@ -62,7 +61,7 @@ public class TestUtils { * Types.INTEGER, "order_id", * Types.INTEGER, "sum_site_id", * Types.VARCHAR, "buyer" - * ).values( + * ).addRows( * 1, 3, "james", * 2, 5, "bond" * ).getStringRows() @@ -81,15 +80,7 @@ public class TestUtils { * @args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { - List<Integer> types = new ArrayList<>(); - List<String> names = new ArrayList<>(); - int lastTypeIndex = 0; - for (; lastTypeIndex < args.length; lastTypeIndex += 2) { - types.add((int) args[lastTypeIndex]); - names.add((String) args[lastTypeIndex + 1]); - } - - BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types); + BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args); RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLRecordType; @@ -97,20 +88,12 @@ public class TestUtils { } /** - * Add values to the builder. + * Add rows to the builder. * * <p>Note: check the class javadoc for for detailed example. */ - public RowsBuilder values(final Object... args) { - int fieldCount = type.size(); - for (int i = 0; i < args.length; i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(type); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); - } - this.rows.add(row); - } - + public RowsBuilder addRows(final Object... args) { + this.rows.addAll(buildRows(type, args)); return this; } @@ -122,4 +105,58 @@ public class TestUtils { return beamSqlRows2Strings(rows); } } + + /** + * Convenient way to build a {@code BeamSqlRecordType}. + * + * <p>e.g. + * + * <pre>{@code + * buildBeamSqlRecordType( + * Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time" + * ) + * }</pre> + */ + public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) { + List<Integer> types = new ArrayList<>(); + List<String> names = new ArrayList<>(); + + for (int i = 0; i < args.length - 1; i += 2) { + types.add((int) args[i]); + names.add((String) args[i + 1]); + } + + return BeamSqlRecordType.create(names, types); + } + + /** + * Convenient way to build a {@code BeamSqlRow}s. + * + * <p>e.g. + * + * <pre>{@code + * buildRows( + * recordType, + * 1, 1, 1, // the first row + * 2, 2, 2, // the second row + * ... + * ) + * }</pre> + */ + public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) { + List<BeamSqlRow> rows = new ArrayList<>(); + int fieldCount = type.size(); + + for (int i = 0; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(type); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + return rows; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java new file mode 100644 index 0000000..0fb8a80 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.mock; + +import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; +import static org.apache.beam.dsls.sql.TestUtils.buildRows; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * Mocked table for bounded data sources. + */ +public class MockedBoundedTable extends MockedTable { + /** rows written to this table. */ + private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); + /** rows flow out from this table. */ + private final List<BeamSqlRow> rows = new ArrayList<>(); + + public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } + + /** + * Convenient way to build a mocked bounded table. + * + * <p>e.g. + * + * <pre>{@code + * MockedUnboundedTable + * .of(Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time") + * }</pre> + */ + public static MockedBoundedTable of(final Object... args){ + return new MockedBoundedTable(buildBeamSqlRecordType(args)); + } + + + /** + * Add rows to the builder. + * + * <p>Sample usage: + * + * <pre>{@code + * addRows( + * 1, 3, "james", -- first row + * 2, 5, "bond" -- second row + * ... + * ) + * }</pre> + */ + public MockedBoundedTable addRows(Object... args) { + List<BeamSqlRow> rows = buildRows(getRecordType(), args); + this.rows.addAll(rows); + return this; + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.BOUNDED; + } + + @Override + public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply( + "MockedBoundedTable_Reader_" + COUNTER.incrementAndGet(), Create.of(rows)); + } + + @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + return new OutputStore(); + } + + /** + * Keep output in {@code CONTENT} for validation. + * + */ + public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { + + @Override + public PDone expand(PCollection<BeamSqlRow> input) { + input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element()); + } + + @Teardown + public void close() { + CONTENT.clear(); + } + + })); + return PDone.in(input.getPipeline()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java new file mode 100644 index 0000000..eed740a --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.mock; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; + +/** + * Base class for mocked table. + */ +public abstract class MockedTable extends BaseBeamTable { + public static final AtomicInteger COUNTER = new AtomicInteger(); + public MockedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } + + @Override + public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + throw new UnsupportedOperationException("buildIOWriter unsupported!"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java new file mode 100644 index 0000000..12d8d37 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.mock; + +import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; +import static org.apache.beam.dsls.sql.TestUtils.buildRows; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.calcite.util.Pair; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A mocked unbounded table. + */ +public class MockedUnboundedTable extends MockedTable { + /** rows flow out from this table with the specified watermark instant. */ + private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); + /** specify the index of column in the row which stands for the event time field. */ + private int timestampField; + private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) { + super(beamSqlRecordType); + } + + /** + * Convenient way to build a mocked unbounded table. + * + * <p>e.g. + * + * <pre>{@code + * MockedUnboundedTable + * .of(Types.BIGINT, "order_id", + * Types.INTEGER, "site_id", + * Types.DOUBLE, "price", + * Types.TIMESTAMP, "order_time") + * }</pre> + */ + public static MockedUnboundedTable of(final Object... args){ + return new MockedUnboundedTable(buildBeamSqlRecordType(args)); + } + + public MockedUnboundedTable timestampColumnIndex(int idx) { + this.timestampField = idx; + return this; + } + + /** + * Add rows to the builder. + * + * <p>Sample usage: + * + * <pre>{@code + * addRows( + * duration, -- duration which stands for the corresponding watermark instant + * 1, 3, "james", -- first row + * 2, 5, "bond" -- second row + * ... + * ) + * }</pre> + */ + public MockedUnboundedTable addRows(Duration duration, Object... args) { + List<BeamSqlRow> rows = buildRows(getRecordType(), args); + // record the watermark + rows + this.timestampedRows.add(Pair.of(duration, rows)); + return this; + } + + @Override public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + TestStream.Builder<BeamSqlRow> values = TestStream.create( + new BeamSqlRowCoder(beamSqlRecordType)); + + for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { + values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); + for (int i = 0; i < pair.getValue().size(); i++) { + values = values.addElements(TimestampedValue.of(pair.getValue().get(i), + new Instant(pair.getValue().get(i).getDate(timestampField)))); + } + } + + return pipeline.begin().apply( + "MockedUnboundedTable_" + COUNTER.incrementAndGet(), + values.advanceWatermarkToInfinity()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java deleted file mode 100644 index bb10369..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * Mocked table for bounded data sources. - */ -public class MockedBeamSqlTable extends BaseBeamTable { - private static final AtomicInteger COUNTER = new AtomicInteger(); - private static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); - private List<BeamSqlRow> inputRecords; - public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); - } - - /** - * Convenient way to build a mocked table with mock data: - * - * <p>e.g. - * - * <pre>{@code - * MockedBeamSqlTable - * .of(SqlTypeName.BIGINT, "order_id", - * SqlTypeName.INTEGER, "site_id", - * SqlTypeName.DOUBLE, "price", - * SqlTypeName.TIMESTAMP, "order_time", - * - * 1L, 2, 1.0, new Date(), - * 1L, 1, 2.0, new Date(), - * 2L, 4, 3.0, new Date(), - * 2L, 1, 4.0, new Date(), - * 5L, 5, 5.0, new Date(), - * 6L, 6, 6.0, new Date(), - * 7L, 7, 7.0, new Date(), - * 8L, 8888, 8.0, new Date(), - * 8L, 999, 9.0, new Date(), - * 10L, 100, 10.0, new Date()) - * }</pre> - */ - // FIXME: refactor this method - // 1) use Types rather than SqlTypeName - // 2) use RowsBuilder rather than duplicate the logic here - public static MockedBeamSqlTable of(final Object... args){ - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); - - int lastTypeIndex = 0; - for (; lastTypeIndex < args.length; lastTypeIndex += 2) { - if (args[lastTypeIndex] instanceof SqlTypeName) { - builder.add(args[lastTypeIndex + 1].toString(), - (SqlTypeName) args[lastTypeIndex]); - } else { - break; - } - } - return builder.build(); - } - }; - - List<BeamSqlRow> rows = new ArrayList<>(); - BeamSqlRecordType beamSQLRecordType = CalciteUtils - .toBeamRecordType(protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - int fieldCount = beamSQLRecordType.size(); - - for (int i = fieldCount * 2; i < args.length; i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); - } - rows.add(row); - } - MockedBeamSqlTable table = new MockedBeamSqlTable(beamSQLRecordType); - table.inputRecords = rows; - - return table; - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.BOUNDED; - } - - @Override - - public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply( - "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords)); - } - - @Override - public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - return new OutputStore(); - } - - public List<BeamSqlRow> getInputRecords() { - return inputRecords; - } - - /** - * Keep output in {@code CONTENT} for validation. - * - */ - public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { - - @Override - public PDone expand(PCollection<BeamSqlRow> input) { - input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { - @ProcessElement - public void processElement(ProcessContext c) { - CONTENT.add(c.element()); - } - - @Teardown - public void close() { - - } - - })); - return PDone.in(input.getPipeline()); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java deleted file mode 100644 index d096a61..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.planner; - -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; - -/** - * Base class for mocked table. - */ -public abstract class MockedTable extends BaseBeamTable { - public static final AtomicInteger COUNTER = new AtomicInteger(); - public MockedTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java deleted file mode 100644 index 3f22df3..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.testing.TestStream; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.calcite.util.Pair; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A mocked unbounded table. - */ -public class MockedUnboundedTable extends MockedTable { - private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); - private int timestampField; - private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); - } - - /** - * Convenient way to build a mocked table. - * - * <p>e.g. - * - * <pre>{@code - * MockedUnboundedTable - * .of(Types.BIGINT, "order_id", - * Types.INTEGER, "site_id", - * Types.DOUBLE, "price", - * Types.TIMESTAMP, "order_time") - * }</pre> - */ - public static MockedUnboundedTable of(final Object... args){ - List<Integer> types = new ArrayList<>(); - List<String> names = new ArrayList<>(); - int lastTypeIndex = 0; - for (; lastTypeIndex < args.length; lastTypeIndex += 2) { - types.add((int) args[lastTypeIndex]); - names.add((String) args[lastTypeIndex + 1]); - } - - return new MockedUnboundedTable( - BeamSqlRecordType.create(names, types) - ); - } - - public MockedUnboundedTable timestampColumnIndex(int idx) { - this.timestampField = idx; - return this; - } - - public MockedUnboundedTable addRows(Duration duration, Object... args) { - List<BeamSqlRow> rows = new ArrayList<>(); - int fieldCount = getRecordType().size(); - - for (int i = 0; i < args.length; i += fieldCount) { - BeamSqlRow row = new BeamSqlRow(getRecordType()); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); - } - rows.add(row); - } - - // record the watermark + rows - this.timestampedRows.add(Pair.of(duration, rows)); - return this; - } - - @Override public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { - TestStream.Builder<BeamSqlRow> values = TestStream.create( - new BeamSqlRowCoder(beamSqlRecordType)); - - for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { - values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); - for (int i = 0; i < pair.getValue().size(); i++) { - values = values.addElements(TimestampedValue.of(pair.getValue().get(i), - new Instant(pair.getValue().get(i).getDate(timestampField)))); - } - } - - return pipeline.begin().apply( - "MockedUnboundedTable_" + COUNTER.incrementAndGet(), - values.advanceWatermarkToInfinity()); - } - - @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java index 47fdc16..3b37143 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java @@ -18,14 +18,15 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -38,29 +39,33 @@ public class BeamIntersectRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0 - ); - - private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ); @BeforeClass - public static void setUp() { - sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS1", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0 + ) + ); + + sqlEnv.registerTable("ORDER_DETAILS2", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ) + ); } @Test @@ -74,14 +79,14 @@ public class BeamIntersectRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - - 1L, 1, 1.0, - 2L, 2, 2.0 - ).getInputRecords()); + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ).getRows()); pipeline.run().waitUntilFinish(); } @@ -99,14 +104,15 @@ public class BeamIntersectRelTest { PAssert.that(rows).satisfies(new CheckSize(3)); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0 - ).getInputRecords()); + ).getRows()); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java index 505b742..d15cb81 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java @@ -18,14 +18,15 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -41,24 +42,28 @@ public class BeamJoinRelBoundedVsBoundedTest { @BeforeClass public static void prepare() { beamSqlEnv.registerTable("ORDER_DETAILS", - MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - - 1, 2, 3, - 2, 3, 3, - 3, 4, 5)); + MockedBoundedTable.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ) + ); beamSqlEnv.registerTable("ORDER_DETAILS0", - MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", - - 1, 2, 3, - 2, 3, 3, - 3, 4, 5)); + MockedBoundedTable.of( + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 1, 2, 3, + 2, 3, 3, + 3, 4, 5 + ) + ); } @@ -73,16 +78,17 @@ public class BeamJoinRelBoundedVsBoundedTest { ; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", - - 2, 3, 3, 1, 2, 3 - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3 + ).getRows()); pipeline.run(); } @@ -98,18 +104,19 @@ public class BeamJoinRelBoundedVsBoundedTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); pipeline.enableAbandonedNodeEnforcement(false); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", - - 1, 2, 3, null, null, null, - 2, 3, 3, 1, 2, 3, - 3, 4, 5, null, null, null - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 1, 2, 3, null, null, null, + 2, 3, 3, 1, 2, 3, + 3, 4, 5, null, null, null + ).getRows()); pipeline.run(); } @@ -124,18 +131,19 @@ public class BeamJoinRelBoundedVsBoundedTest { ; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", - - 2, 3, 3, 1, 2, 3, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); pipeline.run(); } @@ -150,20 +158,21 @@ public class BeamJoinRelBoundedVsBoundedTest { ; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.INTEGER, "price", - SqlTypeName.INTEGER, "order_id0", - SqlTypeName.INTEGER, "site_id0", - SqlTypeName.INTEGER, "price0", - - 2, 3, 3, 1, 2, 3, - 1, 2, 3, null, null, null, - 3, 4, 5, null, null, null, - null, null, null, 2, 3, 3, - null, null, null, 3, 4, 5 - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "order_id", + Types.INTEGER, "site_id", + Types.INTEGER, "price", + Types.INTEGER, "order_id0", + Types.INTEGER, "site_id0", + Types.INTEGER, "price0" + ).addRows( + 2, 3, 3, 1, 2, 3, + 1, 2, 3, null, null, null, + 3, 4, 5, null, null, null, + null, null, null, 2, 3, 3, + null, null, null, 3, 4, 5 + ).getRows()); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java index 2ddb00b..3f0c98e 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -23,15 +23,14 @@ import java.util.Date; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; -import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; +import org.apache.beam.dsls.sql.mock.MockedUnboundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.joda.time.Duration; import org.junit.BeforeClass; import org.junit.Rule; @@ -79,10 +78,10 @@ public class BeamJoinRelUnboundedVsBoundedTest { ) ); - beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "order_id", - SqlTypeName.VARCHAR, "buyer", - + beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBoundedTable + .of(Types.INTEGER, "order_id", + Types.VARCHAR, "buyer" + ).addRows( 1, "james", 2, "bond" )); @@ -106,7 +105,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id", Types.VARCHAR, "buyer" - ).values( + ).addRows( 1, 3, "james", 2, 5, "bond" ).getStringRows() @@ -132,7 +131,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id", Types.VARCHAR, "buyer" - ).values( + ).addRows( 1, 3, "james", 2, 5, "bond" ).getStringRows() @@ -159,7 +158,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id", Types.VARCHAR, "buyer" - ).values( + ).addRows( 1, 3, "james", 2, 5, "bond", 3, 3, null @@ -200,7 +199,7 @@ public class BeamJoinRelUnboundedVsBoundedTest { Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id", Types.VARCHAR, "buyer" - ).values( + ).addRows( 1, 3, "james", 2, 5, "bond", 3, 3, null http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java index 18a5f60..d76e875 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java @@ -23,7 +23,7 @@ import java.util.Date; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.planner.MockedUnboundedTable; +import org.apache.beam.dsls.sql.mock.MockedUnboundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn; import org.apache.beam.sdk.testing.PAssert; @@ -95,7 +95,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id", Types.INTEGER, "order_id0", - Types.INTEGER, "sum_site_id0").values( + Types.INTEGER, "sum_site_id0").addRows( 1, 3, 1, 3, 2, 5, 2, 5 ).getStringRows() @@ -129,7 +129,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { Types.INTEGER, "sum_site_id", Types.INTEGER, "order_id0", Types.INTEGER, "sum_site_id0" - ).values( + ).addRows( 1, 1, 1, 3, 2, 2, null, null, 2, 2, 2, 5, @@ -159,7 +159,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { Types.INTEGER, "sum_site_id", Types.INTEGER, "order_id0", Types.INTEGER, "sum_site_id0" - ).values( + ).addRows( 1, 3, 1, 1, null, null, 2, 2, 2, 5, 2, 2, @@ -190,7 +190,7 @@ public class BeamJoinRelUnboundedVsUnboundedTest { Types.INTEGER, "sum_site_id", Types.INTEGER, "order_id", Types.INTEGER, "sum_site_id0" - ).values( + ).addRows( 1, 1, 1, 3, 6, 2, null, null, 7, 2, null, null, http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java index bb5e7ee..80da8fb 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java @@ -18,15 +18,16 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -38,30 +39,34 @@ public class BeamMinusRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - 1L, 1, 1.0, - 1L, 1, 1.0, - 2L, 2, 2.0, - 4L, 4, 4.0, - 4L, 4, 4.0 - ); - private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - 1L, 1, 1.0, - 2L, 2, 2.0, - 3L, 3, 3.0 - ); + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("ORDER_DETAILS1", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 1L, 1, 1.0, + 2L, 2, 2.0, + 4L, 4, 4.0, + 4L, 4, 4.0 + ) + ); - @Before - public void setUp() { - sqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1); - sqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2); + sqlEnv.registerTable("ORDER_DETAILS2", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0, + 3L, 3, 3.0 + ) + ); } @Test @@ -75,12 +80,13 @@ public class BeamMinusRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 4L, 4, 4.0 - ).getInputRecords()); + ).getRows()); pipeline.run(); } @@ -98,13 +104,14 @@ public class BeamMinusRelTest { PAssert.that(rows).satisfies(new CheckSize(2)); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 4L, 4, 4.0, 4L, 4, 4.0 - ).getInputRecords()); + ).getRows()); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java index f10a767..d0b01df 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java @@ -18,22 +18,19 @@ package org.apache.beam.dsls.sql.rel; -import java.util.ArrayList; +import java.sql.Types; import java.util.Date; -import java.util.List; - import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -46,20 +43,21 @@ public class BeamSetOperatorRelBaseTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - public static final Date THE_DATE = new Date(); - private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - SqlTypeName.TIMESTAMP, "order_time", - - 1L, 1, 1.0, THE_DATE, - 2L, 2, 2.0, THE_DATE); + public static final Date THE_DATE = new Date(100000); @BeforeClass public static void prepare() { - THE_DATE.setTime(100000); - sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 1, 1.0, THE_DATE, + 2L, 2, 2.0, THE_DATE + ) + ); } @Test @@ -74,17 +72,17 @@ public class BeamSetOperatorRelBaseTest { + ", TUMBLE(order_time, INTERVAL '1' HOUR) "; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - List<BeamSqlRow> expRows = - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.BIGINT, "cnt", - - 1L, 1, 1L, - 2L, 2, 1L - ).getInputRecords(); // compare valueInString to ignore the windowStart & windowEnd - PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows)); + PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn()))) + .containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.BIGINT, "cnt" + ).addRows( + 1L, 1, 1L, + 2L, 2, 1L + ).getStringRows()); pipeline.run(); } @@ -105,20 +103,4 @@ public class BeamSetOperatorRelBaseTest { BeamSqlCli.compilePipeline(sql, pipeline1, sqlEnv); pipeline.run(); } - - static class ToString extends DoFn<BeamSqlRow, String> { - @ProcessElement - public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().valueInString()); - } - } - - static List<String> toString (List<BeamSqlRow> rows) { - List<String> strs = new ArrayList<>(); - for (BeamSqlRow row : rows) { - strs.add(row.valueInString()); - } - - return strs; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index d5c18fc..1067926 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -18,15 +18,16 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import java.util.Date; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -40,27 +41,35 @@ public class BeamSortRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSqlTable subOrderRamTable = MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price"); - - private static MockedBeamSqlTable orderDetailTable = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - SqlTypeName.TIMESTAMP, "order_time", - - 1L, 2, 1.0, new Date(), - 1L, 1, 2.0, new Date(), - 2L, 4, 3.0, new Date(), - 2L, 1, 4.0, new Date(), - 5L, 5, 5.0, new Date(), - 6L, 6, 6.0, new Date(), - 7L, 7, 7.0, new Date(), - 8L, 8888, 8.0, new Date(), - 8L, 999, 9.0, new Date(), - 10L, 100, 10.0, new Date()); + @Before + public void prepare() { + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price", + Types.TIMESTAMP, "order_time" + ).addRows( + 1L, 2, 1.0, new Date(), + 1L, 1, 2.0, new Date(), + 2L, 4, 3.0, new Date(), + 2L, 1, 4.0, new Date(), + 5L, 5, 5.0, new Date(), + 6L, 6, 6.0, new Date(), + 7L, 7, 7.0, new Date(), + 8L, 8888, 8.0, new Date(), + 8L, 999, 9.0, new Date(), + 10L, 100, 10.0, new Date() + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ) + ); + } @Test public void testOrderBy_basic() throws Exception { @@ -70,34 +79,38 @@ public class BeamSortRelTest { + "ORDER BY order_id asc, site_id desc limit 4"; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", + PAssert.that(rows).containsInAnyOrder(TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 2, 1.0, 1L, 1, 2.0, 2L, 4, 3.0, 2L, 1, 4.0 - ).getInputRecords()); + ).getRows()); pipeline.run().waitUntilFinish(); } @Test public void testOrderBy_nullsFirst() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0, - 5L, 5, 5.0)); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price")); + 5L, 5, 5.0 + ) + ); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " @@ -106,36 +119,36 @@ public class BeamSortRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, null, 2.0, 1L, 2, 1.0, 2L, null, 4.0, 2L, 1, 3.0 - ).getInputRecords() + ).getRows() ); pipeline.run().waitUntilFinish(); } @Test public void testOrderBy_nullsLast() throws Exception { - sqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + sqlEnv.registerTable("ORDER_DETAILS", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - sqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price")); + sqlEnv.registerTable("SUB_ORDER_RAM", MockedBoundedTable + .of(Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price")); String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT " + " order_id, site_id, price " @@ -144,16 +157,16 @@ public class BeamSortRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 2, 1.0, 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0 - ).getInputRecords() + ).getRows() ); pipeline.run().waitUntilFinish(); } @@ -167,16 +180,16 @@ public class BeamSortRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 5L, 5, 5.0, 6L, 6, 6.0, 7L, 7, 7.0, 8L, 8888, 8.0 - ).getInputRecords() + ).getRows() ); pipeline.run().waitUntilFinish(); } @@ -190,11 +203,11 @@ public class BeamSortRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 2, 1.0, 1L, 1, 2.0, 2L, 4, 3.0, @@ -205,7 +218,7 @@ public class BeamSortRelTest { 8L, 8888, 8.0, 8L, 999, 9.0, 10L, 100, 10.0 - ).getInputRecords() + ).getRows() ); pipeline.run().waitUntilFinish(); } @@ -221,10 +234,4 @@ public class BeamSortRelTest { TestPipeline pipeline = TestPipeline.create(); BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); } - - @Before - public void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); - sqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java index c5aa132..cad3290 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java @@ -18,14 +18,15 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -38,17 +39,19 @@ public class BeamUnionRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable - .of(SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - - 1L, 1, 1.0, - 2L, 2, 2.0); @BeforeClass public static void prepare() { - sqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable); + sqlEnv.registerTable("ORDER_DETAILS", + MockedBoundedTable.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( + 1L, 1, 1.0, + 2L, 2, 2.0 + ) + ); } @Test @@ -62,14 +65,14 @@ public class BeamUnionRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 1, 1.0, 2L, 2, 2.0 - ).getInputRecords() + ).getRows() ); pipeline.run(); } @@ -85,16 +88,16 @@ public class BeamUnionRelTest { PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); PAssert.that(rows).containsInAnyOrder( - MockedBeamSqlTable.of( - SqlTypeName.BIGINT, "order_id", - SqlTypeName.INTEGER, "site_id", - SqlTypeName.DOUBLE, "price", - + TestUtils.RowsBuilder.of( + Types.BIGINT, "order_id", + Types.INTEGER, "site_id", + Types.DOUBLE, "price" + ).addRows( 1L, 1, 1.0, 1L, 1, 1.0, 2L, 2, 2.0, 2L, 2, 2.0 - ).getInputRecords() + ).getRows() ); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/bc66698e/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 81b1a13..9d13f9b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -18,14 +18,15 @@ package org.apache.beam.dsls.sql.rel; +import java.sql.Types; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -38,24 +39,37 @@ public class BeamValuesRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSqlTable stringTable = MockedBeamSqlTable - .of(SqlTypeName.VARCHAR, "name", - SqlTypeName.VARCHAR, "description"); - private static MockedBeamSqlTable intTable = MockedBeamSqlTable - .of(SqlTypeName.INTEGER, "c0", - SqlTypeName.INTEGER, "c1"); + @BeforeClass + public static void prepare() { + sqlEnv.registerTable("string_table", + MockedBoundedTable.of( + Types.VARCHAR, "name", + Types.VARCHAR, "description" + ) + ); + sqlEnv.registerTable("int_table", + MockedBoundedTable.of( + Types.INTEGER, "c0", + Types.INTEGER, "c1" + ) + ); + } @Test public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.VARCHAR, "name", - SqlTypeName.VARCHAR, "description", - "hello", "world", - "james", "bond").getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.VARCHAR, "name", + Types.VARCHAR, "description" + ).addRows( + "hello", "world", + "james", "bond" + ).getRows() + ); pipeline.run(); } @@ -63,11 +77,14 @@ public class BeamValuesRelTest { public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "c0", - SqlTypeName.INTEGER, "c1", - 1, 2 - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "c0", + Types.INTEGER, "c1" + ).addRows( + 1, 2 + ).getRows() + ); pipeline.run(); } @@ -75,17 +92,14 @@ public class BeamValuesRelTest { public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( - SqlTypeName.INTEGER, "EXPR$0", - SqlTypeName.CHAR, "EXPR$1", - 1, "1" - ).getInputRecords()); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder.of( + Types.INTEGER, "EXPR$0", + Types.CHAR, "EXPR$1" + ).addRows( + 1, "1" + ).getRows() + ); pipeline.run(); } - - @BeforeClass - public static void prepareClass() { - sqlEnv.registerTable("string_table", stringTable); - sqlEnv.registerTable("int_table", intTable); - } }