[BEAM-2560] Add integration test for arithmetic operators. And also refactor BeamSqlStringFunctionsIntegrationTest to use ExpressionChecker
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a976ec04 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a976ec04 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a976ec04 Branch: refs/heads/DSL_SQL Commit: a976ec042d9c2a2925b4b3c9f31261ea7b324f46 Parents: 5fea746 Author: James Xu <xumingmi...@gmail.com> Authored: Fri Jul 7 11:04:46 2017 +0800 Committer: James Xu <xumingmi...@gmail.com> Committed: Fri Jul 14 11:15:53 2017 +0800 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 3 + .../arithmetic/BeamSqlArithmeticExpression.java | 120 +++++++------ .../arithmetic/BeamSqlDivideExpression.java | 13 +- .../arithmetic/BeamSqlMinusExpression.java | 10 +- .../arithmetic/BeamSqlModExpression.java | 12 +- .../arithmetic/BeamSqlMultiplyExpression.java | 10 +- .../arithmetic/BeamSqlPlusExpression.java | 10 +- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 27 +-- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 2 + .../beam/dsls/sql/utils/CalciteUtils.java | 1 + .../org/apache/beam/dsls/sql/TestUtils.java | 18 +- ...amSqlArithmeticOperatorsIntegrationTest.java | 162 ++++++++++++++++++ ...mSqlBuiltinFunctionsIntegrationTestBase.java | 168 +++++++++++++++++++ .../BeamSqlStringFunctionsIntegrationTest.java | 85 +++------- .../BeamSqlArithmeticExpressionTest.java | 42 ++--- .../beam/dsls/sql/mock/MockedBoundedTable.java | 10 +- .../dsls/sql/mock/MockedUnboundedTable.java | 3 +- 17 files changed, 500 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java index 5d0ce29..de4112d 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java @@ -165,6 +165,9 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { case BIGINT: realValue = rawValue.longValue(); break; + case DECIMAL: + realValue = rawValue; + break; default: throw new IllegalStateException("type/realType mismatch: " + type + " VS " + realType); http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java index f3fd68f..eac4c72 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -18,8 +18,9 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; +import java.util.ArrayList; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -29,14 +30,53 @@ import org.apache.calcite.sql.type.SqlTypeName; * Base class for all arithmetic operators. */ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { - private BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>(); + static { + ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER); + ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT); + ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE); + ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) { + super(operands, deduceOutputType(operands.get(0).getOutputType(), + operands.get(1).getOutputType())); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { super(operands, outputType); } - public BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) { - // the outputType can not be determined in constructor - // will be determined in evaluate() method. ANY here is just a placeholder. - super(operands, SqlTypeName.ANY); + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) { + BigDecimal left = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(0, inputRecord).toString())); + BigDecimal right = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(1, inputRecord).toString())); + + BigDecimal result = calc(left, right); + return getCorrectlyTypedResult(result); + } + + protected abstract BigDecimal calc(BigDecimal left, BigDecimal right); + + protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) { + int leftIndex = ORDERED_APPROX_TYPES.indexOf(left); + int rightIndex = ORDERED_APPROX_TYPES.indexOf(right); + if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT) + && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) { + return SqlTypeName.DOUBLE; + } + + if (leftIndex < rightIndex) { + return right; + } else if (leftIndex > rightIndex) { + return left; + } else { + return left; + } } @Override public boolean accept() { @@ -52,49 +92,31 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { return true; } - /** - * https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html. - */ - @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) { - BeamSqlExpression leftOp = operands.get(0); - BeamSqlExpression rightOp = operands.get(1); - - // In the case of -, +, and *, the result is calculated as Long if both - // operands are INT_TYPES(byte, short, integer, long). - if (SqlTypeName.INT_TYPES.contains(leftOp.getOutputType()) - && SqlTypeName.INT_TYPES.contains(rightOp.getOutputType())) { - Long leftValue = Long.valueOf(leftOp.evaluate(inputRecord).getValue().toString()); - Long rightValue = Long.valueOf(rightOp.evaluate(inputRecord).getValue().toString()); - Long ret = calc(leftValue, rightValue); - return BeamSqlPrimitive.of(SqlTypeName.BIGINT, ret); - } else { - // If any of the operands of a +, -, /, *, % is a real - // OR - // It is a division calculation - // we treat them as Double - double leftValue = getDouble(inputRecord, leftOp); - double rightValue = getDouble(inputRecord, rightOp); - return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, calc(leftValue, rightValue)); + protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) { + Number actualValue; + switch (outputType) { + case TINYINT: + actualValue = rawResult.byteValue(); + break; + case SMALLINT: + actualValue = rawResult.shortValue(); + break; + case INTEGER: + actualValue = rawResult.intValue(); + break; + case BIGINT: + actualValue = rawResult.longValue(); + break; + case FLOAT: + actualValue = rawResult.floatValue(); + break; + case DOUBLE: + actualValue = rawResult.doubleValue(); + break; + case DECIMAL: + default: + actualValue = rawResult; } + return BeamSqlPrimitive.of(outputType, actualValue); } - - private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) { - Object raw = op.evaluate(inputRecord).getValue(); - if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) { - return ((Number) raw).doubleValue(); - } - throw new IllegalStateException( - String.format("Can't build a valid arithmetic expression with argument %s", raw)); - } - - /** - * For {@link SqlTypeName#INT_TYPES} calculation of '+', '-', '*'. - */ - public abstract Long calc(Long left, Long right); - - - /** - * For other {@link SqlTypeName#NUMERIC_TYPES} of '+', '-', '*', '/'. - */ - public abstract Double calc(Number left, Number right); } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java index 907b1fc..db3fac6 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; /** @@ -30,14 +30,7 @@ public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression { super(operands); } - @Override public Long calc(Long left, Long right) { - return left / right; - } - - @Override public Double calc(Number left, Number right) { - if (right.doubleValue() == 0) { - throw new IllegalArgumentException("divisor cannot be 0"); - } - return left.doubleValue() / right.doubleValue(); + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.divide(right); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java index c6d7ca0..fe08870 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; /** @@ -30,11 +30,7 @@ public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression { super(operands); } - @Override public Long calc(Long left, Long right) { - return left - right; - } - - @Override public Double calc(Number left, Number right) { - return left.doubleValue() - right.doubleValue(); + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.subtract(right); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java index 6323e95..11ecf25 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; /** @@ -27,14 +27,10 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; */ public class BeamSqlModExpression extends BeamSqlArithmeticExpression { public BeamSqlModExpression(List<BeamSqlExpression> operands) { - super(operands); - } - - @Override public Long calc(Long left, Long right) { - return left % right; + super(operands, operands.get(1).getOutputType()); } - @Override public Double calc(Number left, Number right) { - return left.doubleValue() % right.doubleValue(); + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return BigDecimal.valueOf(left.doubleValue() % right.doubleValue()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java index 42ba4a5..e16d3cb 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; /** @@ -30,11 +30,7 @@ public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression { super(operands); } - @Override public Long calc(Long left, Long right) { - return left * right; - } - - @Override public Double calc(Number left, Number right) { - return left.doubleValue() * right.doubleValue(); + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.multiply(right); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java index 59be053..5804279 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; +import java.math.BigDecimal; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; /** @@ -30,11 +30,7 @@ public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression { super(operands); } - @Override public Double calc(Number left, Number right) { - return left.doubleValue() + right.doubleValue(); - } - - @Override public Long calc(Long left, Long right) { - return left + right; + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.add(right); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index db0ce04..b21a018 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -87,63 +87,62 @@ public class BeamSqlRow implements Serializable { case INTEGER: if (!(fieldValue instanceof Integer)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case SMALLINT: if (!(fieldValue instanceof Short)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case TINYINT: if (!(fieldValue instanceof Byte)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case DOUBLE: if (!(fieldValue instanceof Double)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case BIGINT: if (!(fieldValue instanceof Long)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case FLOAT: if (!(fieldValue instanceof Float)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case DECIMAL: if (!(fieldValue instanceof BigDecimal)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case VARCHAR: case CHAR: if (!(fieldValue instanceof String)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case TIME: if (!(fieldValue instanceof GregorianCalendar)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; case TIMESTAMP: case DATE: if (!(fieldValue instanceof Date)) { throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); + getTypeMismatchErrorMessage(fieldValue, fieldType)); } break; default: @@ -152,6 +151,11 @@ public class BeamSqlRow implements Serializable { dataValues.set(index, fieldValue); } + private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) { + return String.format("[%s](%s) doesn't match type [%s]", + fieldValue, fieldValue.getClass(), fieldType); + } + public Object getFieldValue(String fieldName) { return getFieldValue(dataType.getFieldsName().indexOf(fieldName)); } @@ -270,6 +274,7 @@ public class BeamSqlRow implements Serializable { return fieldValue; } case TIMESTAMP: + case DATE: if (!(fieldValue instanceof Date)) { throw new IllegalArgumentException( String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java index d53ba8d..8be5212 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java @@ -91,6 +91,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { case TIME: longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream); break; + case DATE: case TIMESTAMP: longCoder.encode(value.getDate(idx).getTime(), outStream); break; @@ -147,6 +148,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> { calendar.setTime(new Date(longCoder.decode(inStream))); record.addField(idx, calendar); break; + case DATE: case TIMESTAMP: record.addField(idx, new Date(longCoder.decode(inStream))); break; http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index ac395d3..6aa6e62 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -50,6 +50,7 @@ public class CalciteUtils { JAVA_TO_CALCITE_MAPPING.put(Types.CHAR, SqlTypeName.CHAR); JAVA_TO_CALCITE_MAPPING.put(Types.VARCHAR, SqlTypeName.VARCHAR); + JAVA_TO_CALCITE_MAPPING.put(Types.DATE, SqlTypeName.DATE); JAVA_TO_CALCITE_MAPPING.put(Types.TIME, SqlTypeName.TIME); JAVA_TO_CALCITE_MAPPING.put(Types.TIMESTAMP, SqlTypeName.TIMESTAMP); http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index 3294592..8c0a28d 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; @@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn; * Test utilities. */ public class TestUtils { - /** * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}. */ @@ -116,6 +116,16 @@ public class TestUtils { * <p>Note: check the class javadoc for for detailed example. */ public RowsBuilder addRows(final Object... args) { + this.rows.addAll(buildRows(type, Arrays.asList(args))); + return this; + } + + /** + * Add rows to the builder. + * + * <p>Note: check the class javadoc for for detailed example. + */ + public RowsBuilder addRows(final List args) { this.rows.addAll(buildRows(type, args)); return this; } @@ -169,14 +179,14 @@ public class TestUtils { * ) * }</pre> */ - public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, Object... args) { + public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) { List<BeamSqlRow> rows = new ArrayList<>(); int fieldCount = type.size(); - for (int i = 0; i < args.length; i += fieldCount) { + for (int i = 0; i < args.size(); i += fieldCount) { BeamSqlRow row = new BeamSqlRow(type); for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); + row.addField(j, args.get(i + j)); } rows.add(row); } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java new file mode 100644 index 0000000..3d7bf28 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlArithmeticOperatorsIntegrationTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.integrationtest; + +import java.math.BigDecimal; +import org.junit.Test; + +/** + * Integration test for arithmetic operators. + */ +public class BeamSqlArithmeticOperatorsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { + + private static final BigDecimal ZERO = BigDecimal.valueOf(0.0); + private static final BigDecimal ONE0 = BigDecimal.valueOf(1); + private static final BigDecimal ONE = BigDecimal.valueOf(1.0); + private static final BigDecimal ONE2 = BigDecimal.valueOf(1.0).multiply(BigDecimal.valueOf(1.0)); + private static final BigDecimal TWO = BigDecimal.valueOf(2.0); + + @Test + public void testPlus() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 + 1", 2) + .addExpr("1.0 + 1", TWO) + .addExpr("1 + 1.0", TWO) + .addExpr("1.0 + 1.0", TWO) + .addExpr("c_tinyint + c_tinyint", (byte) 2) + .addExpr("c_smallint + c_smallint", (short) 2) + .addExpr("c_bigint + c_bigint", 2L) + .addExpr("c_decimal + c_decimal", TWO) + .addExpr("c_tinyint + c_decimal", TWO) + .addExpr("c_float + c_decimal", 2.0) + .addExpr("c_double + c_decimal", 2.0) + .addExpr("c_float + c_float", 2.0f) + .addExpr("c_double + c_float", 2.0) + .addExpr("c_double + c_double", 2.0) + .addExpr("c_float + c_bigint", 2.0f) + .addExpr("c_double + c_bigint", 2.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testPlus_overflow() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("c_tinyint_max + c_tinyint_max", -2) + .addExpr("c_smallint_max + c_smallint_max", -2) + .addExpr("c_integer_max + c_integer_max", -2) + // yeah, I know 384L is strange, but since it is already overflowed + // what the actualy result is not so important, it is wrong any way. + .addExpr("c_bigint_max + c_bigint_max", 384L) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMinus() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 - 1", 0) + .addExpr("1.0 - 1", ZERO) + .addExpr("1 - 0.0", ONE) + .addExpr("1.0 - 1.0", ZERO) + .addExpr("c_tinyint - c_tinyint", (byte) 0) + .addExpr("c_smallint - c_smallint", (short) 0) + .addExpr("c_bigint - c_bigint", 0L) + .addExpr("c_decimal - c_decimal", ZERO) + .addExpr("c_tinyint - c_decimal", ZERO) + .addExpr("c_float - c_decimal", 0.0) + .addExpr("c_double - c_decimal", 0.0) + .addExpr("c_float - c_float", 0.0f) + .addExpr("c_double - c_float", 0.0) + .addExpr("c_double - c_double", 0.0) + .addExpr("c_float - c_bigint", 0.0f) + .addExpr("c_double - c_bigint", 0.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMultiply() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 * 1", 1) + .addExpr("1.0 * 1", ONE2) + .addExpr("1 * 1.0", ONE2) + .addExpr("1.0 * 1.0", ONE2) + .addExpr("c_tinyint * c_tinyint", (byte) 1) + .addExpr("c_smallint * c_smallint", (short) 1) + .addExpr("c_bigint * c_bigint", 1L) + .addExpr("c_decimal * c_decimal", ONE2) + .addExpr("c_tinyint * c_decimal", ONE2) + .addExpr("c_float * c_decimal", 1.0) + .addExpr("c_double * c_decimal", 1.0) + .addExpr("c_float * c_float", 1.0f) + .addExpr("c_double * c_float", 1.0) + .addExpr("c_double * c_double", 1.0) + .addExpr("c_float * c_bigint", 1.0f) + .addExpr("c_double * c_bigint", 1.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testDivide() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("1 / 1", 1) + .addExpr("1.0 / 1", ONE0) + .addExpr("1 / 1.0", ONE0) + .addExpr("1.0 / 1.0", ONE0) + .addExpr("c_tinyint / c_tinyint", (byte) 1) + .addExpr("c_smallint / c_smallint", (short) 1) + .addExpr("c_bigint / c_bigint", 1L) + .addExpr("c_decimal / c_decimal", ONE0) + .addExpr("c_tinyint / c_decimal", ONE0) + .addExpr("c_float / c_decimal", 1.0) + .addExpr("c_double / c_decimal", 1.0) + .addExpr("c_float / c_float", 1.0f) + .addExpr("c_double / c_float", 1.0) + .addExpr("c_double / c_double", 1.0) + .addExpr("c_float / c_bigint", 1.0f) + .addExpr("c_double / c_bigint", 1.0) + ; + + checker.buildRunAndCheck(); + } + + @Test + public void testMod() throws Exception { + ExpressionChecker checker = new ExpressionChecker() + .addExpr("mod(1, 1)", 0) + .addExpr("mod(1.0, 1)", 0) + .addExpr("mod(1, 1.0)", ZERO) + .addExpr("mod(1.0, 1.0)", ZERO) + .addExpr("mod(c_tinyint, c_tinyint)", (byte) 0) + .addExpr("mod(c_smallint, c_smallint)", (short) 0) + .addExpr("mod(c_bigint, c_bigint)", 0L) + .addExpr("mod(c_decimal, c_decimal)", ZERO) + .addExpr("mod(c_tinyint, c_decimal)", ZERO) + ; + + checker.buildRunAndCheck(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java new file mode 100644 index 0000000..e65e747 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.integrationtest; + +import com.google.common.base.Joiner; +import java.math.BigDecimal; +import java.sql.Types; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import org.apache.beam.dsls.sql.BeamSql; +import org.apache.beam.dsls.sql.TestUtils; +import org.apache.beam.dsls.sql.mock.MockedBoundedTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.util.Pair; +import org.junit.Rule; + +/** + * Base class for all built-in functions integration tests. + */ +public class BeamSqlBuiltinFunctionsIntegrationTestBase { + private static final Map<Class, Integer> JAVA_CLASS_TO_SQL_TYPE = new HashMap<>(); + static { + JAVA_CLASS_TO_SQL_TYPE.put(Byte.class, Types.TINYINT); + JAVA_CLASS_TO_SQL_TYPE.put(Short.class, Types.SMALLINT); + JAVA_CLASS_TO_SQL_TYPE.put(Integer.class, Types.INTEGER); + JAVA_CLASS_TO_SQL_TYPE.put(Long.class, Types.BIGINT); + JAVA_CLASS_TO_SQL_TYPE.put(Float.class, Types.FLOAT); + JAVA_CLASS_TO_SQL_TYPE.put(Double.class, Types.DOUBLE); + JAVA_CLASS_TO_SQL_TYPE.put(BigDecimal.class, Types.DECIMAL); + JAVA_CLASS_TO_SQL_TYPE.put(String.class, Types.VARCHAR); + JAVA_CLASS_TO_SQL_TYPE.put(Date.class, Types.DATE); + } + + @Rule + public final TestPipeline pipeline = TestPipeline.create(); + + protected PCollection<BeamSqlRow> getTestPCollection() { + BeamSqlRecordType type = BeamSqlRecordType.create( + Arrays.asList("ts", "c_tinyint", "c_smallint", + "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", + "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), + Arrays.asList(Types.DATE, Types.TINYINT, Types.SMALLINT, + Types.INTEGER, Types.BIGINT, Types.FLOAT, Types.DOUBLE, Types.DECIMAL, + Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT) + ); + try { + return MockedBoundedTable + .of(type) + .addRows( + parseDate("1986-02-15 11:35:26"), + (byte) 1, + (short) 1, + 1, + 1L, + 1.0f, + 1.0, + BigDecimal.ONE, + (byte) 127, + (short) 32767, + 2147483647, + 9223372036854775807L + ) + .buildIOReader(pipeline) + .setCoder(new BeamSqlRowCoder(type)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected static Date parseDate(String str) { + try { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + return sdf.parse(str); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * Helper class to make write integration test for built-in functions easier. + * + * <p>example usage: + * <pre>{@code + * ExpressionChecker checker = new ExpressionChecker() + * .addExpr("1 + 1", 2) + * .addExpr("1.0 + 1", 2.0) + * .addExpr("1 + 1.0", 2.0) + * .addExpr("1.0 + 1.0", 2.0) + * .addExpr("c_tinyint + c_tinyint", (byte) 2); + * checker.buildRunAndCheck(inputCollections); + * }</pre> + */ + public class ExpressionChecker { + private transient List<Pair<String, Object>> exps = new ArrayList<>(); + + public ExpressionChecker addExpr(String expression, Object expectedValue) { + exps.add(Pair.of(expression, expectedValue)); + return this; + } + + private String getSql() { + List<String> expStrs = new ArrayList<>(); + for (Pair<String, Object> pair : exps) { + expStrs.add(pair.getKey()); + } + return "SELECT " + Joiner.on(",\n ").join(expStrs) + " FROM PCOLLECTION"; + } + + /** + * Build the corresponding SQL, compile to Beam Pipeline, run it, and check the result. + */ + public void buildRunAndCheck() { + PCollection<BeamSqlRow> inputCollection = getTestPCollection(); + System.out.println("SQL:>\n" + getSql()); + try { + List<String> names = new ArrayList<>(); + List<Integer> types = new ArrayList<>(); + List<Object> values = new ArrayList<>(); + + for (Pair<String, Object> pair : exps) { + names.add(pair.getKey()); + types.add(JAVA_CLASS_TO_SQL_TYPE.get(pair.getValue().getClass())); + values.add(pair.getValue()); + } + + PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); + PAssert.that(rows).containsInAnyOrder( + TestUtils.RowsBuilder + .of(BeamSqlRecordType.create(names, types)) + .addRows(values) + .getRows() + ); + inputCollection.getPipeline().run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java index 11465f5..e28581f 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlStringFunctionsIntegrationTest.java @@ -17,76 +17,35 @@ */ package org.apache.beam.dsls.sql.integrationtest; -import java.sql.Types; -import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.TestUtils; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.Rule; import org.junit.Test; /** * Integration test for string functions. */ -public class BeamSqlStringFunctionsIntegrationTest { - static BeamSqlEnv sqlEnv = new BeamSqlEnv(); - - @Rule - public final TestPipeline pipeline = TestPipeline.create(); - +public class BeamSqlStringFunctionsIntegrationTest + extends BeamSqlBuiltinFunctionsIntegrationTestBase { @Test public void testStringFunctions() throws Exception { - String sql = "SELECT " - + "'hello' || ' world' as concat," - + "CHAR_LENGTH('hello') as cl," - + "CHARACTER_LENGTH('hello') as cl1," - + "UPPER('hello') as up," - + "LOWER('HELLO') as lo," - + "POSITION('world' IN 'helloworld') as po," - + "POSITION('world' IN 'helloworldworld' FROM 7) as po1," - + "TRIM(' hello ') as tr," - + "TRIM(LEADING ' ' FROM ' hello ') as tr1," - + "TRIM(TRAILING ' ' FROM ' hello ') as tr2," - + "TRIM(BOTH ' ' FROM ' hello ') as tr3," - + "OVERLAY('w3333333rce' PLACING 'resou' FROM 3) as ol," - + "SUBSTRING('hello' FROM 2) as ss," - + "SUBSTRING('hello' FROM 2 FOR 2) as ss1," - + "INITCAP('hello world') as ss1" - ; + ExpressionChecker checker = new ExpressionChecker() + .addExpr("'hello' || ' world'", "hello world") + .addExpr("CHAR_LENGTH('hello')", 5) + .addExpr("CHARACTER_LENGTH('hello')", 5) + .addExpr("UPPER('hello')", "HELLO") + .addExpr("LOWER('HELLO')", "hello") - PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, sqlEnv); - PAssert.that(rows).containsInAnyOrder( - TestUtils.RowsBuilder.of( - // 1 -> 5 - Types.VARCHAR, "concat", - Types.INTEGER, "cl", - Types.INTEGER, "cl1", - Types.VARCHAR, "up", - Types.VARCHAR, "lo", - // 6 -> 10 - Types.INTEGER, "po", - Types.INTEGER, "po1", - Types.VARCHAR, "tr", - Types.VARCHAR, "tr1", - Types.VARCHAR, "tr2", - // 11 -> 15 - Types.VARCHAR, "tr3", - Types.VARCHAR, "ol", - Types.VARCHAR, "ss", - Types.VARCHAR, "ss1", - Types.VARCHAR, "ic" - ).addRows( - // 1 -> 5(lo) - "hello world", 5, 5, "HELLO", "hello", - // 6 -> 10() - 5, 10, "hello", "hello ", " hello", - // 11 -> 15 - "hello", "w3resou3rce", "ello", "el", "Hello World" - ).getRows()); - pipeline.run(); - } + .addExpr("POSITION('world' IN 'helloworld')", 5) + .addExpr("POSITION('world' IN 'helloworldworld' FROM 7)", 10) + .addExpr("TRIM(' hello ')", "hello") + .addExpr("TRIM(LEADING ' ' FROM ' hello ')", "hello ") + .addExpr("TRIM(TRAILING ' ' FROM ' hello ')", " hello") + .addExpr("TRIM(BOTH ' ' FROM ' hello ')", "hello") + .addExpr("OVERLAY('w3333333rce' PLACING 'resou' FROM 3)", "w3resou3rce") + .addExpr("SUBSTRING('hello' FROM 2)", "ello") + .addExpr("SUBSTRING('hello' FROM 2 FOR 2)", "el") + .addExpr("INITCAP('hello world')", "Hello World") + ; + + checker.buildRunAndCheck(); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java index fc28180..a34f109 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; - import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; @@ -82,10 +81,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { @Test public void testPlus() { List<BeamSqlExpression> operands = new ArrayList<>(); - // integer + integer => long + // integer + integer => integer operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue()); + assertEquals(2, new BeamSqlPlusExpression(operands).evaluate(record).getValue()); // integer + long => long operands.clear(); @@ -99,11 +98,11 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L)); assertEquals(2L, new BeamSqlPlusExpression(operands).evaluate(record).getValue()); - // float + long => double + // float + long => float operands.clear(); operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 1.1F)); operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L)); - assertEquals(Double.valueOf(Double.valueOf(1.1F) + 1), + assertEquals(Float.valueOf(1.1F + 1), new BeamSqlPlusExpression(operands).evaluate(record).getValue()); // double + long => double @@ -119,7 +118,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { // integer + integer => long operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(1L, new BeamSqlMinusExpression(operands).evaluate(record).getValue()); + assertEquals(1, new BeamSqlMinusExpression(operands).evaluate(record).getValue()); // integer + long => long operands.clear(); @@ -137,8 +136,8 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { operands.clear(); operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F)); operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L)); - assertEquals(Double.valueOf(Double.valueOf(2.1F) - 1), - new BeamSqlMinusExpression(operands).evaluate(record).getValue()); + assertEquals(2.1F - 1L, + new BeamSqlMinusExpression(operands).evaluate(record).getValue().floatValue(), 0.1); // double + long => double operands.clear(); @@ -150,10 +149,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { @Test public void testMultiply() { List<BeamSqlExpression> operands = new ArrayList<>(); - // integer + integer => long + // integer + integer => integer operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(2L, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue()); + assertEquals(2, new BeamSqlMultiplyExpression(operands).evaluate(record).getValue()); // integer + long => long operands.clear(); @@ -171,7 +170,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { operands.clear(); operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F)); operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L)); - assertEquals(Double.valueOf(Double.valueOf(2.1F) * 1), + assertEquals(Float.valueOf(2.1F * 1L), new BeamSqlMultiplyExpression(operands).evaluate(record).getValue()); // double + long => double @@ -184,10 +183,10 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { @Test public void testDivide() { List<BeamSqlExpression> operands = new ArrayList<>(); - // integer + integer => long + // integer + integer => integer operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)); - assertEquals(2L, new BeamSqlDivideExpression(operands).evaluate(record).getValue()); + assertEquals(2, new BeamSqlDivideExpression(operands).evaluate(record).getValue()); // integer + long => long operands.clear(); @@ -205,7 +204,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { operands.clear(); operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 2.1F)); operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1L)); - assertEquals(Double.valueOf(Double.valueOf(2.1F) / 1), + assertEquals(2.1F / 1, new BeamSqlDivideExpression(operands).evaluate(record).getValue()); // double + long => double @@ -221,7 +220,7 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { // integer + integer => long operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 3)); operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 2)); - assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue()); + assertEquals(1, new BeamSqlModExpression(operands).evaluate(record).getValue()); // integer + long => long operands.clear(); @@ -234,18 +233,5 @@ public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase { operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 3L)); operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L)); assertEquals(1L, new BeamSqlModExpression(operands).evaluate(record).getValue()); - - // float + long => double - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.FLOAT, 3.1F)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L)); - assertEquals(Double.valueOf(Double.valueOf(3.1F) % 2), - new BeamSqlModExpression(operands).evaluate(record).getValue()); - - // double + long => double - operands.clear(); - operands.add(BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 3.1)); - operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT, 2L)); - assertEquals(1.1, new BeamSqlModExpression(operands).evaluate(record).getValue()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java index 0fb8a80..84f49a9 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java @@ -21,6 +21,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; import static org.apache.beam.dsls.sql.TestUtils.buildRows; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.dsls.sql.schema.BeamIOType; @@ -65,6 +66,13 @@ public class MockedBoundedTable extends MockedTable { return new MockedBoundedTable(buildBeamSqlRecordType(args)); } + /** + * Build a mocked bounded table with the specified type. + */ + public static MockedBoundedTable of(final BeamSqlRecordType type) { + return new MockedBoundedTable(type); + } + /** * Add rows to the builder. @@ -80,7 +88,7 @@ public class MockedBoundedTable extends MockedTable { * }</pre> */ public MockedBoundedTable addRows(Object... args) { - List<BeamSqlRow> rows = buildRows(getRecordType(), args); + List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args)); this.rows.addAll(rows); return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/a976ec04/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java index 12d8d37..0f8c912 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java @@ -22,6 +22,7 @@ import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; import static org.apache.beam.dsls.sql.TestUtils.buildRows; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.schema.BeamIOType; import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; @@ -84,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable { * }</pre> */ public MockedUnboundedTable addRows(Duration duration, Object... args) { - List<BeamSqlRow> rows = buildRows(getRecordType(), args); + List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args)); // record the watermark + rows this.timestampedRows.add(Pair.of(duration, rows)); return this;