http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java new file mode 100644 index 0000000..9d2815c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite. 
+ * + * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression} + * as its operands, and return a value with type {@link SqlTypeName}. + * + */ +public abstract class BeamSqlExpression implements Serializable { + protected List<BeamSqlExpression> operands; + protected SqlTypeName outputType; + + protected BeamSqlExpression(){} + + public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + this.operands = operands; + this.outputType = outputType; + } + + public BeamSqlExpression op(int idx) { + return operands.get(idx); + } + + public SqlTypeName opType(int idx) { + return op(idx).getOutputType(); + } + + public <T> T opValueEvaluated(int idx, BeamSqlRow row) { + return (T) op(idx).evaluate(row).getValue(); + } + + /** + * assertion to make sure the input and output are supported in this expression. + */ + public abstract boolean accept(); + + /** + * Apply input record {@link BeamSqlRow} to this expression, + * the output value is wrapped with {@link BeamSqlPrimitive}. + */ + public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); + + public List<BeamSqlExpression> getOperands() { + return operands; + } + + public SqlTypeName getOutputType() { + return outputType; + } + + public int numberOfOperands() { + return operands.size(); + } +}
http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java new file mode 100644 index 0000000..710460b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * An primitive operation for direct field extraction. 
+ */ +public class BeamSqlInputRefExpression extends BeamSqlExpression { + private int inputRef; + + public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) { + super(null, sqlTypeName); + this.inputRef = inputRef; + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java new file mode 100644 index 0000000..51724bb --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; + +/** + * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. + * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}. + * + */ +public class BeamSqlPrimitive<T> extends BeamSqlExpression { + private T value; + + private BeamSqlPrimitive() { + } + + private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + /** + * A builder function to create from Type and value directly. + */ + public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){ + BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>(); + exp.outputType = outputType; + exp.value = value; + if (!exp.accept()) { + throw new IllegalArgumentException( + String.format("value [%s] doesn't match type [%s].", value, outputType)); + } + return exp; + } + + public SqlTypeName getOutputType() { + return outputType; + } + + public T getValue() { + return value; + } + + public long getLong() { + return (Long) getValue(); + } + + public double getDouble() { + return (Double) getValue(); + } + + public float getFloat() { + return (Float) getValue(); + } + + public int getInteger() { + return (Integer) getValue(); + } + + public short getShort() { + return (Short) getValue(); + } + + public byte getByte() { + return (Byte) getValue(); + } + public boolean getBoolean() { + return (Boolean) getValue(); + } + + public String getString() { + return (String) getValue(); + } + + public Date getDate() { + return (Date) getValue(); + } + + public BigDecimal getDecimal() { + return (BigDecimal) getValue(); + } + + @Override + public boolean accept() { + if (value == null) { + return 
true; + } + + switch (outputType) { + case BIGINT: + return value instanceof Long; + case DECIMAL: + return value instanceof BigDecimal; + case DOUBLE: + return value instanceof Double; + case FLOAT: + return value instanceof Float; + case INTEGER: + return value instanceof Integer; + case SMALLINT: + return value instanceof Short; + case TINYINT: + return value instanceof Byte; + case BOOLEAN: + return value instanceof Boolean; + case CHAR: + case VARCHAR: + return value instanceof String || value instanceof NlsString; + case TIME: + return value instanceof GregorianCalendar; + case TIMESTAMP: + case DATE: + return value instanceof Date; + case INTERVAL_HOUR: + return value instanceof BigDecimal; + case INTERVAL_MINUTE: + return value instanceof BigDecimal; + case SYMBOL: + // for SYMBOL, it supports anything... + return true; + default: + throw new UnsupportedOperationException(outputType.name()); + } + } + + @Override + public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) { + return this; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java new file mode 100644 index 0000000..efdb2df --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; + +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for REINTERPRET. + * + * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES} + * to {@code BIGINT} is supported. 
+ */ +public class BeamSqlReinterpretExpression extends BeamSqlExpression { + public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override public boolean accept() { + return getOperands().size() == 1 + && outputType == SqlTypeName.BIGINT + && SqlTypeName.DATETIME_TYPES.contains(opType(0)); + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + if (opType(0) == SqlTypeName.TIME) { + GregorianCalendar date = opValueEvaluated(0, inputRow); + return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); + + } else { + Date date = opValueEvaluated(0, inputRow); + return BeamSqlPrimitive.of(outputType, date.getTime()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java new file mode 100644 index 0000000..e389ef9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * invoke a UDF function. + */ +public class BeamSqlUdfExpression extends BeamSqlExpression { + //as Method is not Serializable, need to keep class/method information, and rebuild it. + private transient Method method; + private String className; + private String methodName; + private List<String> paraClassName = new ArrayList<>(); + + public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps, + SqlTypeName sqlTypeName) { + super(subExps, sqlTypeName); + this.method = method; + + this.className = method.getDeclaringClass().getName(); + this.methodName = method.getName(); + for (Class<?> c : method.getParameterTypes()) { + paraClassName.add(c.getName()); + } + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + if (method == null) { + reConstructMethod(); + } + try { + List<Object> paras = new ArrayList<>(); + for (BeamSqlExpression e : getOperands()) { + paras.add(e.evaluate(inputRow).getValue()); + } + + return BeamSqlPrimitive.of(getOutputType(), + method.invoke(null, paras.toArray(new Object[]{}))); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * re-construct method from class/method. 
+ */ + private void reConstructMethod() { + try { + List<Class<?>> paraClass = new ArrayList<>(); + for (String pc : paraClassName) { + paraClass.add(Class.forName(pc)); + } + method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java new file mode 100644 index 0000000..ecc6939 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Date; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation. + * + * <p>These operators returns the <em>end</em> timestamp of window. + */ +public class BeamSqlWindowEndExpression extends BeamSqlExpression { + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + new Date(inputRow.getWindowEnd().getMillis())); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java new file mode 100644 index 0000000..71f0672 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Date; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation. + * + * <p>These functions don't change the timestamp field, instead it's used to indicate + * the event_timestamp field, and how the window is defined. + */ +public class BeamSqlWindowExpression extends BeamSqlExpression { + + public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override + public boolean accept() { + return operands.get(0).getOutputType().equals(SqlTypeName.DATE) + || operands.get(0).getOutputType().equals(SqlTypeName.TIME) + || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP); + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + (Date) operands.get(0).evaluate(inputRow).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java new file mode 100644 index 0000000..f3aba2e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.Date; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START}, + * {@code SESSION_START} operation. + * + * <p>These operators returns the <em>start</em> timestamp of window. 
+ */ +public class BeamSqlWindowStartExpression extends BeamSqlExpression { + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + new Date(inputRow.getWindowStart().getMillis())); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java new file mode 100644 index 0000000..d62123c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Base class for all arithmetic operators. + */ +public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { + private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>(); + static { + ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER); + ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT); + ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE); + ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) { + super(operands, deduceOutputType(operands.get(0).getOutputType(), + operands.get(1).getOutputType())); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override public BeamSqlPrimitive<? 
extends Number> evaluate(BeamSqlRow inputRow) { + BigDecimal left = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(0, inputRow).toString())); + BigDecimal right = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(1, inputRow).toString())); + + BigDecimal result = calc(left, right); + return getCorrectlyTypedResult(result); + } + + protected abstract BigDecimal calc(BigDecimal left, BigDecimal right); + + protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) { + int leftIndex = ORDERED_APPROX_TYPES.indexOf(left); + int rightIndex = ORDERED_APPROX_TYPES.indexOf(right); + if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT) + && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) { + return SqlTypeName.DOUBLE; + } + + if (leftIndex < rightIndex) { + return right; + } else if (leftIndex > rightIndex) { + return left; + } else { + return left; + } + } + + @Override public boolean accept() { + if (operands.size() != 2) { + return false; + } + + for (BeamSqlExpression operand : operands) { + if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) { + return false; + } + } + return true; + } + + protected BeamSqlPrimitive<? 
extends Number> getCorrectlyTypedResult(BigDecimal rawResult) { + Number actualValue; + switch (outputType) { + case TINYINT: + actualValue = rawResult.byteValue(); + break; + case SMALLINT: + actualValue = rawResult.shortValue(); + break; + case INTEGER: + actualValue = rawResult.intValue(); + break; + case BIGINT: + actualValue = rawResult.longValue(); + break; + case FLOAT: + actualValue = rawResult.floatValue(); + break; + case DOUBLE: + actualValue = rawResult.doubleValue(); + break; + case DECIMAL: + default: + actualValue = rawResult; + } + return BeamSqlPrimitive.of(outputType, actualValue); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java new file mode 100644 index 0000000..c5fe02b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * '/' operator. + */ +public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression { + public BeamSqlDivideExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.divide(right, 10, RoundingMode.HALF_EVEN); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java new file mode 100644 index 0000000..fe08870 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * '-' operator. + */ +public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression { + public BeamSqlMinusExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.subtract(right); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java new file mode 100644 index 0000000..11ecf25 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlModExpression.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * '%' operator. + */ +public class BeamSqlModExpression extends BeamSqlArithmeticExpression { + public BeamSqlModExpression(List<BeamSqlExpression> operands) { + super(operands, operands.get(1).getOutputType()); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return BigDecimal.valueOf(left.doubleValue() % right.doubleValue()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java new file mode 100644 index 0000000..e16d3cb --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlMultiplyExpression.java 
@@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * '*' operator. 
+ */ +public class BeamSqlMultiplyExpression extends BeamSqlArithmeticExpression { + public BeamSqlMultiplyExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.multiply(right); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java new file mode 100644 index 0000000..5804279 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlPlusExpression.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * '+' operator. + */ +public class BeamSqlPlusExpression extends BeamSqlArithmeticExpression { + public BeamSqlPlusExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.add(right); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java new file mode 100644 index 0000000..b8f2175 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/** + * Arithmetic operators. + */ +package org.apache.beam.dsls.sql.interpreter.operator.arithmetic; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java new file mode 100644 index 0000000..80f0853 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlCompareExpression.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@link BeamSqlCompareExpression} is used for compare operations. + * + * <p>See {@link BeamSqlEqualsExpression}, {@link BeamSqlLessThanExpression}, + * {@link BeamSqlLessThanOrEqualsExpression}, {@link BeamSqlGreaterThanExpression}, + * {@link BeamSqlGreaterThanOrEqualsExpression} and {@link BeamSqlNotEqualsExpression} + * for more details. + * + */ +public abstract class BeamSqlCompareExpression extends BeamSqlExpression { + + private BeamSqlCompareExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlCompareExpression(List<BeamSqlExpression> operands) { + this(operands, SqlTypeName.BOOLEAN); + } + + /** + * Compare operation must have 2 operands. 
+ */ + @Override + public boolean accept() { + return operands.size() == 2; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + Object rightValue = operands.get(1).evaluate(inputRow).getValue(); + switch (operands.get(0).getOutputType()) { + case BIGINT: + case DECIMAL: + case DOUBLE: + case FLOAT: + case INTEGER: + case SMALLINT: + case TINYINT: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((Number) leftValue, (Number) rightValue)); + case BOOLEAN: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((Boolean) leftValue, (Boolean) rightValue)); + case VARCHAR: + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, + compare((CharSequence) leftValue, (CharSequence) rightValue)); + default: + throw new UnsupportedOperationException(toString()); + } + } + + /** + * Compare between String values, mapping to {@link SqlTypeName#VARCHAR}. + */ + public abstract Boolean compare(CharSequence leftValue, CharSequence rightValue); + + /** + * Compare between Boolean values, mapping to {@link SqlTypeName#BOOLEAN}. + */ + public abstract Boolean compare(Boolean leftValue, Boolean rightValue); + + /** + * Compare between Number values, including {@link SqlTypeName#BIGINT}, + * {@link SqlTypeName#DECIMAL}, {@link SqlTypeName#DOUBLE}, {@link SqlTypeName#FLOAT}, + * {@link SqlTypeName#INTEGER}, {@link SqlTypeName#SMALLINT} and {@link SqlTypeName#TINYINT}. 
+ */ + public abstract Boolean compare(Number leftValue, Number rightValue); + + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java new file mode 100644 index 0000000..40b015e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlEqualsExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code =} operation. 
+ */ +public class BeamSqlEqualsExpression extends BeamSqlCompareExpression { + + public BeamSqlEqualsExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) == 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + return !(leftValue ^ rightValue); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() == (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java new file mode 100644 index 0000000..8bfa511 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code >} operation. + */ +public class BeamSqlGreaterThanExpression extends BeamSqlCompareExpression { + + public BeamSqlGreaterThanExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) > 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new IllegalArgumentException("> is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() > (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java new file mode 100644 index 
0000000..54faa35 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlGreaterThanOrEqualsExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code >=} operation. 
+ */ +public class BeamSqlGreaterThanOrEqualsExpression extends BeamSqlCompareExpression { + + public BeamSqlGreaterThanOrEqualsExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) >= 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new IllegalArgumentException(">= is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() >= (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java new file mode 100644 index 0000000..6d93c5d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNotNullExpression.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'IS NOT NULL' operation. + */ +public class BeamSqlIsNotNullExpression extends BeamSqlExpression { + + private BeamSqlIsNotNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlIsNotNullExpression(BeamSqlExpression operand){ + this(Arrays.asList(operand), SqlTypeName.BOOLEAN); + } + + /** + * only one operand is required. 
+ */ + @Override + public boolean accept() { + return operands.size() == 1; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java new file mode 100644 index 0000000..4450f3a --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlIsNullExpression.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.Arrays; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'IS NULL' operation. + */ +public class BeamSqlIsNullExpression extends BeamSqlExpression { + + private BeamSqlIsNullExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + public BeamSqlIsNullExpression(BeamSqlExpression operand){ + this(Arrays.asList(operand), SqlTypeName.BOOLEAN); + } + + /** + * only one operand is required. + */ + @Override + public boolean accept() { + return operands.size() == 1; + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + Object leftValue = operands.get(0).evaluate(inputRow).getValue(); + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java new file mode 100644 index 0000000..7ae6dad --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code <} operation. + */ +public class BeamSqlLessThanExpression extends BeamSqlCompareExpression { + + public BeamSqlLessThanExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) < 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new IllegalArgumentException("< is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() < (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java new file mode 100644 index 0000000..4a2cef2 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlLessThanOrEqualsExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code <=} operation. 
+ */ +public class BeamSqlLessThanOrEqualsExpression extends BeamSqlCompareExpression { + + public BeamSqlLessThanOrEqualsExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) <= 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + throw new IllegalArgumentException("<= is not supported for Boolean."); + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() <= (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java new file mode 100644 index 0000000..e02df3d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/BeamSqlNotEqualsExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; + +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; + +/** + * {@code BeamSqlExpression} for {@code <>} operation. + */ +public class BeamSqlNotEqualsExpression extends BeamSqlCompareExpression { + + public BeamSqlNotEqualsExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public Boolean compare(CharSequence leftValue, CharSequence rightValue) { + return String.valueOf(leftValue).compareTo(String.valueOf(rightValue)) != 0; + } + + @Override + public Boolean compare(Boolean leftValue, Boolean rightValue) { + return leftValue ^ rightValue; + } + + @Override + public Boolean compare(Number leftValue, Number rightValue) { + return (leftValue == null && rightValue == null) + || (leftValue != null && rightValue != null + && leftValue.floatValue() != (rightValue).floatValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java new file mode 100644 index 0000000..eea18ff --- /dev/null +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/comparison/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Comparison operators. + */ +package org.apache.beam.dsls.sql.interpreter.operator.comparison; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java new file mode 100644 index 0000000..c7df5ab --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Collections; +import java.util.Date; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for CURRENT_DATE. + * + * <p>Returns the current date in the session time zone, in a value of datatype DATE. 
+ */ +public class BeamSqlCurrentDateExpression extends BeamSqlExpression { + public BeamSqlCurrentDateExpression() { + super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.DATE); + } + @Override public boolean accept() { + return getOperands().size() == 0; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(outputType, new Date()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java new file mode 100644 index 0000000..46e5a43 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.TimeZone; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for LOCALTIME and CURRENT_TIME. + * + * <p>Returns the current date and time in the session time zone in a value of datatype TIME, with + * precision digits of precision. + * + * <p>NOTE: for simplicity, we will ignore the {@code precision} param. + */ +public class BeamSqlCurrentTimeExpression extends BeamSqlExpression { + public BeamSqlCurrentTimeExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.TIME); + } + @Override public boolean accept() { + int opCount = getOperands().size(); + return opCount <= 1; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault()); + ret.setTime(new Date()); + return BeamSqlPrimitive.of(outputType, ret); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java new file mode 100644 index 0000000..303846d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimestampExpression.java @@ -0,0 +1,49 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for LOCALTIMESTAMP and CURRENT_TIMESTAMP. + * + * <p>Returns the current date and time in the session time zone in a value of datatype TIMESTAMP, + * with precision digits of precision. + * + * <p>NOTE: for simplicity, we will ignore the {@code precision} param. 
+ */ +public class BeamSqlCurrentTimestampExpression extends BeamSqlExpression { + public BeamSqlCurrentTimestampExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.TIMESTAMP); + } + @Override public boolean accept() { + int opCount = getOperands().size(); + return opCount <= 1; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(outputType, new Date()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java new file mode 100644 index 0000000..59e3e9c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.avatica.util.DateTimeUtils; +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for CEIL(date). + * + * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}. + */ +public class BeamSqlDateCeilExpression extends BeamSqlExpression { + public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.TIMESTAMP); + } + @Override public boolean accept() { + return operands.size() == 2 + && opType(1) == SqlTypeName.SYMBOL; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Date date = opValueEvaluated(0, inputRow); + long time = date.getTime(); + TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); + + long newTime = DateTimeUtils.unixTimestampCeil(unit, time); + Date newDate = new Date(newTime); + + return BeamSqlPrimitive.of(outputType, newDate); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java new file mode 100644 index 0000000..64234f5 --- /dev/null +++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Date; +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.avatica.util.DateTimeUtils; +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for FLOOR(date). + * + * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link TimeUnitRange#MONTH}. 
+ */ +public class BeamSqlDateFloorExpression extends BeamSqlExpression { + public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.DATE); + } + @Override public boolean accept() { + return operands.size() == 2 + && opType(1) == SqlTypeName.SYMBOL; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Date date = opValueEvaluated(0, inputRow); + long time = date.getTime(); + TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue(); + + long newTime = DateTimeUtils.unixTimestampFloor(unit, time); + Date newDate = new Date(newTime); + + return BeamSqlPrimitive.of(outputType, newDate); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java new file mode 100644 index 0000000..d41a249 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.date; + +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.avatica.util.DateTimeUtils; +import org.apache.calcite.avatica.util.TimeUnitRange; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for EXTRACT. 
+ * + * <p>The following date functions also implicitly converted to {@code EXTRACT}: + * <ul> + * <li>YEAR(date) => EXTRACT(YEAR FROM date)</li> + * <li>MONTH(date) => EXTRACT(MONTH FROM date)</li> + * <li>DAY(date) => EXTRACT(DAY FROM date)</li> + * <li>QUARTER(date) => EXTRACT(QUARTER FROM date)</li> + * <li>WEEK(date) => EXTRACT(WEEK FROM date)</li> + * <li>DAYOFYEAR(date) => EXTRACT(DOY FROM date)</li> + * <li>DAYOFMONTH(date) => EXTRACT(DAY FROM date)</li> + * <li>DAYOFWEEK(date) => EXTRACT(DOW FROM date)</li> + * </ul> + */ +public class BeamSqlExtractExpression extends BeamSqlExpression { + private static final Map<TimeUnitRange, Integer> typeMapping = new HashMap<>(); + static { + typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK); + typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR); + typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR); + } + + public BeamSqlExtractExpression(List<BeamSqlExpression> operands) { + super(operands, SqlTypeName.BIGINT); + } + @Override public boolean accept() { + return operands.size() == 2 + && opType(1) == SqlTypeName.BIGINT; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + Long time = opValueEvaluated(1, inputRow); + + TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue(); + + switch (unit) { + case YEAR: + case MONTH: + case DAY: + Long timeByDay = time / 1000 / 3600 / 24; + Long extracted = DateTimeUtils.unixDateExtract( + unit, + timeByDay + ); + return BeamSqlPrimitive.of(outputType, extracted); + + case DOY: + case DOW: + case WEEK: + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date(time)); + return BeamSqlPrimitive.of(outputType, (long) calendar.get(typeMapping.get(unit))); + + case QUARTER: + calendar = Calendar.getInstance(); + calendar.setTime(new Date(time)); + long ret = calendar.get(Calendar.MONTH) / 3; + if (ret * 3 < calendar.get(Calendar.MONTH)) { + ret += 1; + } + return BeamSqlPrimitive.of(outputType, ret); + + 
default: + throw new UnsupportedOperationException( + "Extract for time unit: " + unit + " not supported!"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java new file mode 100644 index 0000000..d3cc98f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * date functions. 
+ */ +package org.apache.beam.dsls.sql.interpreter.operator.date; http://git-wip-us.apache.org/repos/asf/beam/blob/ba493f85/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java new file mode 100644 index 0000000..5f6abe0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/logical/BeamSqlAndExpression.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator.logical; + +import java.util.List; + +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for 'AND' operation. 
+ */ +public class BeamSqlAndExpression extends BeamSqlLogicalExpression { + public BeamSqlAndExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override + public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRow) { + boolean result = true; + for (BeamSqlExpression exp : operands) { + BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRow); + result = result && expOut.getValue(); + if (!result) { + break; + } + } + return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, result); + } + +}