Repository: beam Updated Branches: refs/heads/DSL_SQL caf647daf -> a1f7cf6de
rebased, add RAND/RAND_INTEGER update as commented Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97c9b075 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97c9b075 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97c9b075 Branch: refs/heads/DSL_SQL Commit: 97c9b07576f9baa1bf10aaf1817f731137c1a1a0 Parents: caf647d Author: mingmxu <ming...@ebay.com> Authored: Tue Jul 18 00:09:39 2017 -0700 Committer: Tyler Akidau <taki...@apache.org> Committed: Tue Jul 18 17:54:43 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 8 +++ .../operator/math/BeamSqlRandExpression.java | 54 ++++++++++++++++ .../math/BeamSqlRandIntegerExpression.java | 58 +++++++++++++++++ .../BeamSqlMathFunctionsIntegrationTest.java | 67 ++++++++++++++++++++ 4 files changed, 187 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b075/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java index 08d124f..64bc880 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java @@ -68,6 +68,8 @@ import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLogExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPiExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPowerExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRadiansExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandExpression; +import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandIntegerExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSignExpression; import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSinExpression; @@ -299,6 +301,12 @@ public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { case "TRUNCATE": ret = new BeamSqlTruncateExpression(subExps); break; + case "RAND": + ret = new BeamSqlRandExpression(subExps); + break; + case "RAND_INTEGER": + ret = new BeamSqlRandIntegerExpression(subExps); + break; // string operators case "||": http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b075/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java new file mode 100644 index 0000000..944936b --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandExpression.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.math; + +import java.util.List; +import java.util.Random; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlMathUnaryExpression} for 'RAND([seed])' function. + */ +public class BeamSqlRandExpression extends BeamSqlExpression { + private Random rand = new Random(); + private Integer seed = null; + + public BeamSqlRandExpression(List<BeamSqlExpression> subExps) { + super(subExps, SqlTypeName.DOUBLE); + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + if (operands.size() == 1) { + int rowSeed = opValueEvaluated(0, inputRecord); + if (seed == null || seed != rowSeed) { + rand.setSeed(rowSeed); + } + } + return BeamSqlPrimitive.of(SqlTypeName.DOUBLE, rand.nextDouble()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b075/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java new file mode 100644 index 0000000..02e464f --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlRandIntegerExpression.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.dsls.sql.interpreter.operator.math; + +import java.util.List; +import java.util.Random; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlMathUnaryExpression} for 'RAND_INTEGER([seed, ] numeric)' + * function. + */ +public class BeamSqlRandIntegerExpression extends BeamSqlExpression { + private Random rand = new Random(); + private Integer seed = null; + + public BeamSqlRandIntegerExpression(List<BeamSqlExpression> subExps) { + super(subExps, SqlTypeName.INTEGER); + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) { + int numericIdx = 0; + if (operands.size() == 2) { + int rowSeed = opValueEvaluated(0, inputRecord); + if (seed == null || seed != rowSeed) { + rand.setSeed(rowSeed); + } + numericIdx = 1; + } + return BeamSqlPrimitive.of(SqlTypeName.INTEGER, + rand.nextInt((int) opValueEvaluated(numericIdx, inputRecord))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/97c9b075/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java new file mode 100644 index 0000000..b8b8151 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlMathFunctionsIntegrationTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.integrationtest; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.Random; +import org.apache.beam.dsls.sql.BeamSql; +import org.apache.beam.dsls.sql.BeamSqlDslBase; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test cases for built-in Math functions. + */ +public class BeamSqlMathFunctionsIntegrationTest extends BeamSqlDslBase implements Serializable { + + @Test + public void testRandRandInteger() throws Exception { + String sql = "SELECT RAND(f_int) as a, RAND(100) as b, RAND() as c, " + + "RAND_INTEGER(10) as d, RAND_INTEGER(10, 100) as e " + + "FROM PCOLLECTION"; + + PCollection<BeamSqlRow> result = boundedInput2 + .apply("testRandRandInteger", BeamSql.simpleQuery(sql)); + + PAssert.that(result).satisfies(new SerializableFunction<Iterable<BeamSqlRow>, Void>() { + @Override + public Void apply(Iterable<BeamSqlRow> input) { + Iterator<BeamSqlRow> ite = input.iterator(); + Assert.assertTrue(ite.hasNext()); + BeamSqlRow row = ite.next(); + + Assert.assertEquals(new Random(1).nextDouble(), row.getDouble(0), 0); + Assert.assertEquals(new Random(100).nextDouble(), row.getDouble(1), 0); + Assert.assertTrue(row.getDouble(2) >= 0 && row.getDouble(2) < 1); + + Assert.assertTrue(row.getInteger(3) >= 0 && row.getInteger(3) < 10); + Assert.assertEquals(new Random(10).nextInt(100), row.getInteger(4)); + + Assert.assertFalse(ite.hasNext()); + return null; + } + }); + + pipeline.run().waitUntilFinish(); + } +}