move all implementation classes/packages into impl package
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/febd044a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/febd044a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/febd044a Branch: refs/heads/DSL_SQL Commit: febd044ae306a28fa3797a1663e54c1d7fbe43ce Parents: c1b5482 Author: James Xu <xumingmi...@gmail.com> Authored: Mon Jul 31 17:11:53 2017 +0800 Committer: James Xu <xumingmi...@gmail.com> Committed: Mon Jul 31 17:11:53 2017 +0800 ---------------------------------------------------------------------- .../apache/beam/sdk/extensions/sql/BeamSql.java | 2 +- .../beam/sdk/extensions/sql/BeamSqlCli.java | 2 +- .../beam/sdk/extensions/sql/BeamSqlEnv.java | 4 +- .../interpreter/BeamSqlExpressionExecutor.java | 43 ++ .../sql/impl/interpreter/BeamSqlFnExecutor.java | 442 +++++++++++++++++++ .../operator/BeamSqlCaseExpression.java | 63 +++ .../operator/BeamSqlCastExpression.java | 131 ++++++ .../interpreter/operator/BeamSqlExpression.java | 78 ++++ .../operator/BeamSqlInputRefExpression.java | 43 ++ .../interpreter/operator/BeamSqlPrimitive.java | 152 +++++++ .../operator/BeamSqlReinterpretExpression.java | 54 +++ .../operator/BeamSqlUdfExpression.java | 86 ++++ .../operator/BeamSqlWindowEndExpression.java | 42 ++ .../operator/BeamSqlWindowExpression.java | 50 +++ .../operator/BeamSqlWindowStartExpression.java | 43 ++ .../arithmetic/BeamSqlArithmeticExpression.java | 122 +++++ .../arithmetic/BeamSqlDivideExpression.java | 37 ++ .../arithmetic/BeamSqlMinusExpression.java | 36 ++ .../arithmetic/BeamSqlModExpression.java | 36 ++ .../arithmetic/BeamSqlMultiplyExpression.java | 36 ++ .../arithmetic/BeamSqlPlusExpression.java | 36 ++ .../operator/arithmetic/package-info.java | 22 + .../comparison/BeamSqlCompareExpression.java | 96 ++++ .../comparison/BeamSqlEqualsExpression.java | 49 ++ .../BeamSqlGreaterThanExpression.java | 49 ++ .../BeamSqlGreaterThanOrEqualsExpression.java | 49 ++ .../comparison/BeamSqlIsNotNullExpression.java | 53 +++ .../comparison/BeamSqlIsNullExpression.java | 53 +++ .../comparison/BeamSqlLessThanExpression.java | 49 ++ .../BeamSqlLessThanOrEqualsExpression.java | 49 ++ .../comparison/BeamSqlNotEqualsExpression.java | 49 ++ .../operator/comparison/package-info.java | 22 + .../date/BeamSqlCurrentDateExpression.java | 44 ++ .../date/BeamSqlCurrentTimeExpression.java | 52 +++ .../date/BeamSqlCurrentTimestampExpression.java | 48 ++ .../date/BeamSqlDateCeilExpression.java | 54 +++ .../date/BeamSqlDateFloorExpression.java | 54 +++ .../operator/date/BeamSqlExtractExpression.java | 101 +++++ .../interpreter/operator/date/package-info.java | 22 + .../operator/logical/BeamSqlAndExpression.java | 47 ++ .../logical/BeamSqlLogicalExpression.java | 46 ++ .../operator/logical/BeamSqlNotExpression.java | 53 +++ .../operator/logical/BeamSqlOrExpression.java | 47 ++ .../operator/logical/package-info.java | 22 + .../operator/math/BeamSqlAbsExpression.java | 74 ++++ .../operator/math/BeamSqlAcosExpression.java | 40 ++ .../operator/math/BeamSqlAsinExpression.java | 40 ++ .../operator/math/BeamSqlAtan2Expression.java | 42 ++ .../operator/math/BeamSqlAtanExpression.java | 40 ++ .../operator/math/BeamSqlCeilExpression.java | 45 ++ .../operator/math/BeamSqlCosExpression.java | 40 ++ .../operator/math/BeamSqlCotExpression.java | 40 ++ .../operator/math/BeamSqlDegreesExpression.java | 40 ++ .../operator/math/BeamSqlExpExpression.java | 40 ++ .../operator/math/BeamSqlFloorExpression.java | 45 ++ .../operator/math/BeamSqlLnExpression.java | 40 ++ .../operator/math/BeamSqlLogExpression.java | 40 ++ .../math/BeamSqlMathBinaryExpression.java | 63 +++ .../math/BeamSqlMathUnaryExpression.java | 58 +++ .../operator/math/BeamSqlPiExpression.java | 42 ++ .../operator/math/BeamSqlPowerExpression.java | 44 ++ .../operator/math/BeamSqlRadiansExpression.java | 40 ++ .../operator/math/BeamSqlRandExpression.java | 54 +++ .../math/BeamSqlRandIntegerExpression.java | 58 +++ .../operator/math/BeamSqlRoundExpression.java | 107 +++++ .../operator/math/BeamSqlSignExpression.java | 72 +++ .../operator/math/BeamSqlSinExpression.java | 40 ++ .../operator/math/BeamSqlTanExpression.java | 40 ++ .../math/BeamSqlTruncateExpression.java | 75 ++++ .../interpreter/operator/math/package-info.java | 22 + .../impl/interpreter/operator/package-info.java | 22 + .../string/BeamSqlCharLengthExpression.java | 39 ++ .../string/BeamSqlConcatExpression.java | 62 +++ .../string/BeamSqlInitCapExpression.java | 55 +++ .../operator/string/BeamSqlLowerExpression.java | 39 ++ .../string/BeamSqlOverlayExpression.java | 76 ++++ .../string/BeamSqlPositionExpression.java | 72 +++ .../string/BeamSqlStringUnaryExpression.java | 44 ++ .../string/BeamSqlSubstringExpression.java | 82 ++++ .../operator/string/BeamSqlTrimExpression.java | 101 +++++ .../operator/string/BeamSqlUpperExpression.java | 39 ++ .../operator/string/package-info.java | 22 + .../sql/impl/interpreter/package-info.java | 22 + .../sql/impl/planner/BeamQueryPlanner.java | 167 +++++++ .../sql/impl/planner/BeamRelDataTypeSystem.java | 40 ++ .../sql/impl/planner/BeamRuleSets.java | 75 ++++ .../sql/impl/planner/package-info.java | 24 + .../sql/impl/rel/BeamAggregationRel.java | 182 ++++++++ .../extensions/sql/impl/rel/BeamFilterRel.java | 70 +++ .../extensions/sql/impl/rel/BeamIOSinkRel.java | 75 ++++ .../sql/impl/rel/BeamIOSourceRel.java | 63 +++ .../sql/impl/rel/BeamIntersectRel.java | 58 +++ .../extensions/sql/impl/rel/BeamJoinRel.java | 302 +++++++++++++ .../sql/impl/rel/BeamLogicalConvention.java | 72 +++ .../extensions/sql/impl/rel/BeamMinusRel.java | 56 +++ .../extensions/sql/impl/rel/BeamProjectRel.java | 81 ++++ .../extensions/sql/impl/rel/BeamRelNode.java | 38 ++ .../sql/impl/rel/BeamSetOperatorRelBase.java | 98 ++++ .../extensions/sql/impl/rel/BeamSortRel.java | 247 +++++++++++ .../sql/impl/rel/BeamSqlRelUtils.java | 72 +++ .../extensions/sql/impl/rel/BeamUnionRel.java | 88 ++++ .../extensions/sql/impl/rel/BeamValuesRel.java | 79 ++++ .../extensions/sql/impl/rel/package-info.java | 23 + .../sql/impl/rule/BeamAggregationRule.java | 162 +++++++ .../sql/impl/rule/BeamFilterRule.java | 49 ++ .../sql/impl/rule/BeamIOSinkRule.java | 81 ++++ .../sql/impl/rule/BeamIOSourceRule.java | 49 ++ .../sql/impl/rule/BeamIntersectRule.java | 50 +++ .../extensions/sql/impl/rule/BeamJoinRule.java | 53 +++ .../extensions/sql/impl/rule/BeamMinusRule.java | 50 +++ .../sql/impl/rule/BeamProjectRule.java | 50 +++ .../extensions/sql/impl/rule/BeamSortRule.java | 51 +++ .../extensions/sql/impl/rule/BeamUnionRule.java | 50 +++ .../sql/impl/rule/BeamValuesRule.java | 48 ++ .../extensions/sql/impl/rule/package-info.java | 23 + .../transform/BeamAggregationTransforms.java | 300 +++++++++++++ .../impl/transform/BeamBuiltinAggregations.java | 412 +++++++++++++++++ .../sql/impl/transform/BeamJoinTransforms.java | 166 +++++++ .../transform/BeamSetOperatorsTransforms.java | 111 +++++ .../sql/impl/transform/BeamSqlFilterFn.java | 62 +++ .../transform/BeamSqlOutputToConsoleFn.java | 41 ++ .../sql/impl/transform/BeamSqlProjectFn.java | 72 +++ .../sql/impl/transform/package-info.java | 22 + .../extensions/sql/impl/utils/CalciteUtils.java | 113 +++++ .../extensions/sql/impl/utils/package-info.java | 22 + .../interpreter/BeamSqlExpressionExecutor.java | 43 -- .../sql/interpreter/BeamSqlFnExecutor.java | 442 ------------------- .../operator/BeamSqlCaseExpression.java | 63 --- .../operator/BeamSqlCastExpression.java | 131 ------ .../interpreter/operator/BeamSqlExpression.java | 78 ---- .../operator/BeamSqlInputRefExpression.java | 43 -- .../interpreter/operator/BeamSqlPrimitive.java | 152 ------- .../operator/BeamSqlReinterpretExpression.java | 54 --- .../operator/BeamSqlUdfExpression.java | 86 ---- .../operator/BeamSqlWindowEndExpression.java | 42 -- .../operator/BeamSqlWindowExpression.java | 50 --- .../operator/BeamSqlWindowStartExpression.java | 43 -- .../arithmetic/BeamSqlArithmeticExpression.java | 122 ----- .../arithmetic/BeamSqlDivideExpression.java | 37 -- .../arithmetic/BeamSqlMinusExpression.java | 36 -- .../arithmetic/BeamSqlModExpression.java | 36 -- .../arithmetic/BeamSqlMultiplyExpression.java | 36 -- .../arithmetic/BeamSqlPlusExpression.java | 36 -- .../operator/arithmetic/package-info.java | 22 - .../comparison/BeamSqlCompareExpression.java | 96 ---- .../comparison/BeamSqlEqualsExpression.java | 49 -- .../BeamSqlGreaterThanExpression.java | 49 -- .../BeamSqlGreaterThanOrEqualsExpression.java | 49 -- .../comparison/BeamSqlIsNotNullExpression.java | 53 --- .../comparison/BeamSqlIsNullExpression.java | 53 --- .../comparison/BeamSqlLessThanExpression.java | 49 -- .../BeamSqlLessThanOrEqualsExpression.java | 49 -- .../comparison/BeamSqlNotEqualsExpression.java | 49 -- .../operator/comparison/package-info.java | 22 - .../date/BeamSqlCurrentDateExpression.java | 44 -- .../date/BeamSqlCurrentTimeExpression.java | 52 --- .../date/BeamSqlCurrentTimestampExpression.java | 48 -- .../date/BeamSqlDateCeilExpression.java | 54 --- .../date/BeamSqlDateFloorExpression.java | 54 --- .../operator/date/BeamSqlExtractExpression.java | 101 ----- .../interpreter/operator/date/package-info.java | 22 - .../operator/logical/BeamSqlAndExpression.java | 47 -- .../logical/BeamSqlLogicalExpression.java | 46 -- .../operator/logical/BeamSqlNotExpression.java | 53 --- .../operator/logical/BeamSqlOrExpression.java | 47 -- .../operator/logical/package-info.java | 22 - .../operator/math/BeamSqlAbsExpression.java | 74 ---- .../operator/math/BeamSqlAcosExpression.java | 40 -- .../operator/math/BeamSqlAsinExpression.java | 40 -- .../operator/math/BeamSqlAtan2Expression.java | 42 -- .../operator/math/BeamSqlAtanExpression.java | 40 -- .../operator/math/BeamSqlCeilExpression.java | 45 -- .../operator/math/BeamSqlCosExpression.java | 40 -- .../operator/math/BeamSqlCotExpression.java | 40 -- .../operator/math/BeamSqlDegreesExpression.java | 40 -- .../operator/math/BeamSqlExpExpression.java | 40 -- .../operator/math/BeamSqlFloorExpression.java | 45 -- .../operator/math/BeamSqlLnExpression.java | 40 -- .../operator/math/BeamSqlLogExpression.java | 40 -- .../math/BeamSqlMathBinaryExpression.java | 63 --- .../math/BeamSqlMathUnaryExpression.java | 58 --- .../operator/math/BeamSqlPiExpression.java | 42 -- .../operator/math/BeamSqlPowerExpression.java | 44 -- .../operator/math/BeamSqlRadiansExpression.java | 40 -- .../operator/math/BeamSqlRandExpression.java | 54 --- .../math/BeamSqlRandIntegerExpression.java | 58 --- .../operator/math/BeamSqlRoundExpression.java | 107 ----- .../operator/math/BeamSqlSignExpression.java | 72 --- .../operator/math/BeamSqlSinExpression.java | 40 -- .../operator/math/BeamSqlTanExpression.java | 40 -- .../math/BeamSqlTruncateExpression.java | 75 ---- .../interpreter/operator/math/package-info.java | 22 - .../sql/interpreter/operator/package-info.java | 22 - .../string/BeamSqlCharLengthExpression.java | 39 -- .../string/BeamSqlConcatExpression.java | 62 --- .../string/BeamSqlInitCapExpression.java | 55 --- .../operator/string/BeamSqlLowerExpression.java | 39 -- .../string/BeamSqlOverlayExpression.java | 76 ---- .../string/BeamSqlPositionExpression.java | 72 --- .../string/BeamSqlStringUnaryExpression.java | 44 -- .../string/BeamSqlSubstringExpression.java | 82 ---- .../operator/string/BeamSqlTrimExpression.java | 101 ----- .../operator/string/BeamSqlUpperExpression.java | 39 -- .../operator/string/package-info.java | 22 - .../sql/interpreter/package-info.java | 22 - .../sql/planner/BeamQueryPlanner.java | 167 ------- .../sql/planner/BeamRelDataTypeSystem.java | 40 -- .../extensions/sql/planner/BeamRuleSets.java | 75 ---- .../extensions/sql/planner/package-info.java | 24 - .../extensions/sql/rel/BeamAggregationRel.java | 182 -------- .../sdk/extensions/sql/rel/BeamFilterRel.java | 70 --- .../sdk/extensions/sql/rel/BeamIOSinkRel.java | 75 ---- .../sdk/extensions/sql/rel/BeamIOSourceRel.java | 63 --- .../extensions/sql/rel/BeamIntersectRel.java | 58 --- .../sdk/extensions/sql/rel/BeamJoinRel.java | 302 ------------- .../sql/rel/BeamLogicalConvention.java | 72 --- .../sdk/extensions/sql/rel/BeamMinusRel.java | 56 --- .../sdk/extensions/sql/rel/BeamProjectRel.java | 81 ---- .../sdk/extensions/sql/rel/BeamRelNode.java | 38 -- .../sql/rel/BeamSetOperatorRelBase.java | 98 ---- .../sdk/extensions/sql/rel/BeamSortRel.java | 247 ----------- .../sdk/extensions/sql/rel/BeamSqlRelUtils.java | 72 --- .../sdk/extensions/sql/rel/BeamUnionRel.java | 88 ---- .../sdk/extensions/sql/rel/BeamValuesRel.java | 79 ---- .../sdk/extensions/sql/rel/package-info.java | 23 - .../sql/rule/BeamAggregationRule.java | 162 ------- .../sdk/extensions/sql/rule/BeamFilterRule.java | 49 -- .../sdk/extensions/sql/rule/BeamIOSinkRule.java | 81 ---- .../extensions/sql/rule/BeamIOSourceRule.java | 49 -- .../extensions/sql/rule/BeamIntersectRule.java | 50 --- .../sdk/extensions/sql/rule/BeamJoinRule.java | 53 --- .../sdk/extensions/sql/rule/BeamMinusRule.java | 50 --- .../extensions/sql/rule/BeamProjectRule.java | 50 --- .../sdk/extensions/sql/rule/BeamSortRule.java | 51 --- .../sdk/extensions/sql/rule/BeamUnionRule.java | 50 --- .../sdk/extensions/sql/rule/BeamValuesRule.java | 48 -- .../sdk/extensions/sql/rule/package-info.java | 23 - .../sdk/extensions/sql/schema/BeamSqlRow.java | 2 +- .../extensions/sql/schema/BeamSqlRowCoder.java | 2 +- .../extensions/sql/schema/BeamTableUtils.java | 2 +- .../transform/BeamAggregationTransforms.java | 300 ------------- .../sql/transform/BeamBuiltinAggregations.java | 412 ----------------- .../sql/transform/BeamJoinTransforms.java | 166 ------- .../transform/BeamSetOperatorsTransforms.java | 111 ----- .../sql/transform/BeamSqlFilterFn.java | 62 --- .../sql/transform/BeamSqlOutputToConsoleFn.java | 41 -- .../sql/transform/BeamSqlProjectFn.java | 72 --- .../extensions/sql/transform/package-info.java | 22 - .../sdk/extensions/sql/utils/CalciteUtils.java | 113 ----- .../sdk/extensions/sql/utils/package-info.java | 22 - .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 4 +- .../impl/interpreter/BeamSqlFnExecutorTest.java | 416 +++++++++++++++++ .../interpreter/BeamSqlFnExecutorTestBase.java | 92 ++++ .../operator/BeamNullExperssionTest.java | 55 +++ .../operator/BeamSqlAndOrExpressionTest.java | 61 +++ .../operator/BeamSqlCaseExpressionTest.java | 93 ++++ .../operator/BeamSqlCastExpressionTest.java | 125 ++++++ .../operator/BeamSqlCompareExpressionTest.java | 115 +++++ .../operator/BeamSqlInputRefExpressionTest.java | 57 +++ .../operator/BeamSqlPrimitiveTest.java | 59 +++ .../BeamSqlReinterpretExpressionTest.java | 75 ++++ .../operator/BeamSqlUdfExpressionTest.java | 51 +++ .../BeamSqlArithmeticExpressionTest.java | 237 ++++++++++ .../date/BeamSqlCurrentDateExpressionTest.java | 38 ++ .../date/BeamSqlCurrentTimeExpressionTest.java | 39 ++ .../BeamSqlCurrentTimestampExpressionTest.java | 39 ++ .../date/BeamSqlDateCeilExpressionTest.java | 50 +++ .../date/BeamSqlDateExpressionTestBase.java | 51 +++ .../date/BeamSqlDateFloorExpressionTest.java | 49 ++ .../date/BeamSqlExtractExpressionTest.java | 103 +++++ .../logical/BeamSqlNotExpressionTest.java | 47 ++ .../math/BeamSqlMathBinaryExpressionTest.java | 201 +++++++++ .../math/BeamSqlMathUnaryExpressionTest.java | 309 +++++++++++++ .../string/BeamSqlCharLengthExpressionTest.java | 44 ++ .../string/BeamSqlConcatExpressionTest.java | 66 +++ .../string/BeamSqlInitCapExpressionTest.java | 54 +++ .../string/BeamSqlLowerExpressionTest.java | 44 ++ .../string/BeamSqlOverlayExpressionTest.java | 87 ++++ .../string/BeamSqlPositionExpressionTest.java | 84 ++++ .../BeamSqlStringUnaryExpressionTest.java | 52 +++ .../string/BeamSqlSubstringExpressionTest.java | 101 +++++ .../string/BeamSqlTrimExpressionTest.java | 103 +++++ .../string/BeamSqlUpperExpressionTest.java | 44 ++ .../sql/impl/rel/BeamIntersectRelTest.java | 119 +++++ .../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 +++++++++ .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ++++++++++ .../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++ .../sql/impl/rel/BeamMinusRelTest.java | 118 +++++ .../impl/rel/BeamSetOperatorRelBaseTest.java | 106 +++++ .../sql/impl/rel/BeamSortRelTest.java | 237 ++++++++++ .../sql/impl/rel/BeamUnionRelTest.java | 104 +++++ .../sql/impl/rel/BeamValuesRelTest.java | 105 +++++ .../sdk/extensions/sql/impl/rel/CheckSize.java | 41 ++ .../sql/interpreter/BeamSqlFnExecutorTest.java | 416 ----------------- .../interpreter/BeamSqlFnExecutorTestBase.java | 92 ---- .../operator/BeamNullExperssionTest.java | 55 --- .../operator/BeamSqlAndOrExpressionTest.java | 61 --- .../operator/BeamSqlCaseExpressionTest.java | 93 ---- .../operator/BeamSqlCastExpressionTest.java | 125 ------ .../operator/BeamSqlCompareExpressionTest.java | 115 ----- .../operator/BeamSqlInputRefExpressionTest.java | 57 --- .../operator/BeamSqlPrimitiveTest.java | 59 --- .../BeamSqlReinterpretExpressionTest.java | 75 ---- .../operator/BeamSqlUdfExpressionTest.java | 51 --- .../BeamSqlArithmeticExpressionTest.java | 237 ---------- .../date/BeamSqlCurrentDateExpressionTest.java | 38 -- .../date/BeamSqlCurrentTimeExpressionTest.java | 39 -- .../BeamSqlCurrentTimestampExpressionTest.java | 39 -- .../date/BeamSqlDateCeilExpressionTest.java | 50 --- .../date/BeamSqlDateExpressionTestBase.java | 51 --- .../date/BeamSqlDateFloorExpressionTest.java | 49 -- .../date/BeamSqlExtractExpressionTest.java | 103 ----- .../logical/BeamSqlNotExpressionTest.java | 47 -- .../math/BeamSqlMathBinaryExpressionTest.java | 201 --------- .../math/BeamSqlMathUnaryExpressionTest.java | 309 ------------- .../string/BeamSqlCharLengthExpressionTest.java | 44 -- .../string/BeamSqlConcatExpressionTest.java | 66 --- .../string/BeamSqlInitCapExpressionTest.java | 54 --- .../string/BeamSqlLowerExpressionTest.java | 44 -- .../string/BeamSqlOverlayExpressionTest.java | 87 ---- .../string/BeamSqlPositionExpressionTest.java | 84 ---- .../BeamSqlStringUnaryExpressionTest.java | 52 --- .../string/BeamSqlSubstringExpressionTest.java | 101 ----- .../string/BeamSqlTrimExpressionTest.java | 103 ----- .../string/BeamSqlUpperExpressionTest.java | 44 -- .../sql/rel/BeamIntersectRelTest.java | 119 ----- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 --------- .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ---------- .../BeamJoinRelUnboundedVsUnboundedTest.java | 219 --------- .../extensions/sql/rel/BeamMinusRelTest.java | 118 ----- .../sql/rel/BeamSetOperatorRelBaseTest.java | 106 ----- .../sdk/extensions/sql/rel/BeamSortRelTest.java | 237 ---------- .../extensions/sql/rel/BeamUnionRelTest.java | 104 ----- .../extensions/sql/rel/BeamValuesRelTest.java | 105 ----- .../beam/sdk/extensions/sql/rel/CheckSize.java | 41 -- .../sql/schema/BeamSqlRowCoderTest.java | 2 +- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 4 +- .../sql/schema/text/BeamTextCSVTableTest.java | 4 +- .../transform/BeamAggregationTransformTest.java | 6 +- .../schema/transform/BeamTransformBaseTest.java | 4 +- 340 files changed, 13117 insertions(+), 13117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index d64ae41..e0d7a78 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql; import com.google.auto.value.AutoValue; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java index 714e102..3bea46a 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java @@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index ca73b13..be0b0af 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -18,12 +18,12 @@ package org.apache.beam.sdk.extensions.sql; import java.io.Serializable; -import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; -import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.rel.type.RelDataType; http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java new file mode 100644 index 0000000..1ae6bb3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; + +/** + * {@code BeamSqlExpressionExecutor} fills the gap between relational + * expressions in Calcite SQL and executable code. + * + */ +public interface BeamSqlExpressionExecutor extends Serializable { + + /** + * invoked before data processing. + */ + void prepare(); + + /** + * apply transformation to input record {@link BeamSqlRow}. + * + */ + List<Object> execute(BeamSqlRow inputRow); + + void close(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java new file mode 100644 index 0000000..1f9e0e3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlReinterpretExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAbsExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAcosExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAsinExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtan2Expression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtanExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCeilExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCosExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCotExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlDegreesExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlExpExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlFloorExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLnExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLogExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPiExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; +import org.apache.calcite.util.NlsString; + +/** + * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. + * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, + * which can be evaluated against the {@link BeamSqlRow}. + * + */ +public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { + protected List<BeamSqlExpression> exps; + + public BeamSqlFnExecutor(BeamRelNode relNode) { + this.exps = new ArrayList<>(); + if (relNode instanceof BeamFilterRel) { + BeamFilterRel filterNode = (BeamFilterRel) relNode; + RexNode condition = filterNode.getCondition(); + exps.add(buildExpression(condition)); + } else if (relNode instanceof BeamProjectRel) { + BeamProjectRel projectNode = (BeamProjectRel) relNode; + List<RexNode> projects = projectNode.getProjects(); + for (RexNode rexNode : projects) { + exps.add(buildExpression(rexNode)); + } + } else { + throw new UnsupportedOperationException( + String.format("%s is not supported yet!", relNode.getClass().toString())); + } + } + + /** + * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively, + * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}. + */ + static BeamSqlExpression buildExpression(RexNode rexNode) { + BeamSqlExpression ret = null; + if (rexNode instanceof RexLiteral) { + RexLiteral node = (RexLiteral) rexNode; + SqlTypeName type = node.getTypeName(); + Object value = node.getValue(); + + if (SqlTypeName.CHAR_TYPES.contains(type) + && node.getValue() instanceof NlsString) { + // NlsString is not serializable, we need to convert + // it to string explicitly. + return BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); + } else if (type == SqlTypeName.DATE && value instanceof Calendar) { + // does this actually make sense? + // Calcite actually treat Calendar as the java type of Date Literal + return BeamSqlPrimitive.of(type, ((Calendar) value).getTime()); + } else { + // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different + // e.g. sql: "select 1" + // here the literal 1 will be parsed as a RexLiteral where: + // node.getType().getSqlTypeName() = INTEGER (the display type) + // node.getSqlTypeName() = DECIMAL (the actual internal storage format) + // So we need to do a convert here. + // check RexBuilder#makeLiteral for more information. + SqlTypeName realType = node.getType().getSqlTypeName(); + Object realValue = value; + if (type == SqlTypeName.DECIMAL) { + BigDecimal rawValue = (BigDecimal) value; + switch (realType) { + case TINYINT: + realValue = (byte) rawValue.intValue(); + break; + case SMALLINT: + realValue = (short) rawValue.intValue(); + break; + case INTEGER: + realValue = rawValue.intValue(); + break; + case BIGINT: + realValue = rawValue.longValue(); + break; + case DECIMAL: + realValue = rawValue; + break; + default: + throw new IllegalStateException("type/realType mismatch: " + + type + " VS " + realType); + } + } else if (type == SqlTypeName.DOUBLE) { + Double rawValue = (Double) value; + if (realType == SqlTypeName.FLOAT) { + realValue = rawValue.floatValue(); + } + } + return BeamSqlPrimitive.of(realType, realValue); + } + } else if (rexNode instanceof RexInputRef) { + RexInputRef node = (RexInputRef) rexNode; + ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); + } else if (rexNode instanceof RexCall) { + RexCall node = (RexCall) rexNode; + String opName = node.op.getName(); + List<BeamSqlExpression> subExps = new ArrayList<>(); + for (RexNode subNode : node.getOperands()) { + subExps.add(buildExpression(subNode)); + } + switch (opName) { + // logical operators + case "AND": + ret = new BeamSqlAndExpression(subExps); + break; + case "OR": + ret = new BeamSqlOrExpression(subExps); + break; + case "NOT": + ret = new BeamSqlNotExpression(subExps); + break; + case "=": + ret = new BeamSqlEqualsExpression(subExps); + break; + case "<>": + ret = new BeamSqlNotEqualsExpression(subExps); + break; + case ">": + ret = new BeamSqlGreaterThanExpression(subExps); + break; + case ">=": + ret = new BeamSqlGreaterThanOrEqualsExpression(subExps); + break; + case "<": + ret = new BeamSqlLessThanExpression(subExps); + break; + case "<=": + ret = new BeamSqlLessThanOrEqualsExpression(subExps); + break; + + // arithmetic operators + case "+": + ret = new BeamSqlPlusExpression(subExps); + break; + case "-": + ret = new BeamSqlMinusExpression(subExps); + break; + case "*": + ret = new BeamSqlMultiplyExpression(subExps); + break; + case "/": + case "/INT": + ret = new BeamSqlDivideExpression(subExps); + break; + case "MOD": + ret = new BeamSqlModExpression(subExps); + break; + + case "ABS": + ret = new BeamSqlAbsExpression(subExps); + break; + case "ROUND": + ret = new BeamSqlRoundExpression(subExps); + break; + case "LN": + ret = new BeamSqlLnExpression(subExps); + break; + case "LOG10": + ret = new BeamSqlLogExpression(subExps); + break; + case "EXP": + ret = new BeamSqlExpExpression(subExps); + break; + case "ACOS": + ret = new BeamSqlAcosExpression(subExps); + break; + case "ASIN": + ret = new BeamSqlAsinExpression(subExps); + break; + case "ATAN": + ret = new BeamSqlAtanExpression(subExps); + break; + case "COT": + ret = new BeamSqlCotExpression(subExps); + break; + case "DEGREES": + ret = new BeamSqlDegreesExpression(subExps); + break; + case "RADIANS": + ret = new BeamSqlRadiansExpression(subExps); + break; + case "COS": + ret = new BeamSqlCosExpression(subExps); + break; + case "SIN": + ret = new BeamSqlSinExpression(subExps); + break; + case "TAN": + ret = new BeamSqlTanExpression(subExps); + break; + case "SIGN": + ret = new BeamSqlSignExpression(subExps); + break; + case "POWER": + ret = new BeamSqlPowerExpression(subExps); + break; + case "PI": + ret = new BeamSqlPiExpression(); + break; + case "ATAN2": + ret = new BeamSqlAtan2Expression(subExps); + break; + case "TRUNCATE": + ret = new BeamSqlTruncateExpression(subExps); + break; + case "RAND": + ret = new BeamSqlRandExpression(subExps); + break; + case "RAND_INTEGER": + ret = new BeamSqlRandIntegerExpression(subExps); + break; + + // string operators + case "||": + ret = new BeamSqlConcatExpression(subExps); + break; + case "POSITION": + ret = new BeamSqlPositionExpression(subExps); + break; + case "CHAR_LENGTH": + case "CHARACTER_LENGTH": + ret = new BeamSqlCharLengthExpression(subExps); + break; + case "UPPER": + ret = new BeamSqlUpperExpression(subExps); + break; + case "LOWER": + ret = new BeamSqlLowerExpression(subExps); + break; + case "TRIM": + ret = new BeamSqlTrimExpression(subExps); + break; + case "SUBSTRING": + ret = new BeamSqlSubstringExpression(subExps); + break; + case "OVERLAY": + ret = new BeamSqlOverlayExpression(subExps); + break; + case "INITCAP": + ret = new BeamSqlInitCapExpression(subExps); + break; + + // date functions + case "Reinterpret": + return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName()); + case "CEIL": + if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { + return new BeamSqlCeilExpression(subExps); + } else { + return new BeamSqlDateCeilExpression(subExps); + } + case "FLOOR": + if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { + return new BeamSqlFloorExpression(subExps); + } else { + return new BeamSqlDateFloorExpression(subExps); + } + case "EXTRACT_DATE": + case "EXTRACT": + return new BeamSqlExtractExpression(subExps); + + case "LOCALTIME": + case "CURRENT_TIME": + return new BeamSqlCurrentTimeExpression(subExps); + + case "CURRENT_TIMESTAMP": + case "LOCALTIMESTAMP": + return new BeamSqlCurrentTimestampExpression(subExps); + + case "CURRENT_DATE": + return new BeamSqlCurrentDateExpression(); + + + case "CASE": + ret = new BeamSqlCaseExpression(subExps); + break; + case "CAST": + ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName()); + break; + + case "IS NULL": + ret = new BeamSqlIsNullExpression(subExps.get(0)); + break; + case "IS NOT NULL": + ret = new BeamSqlIsNotNullExpression(subExps.get(0)); + break; + + case "HOP": + case "TUMBLE": + case "SESSION": + ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName()); + break; + case "HOP_START": + case "TUMBLE_START": + case "SESSION_START": + ret = new BeamSqlWindowStartExpression(); + break; + case "HOP_END": + case "TUMBLE_END": + case "SESSION_END": + ret = new BeamSqlWindowEndExpression(); + break; + default: + //handle UDF + if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) { + SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator(); + ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction(); + ret = new BeamSqlUdfExpression(fn.method, subExps, + ((RexCall) rexNode).type.getSqlTypeName()); + } else { + throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!"); + } + } + } else { + throw new UnsupportedOperationException( + String.format("%s is not supported yet!", rexNode.getClass().toString())); + } + + if (ret != null && !ret.accept()) { + throw new IllegalStateException(ret.getClass().getSimpleName() + + " does not accept the operands.(" + rexNode + ")"); + } + + return ret; + } + + @Override + public void prepare() { + } + + @Override + public List<Object> execute(BeamSqlRow inputRow) { + List<Object> results = new ArrayList<>(); + for (BeamSqlExpression exp : exps) { + results.add(exp.evaluate(inputRow).getValue()); + } + return results; + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java new file mode 100644 index 0000000..61e8aae --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL. + */ +public class BeamSqlCaseExpression extends BeamSqlExpression { + public BeamSqlCaseExpression(List<BeamSqlExpression> operands) { + // the return type of CASE is the type of the `else` condition + super(operands, operands.get(operands.size() - 1).getOutputType()); + } + + @Override public boolean accept() { + // `when`-`then` pair + `else` + if (operands.size() % 2 != 1) { + return false; + } + + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opType(i) != SqlTypeName.BOOLEAN) { + return false; + } else if (opType(i + 1) != outputType) { + return false; + } + } + + return true; + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + for (int i = 0; i < operands.size() - 1; i += 2) { + if (opValueEvaluated(i, inputRow)) { + return BeamSqlPrimitive.of( + outputType, + opValueEvaluated(i + 1, inputRow) + ); + } + } + return BeamSqlPrimitive.of(outputType, + opValueEvaluated(operands.size() - 1, inputRow)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java new file mode 100644 index 0000000..c98c10d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.sql.Date; +import java.sql.Timestamp; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.runtime.SqlFunctions; +import org.apache.calcite.sql.type.SqlTypeName; +import org.joda.time.format.DateTimeFormat; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; +import org.joda.time.format.DateTimeParser; + +/** + * Base class to support 'CAST' operations for all {@link SqlTypeName}. + */ +public class BeamSqlCastExpression extends BeamSqlExpression { + + private static final int index = 0; + private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss"; + private static final String outputDateFormat = "yyyy-MM-dd"; + /** + * Date and Timestamp formats used to parse + * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}. + */ + private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() + .append(null/*printer*/, new DateTimeParser[] { + // date formats + DateTimeFormat.forPattern("yy-MM-dd").getParser(), + DateTimeFormat.forPattern("yy/MM/dd").getParser(), + DateTimeFormat.forPattern("yy.MM.dd").getParser(), + DateTimeFormat.forPattern("yyMMdd").getParser(), + DateTimeFormat.forPattern("yyyyMMdd").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd").getParser(), + DateTimeFormat.forPattern("yyyy/MM/dd").getParser(), + DateTimeFormat.forPattern("yyyy.MM.dd").getParser(), + // datetime formats + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(), + DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter() + .withPivotYear(2020); + + public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) { + super(operands, castType); + } + + @Override + public boolean accept() { + return numberOfOperands() == 1; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + SqlTypeName castOutputType = getOutputType(); + switch (castOutputType) { + case INTEGER: + return BeamSqlPrimitive + .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); + case DOUBLE: + return BeamSqlPrimitive + .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); + case SMALLINT: + return BeamSqlPrimitive + .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); + case TINYINT: + return BeamSqlPrimitive + .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); + case BIGINT: + return BeamSqlPrimitive + .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow))); + case DECIMAL: + return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, + SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); + case FLOAT: + return BeamSqlPrimitive + .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); + case CHAR: + case VARCHAR: + return BeamSqlPrimitive + .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); + case DATE: + return BeamSqlPrimitive + .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); + case TIMESTAMP: + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); + } + throw new UnsupportedOperationException( + String.format("Cast to type %s not supported", castOutputType)); + } + + private Date toDate(Object inputDate, String outputFormat) { + try { + return Date + .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat)); + } catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Date'"); + } + } + + private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) { + try { + return Timestamp.valueOf( + dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute() + .roundCeilingCopy().toString(outputFormat)); + } catch (IllegalArgumentException | UnsupportedOperationException e) { + throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java new file mode 100644 index 0000000..dc5db81 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.io.Serializable; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite. + * + * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression} + * as its operands, and return a value with type {@link SqlTypeName}. + * + */ +public abstract class BeamSqlExpression implements Serializable { + protected List<BeamSqlExpression> operands; + protected SqlTypeName outputType; + + protected BeamSqlExpression(){} + + public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + this.operands = operands; + this.outputType = outputType; + } + + public BeamSqlExpression op(int idx) { + return operands.get(idx); + } + + public SqlTypeName opType(int idx) { + return op(idx).getOutputType(); + } + + public <T> T opValueEvaluated(int idx, BeamSqlRow row) { + return (T) op(idx).evaluate(row).getValue(); + } + + /** + * assertion to make sure the input and output are supported in this expression. + */ + public abstract boolean accept(); + + /** + * Apply input record {@link BeamSqlRow} to this expression, + * the output value is wrapped with {@link BeamSqlPrimitive}. + */ + public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); + + public List<BeamSqlExpression> getOperands() { + return operands; + } + + public SqlTypeName getOutputType() { + return outputType; + } + + public int numberOfOperands() { + return operands.size(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java new file mode 100644 index 0000000..7aba024 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * An primitive operation for direct field extraction. + */ +public class BeamSqlInputRefExpression extends BeamSqlExpression { + private int inputRef; + + public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) { + super(null, sqlTypeName); + this.inputRef = inputRef; + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java new file mode 100644 index 0000000..6380af9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.math.BigDecimal; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.util.NlsString; + +/** + * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}. + * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}. + * + */ +public class BeamSqlPrimitive<T> extends BeamSqlExpression { + private T value; + + private BeamSqlPrimitive() { + } + + private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + /** + * A builder function to create from Type and value directly. + */ + public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){ + BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>(); + exp.outputType = outputType; + exp.value = value; + if (!exp.accept()) { + throw new IllegalArgumentException( + String.format("value [%s] doesn't match type [%s].", value, outputType)); + } + return exp; + } + + public SqlTypeName getOutputType() { + return outputType; + } + + public T getValue() { + return value; + } + + public long getLong() { + return (Long) getValue(); + } + + public double getDouble() { + return (Double) getValue(); + } + + public float getFloat() { + return (Float) getValue(); + } + + public int getInteger() { + return (Integer) getValue(); + } + + public short getShort() { + return (Short) getValue(); + } + + public byte getByte() { + return (Byte) getValue(); + } + public boolean getBoolean() { + return (Boolean) getValue(); + } + + public String getString() { + return (String) getValue(); + } + + public Date getDate() { + return (Date) getValue(); + } + + public BigDecimal getDecimal() { + return (BigDecimal) getValue(); + } + + @Override + public boolean accept() { + if (value == null) { + return true; + } + + switch (outputType) { + case BIGINT: + return value instanceof Long; + case DECIMAL: + return value instanceof BigDecimal; + case DOUBLE: + return value instanceof Double; + case FLOAT: + return value instanceof Float; + case INTEGER: + return value instanceof Integer; + case SMALLINT: + return value instanceof Short; + case TINYINT: + return value instanceof Byte; + case BOOLEAN: + return value instanceof Boolean; + case CHAR: + case VARCHAR: + return value instanceof String || value instanceof NlsString; + case TIME: + return value instanceof GregorianCalendar; + case TIMESTAMP: + case DATE: + return value instanceof Date; + case INTERVAL_HOUR: + return value instanceof BigDecimal; + case INTERVAL_MINUTE: + return value instanceof BigDecimal; + case SYMBOL: + // for SYMBOL, it supports anything... + return true; + default: + throw new UnsupportedOperationException(outputType.name()); + } + } + + @Override + public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) { + return this; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java new file mode 100644 index 0000000..243baaa --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for REINTERPRET. + * + * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES} + * to {@code BIGINT} is supported. + */ +public class BeamSqlReinterpretExpression extends BeamSqlExpression { + public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override public boolean accept() { + return getOperands().size() == 1 + && outputType == SqlTypeName.BIGINT + && SqlTypeName.DATETIME_TYPES.contains(opType(0)); + } + + @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + if (opType(0) == SqlTypeName.TIME) { + GregorianCalendar date = opValueEvaluated(0, inputRow); + return BeamSqlPrimitive.of(outputType, date.getTimeInMillis()); + + } else { + Date date = opValueEvaluated(0, inputRow); + return BeamSqlPrimitive.of(outputType, date.getTime()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java new file mode 100644 index 0000000..eebb97c --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * invoke a UDF function. + */ +public class BeamSqlUdfExpression extends BeamSqlExpression { + //as Method is not Serializable, need to keep class/method information, and rebuild it. + private transient Method method; + private String className; + private String methodName; + private List<String> paraClassName = new ArrayList<>(); + + public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps, + SqlTypeName sqlTypeName) { + super(subExps, sqlTypeName); + this.method = method; + + this.className = method.getDeclaringClass().getName(); + this.methodName = method.getName(); + for (Class<?> c : method.getParameterTypes()) { + paraClassName.add(c.getName()); + } + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { + if (method == null) { + reConstructMethod(); + } + try { + List<Object> paras = new ArrayList<>(); + for (BeamSqlExpression e : getOperands()) { + paras.add(e.evaluate(inputRow).getValue()); + } + + return BeamSqlPrimitive.of(getOutputType(), + method.invoke(null, paras.toArray(new Object[]{}))); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * re-construct method from class/method. + */ + private void reConstructMethod() { + try { + List<Class<?>> paraClass = new ArrayList<>(); + for (String pc : paraClassName) { + paraClass.add(Class.forName(pc)); + } + method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java new file mode 100644 index 0000000..0bd68df --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation. + * + * <p>These operators returns the <em>end</em> timestamp of window. + */ +public class BeamSqlWindowEndExpression extends BeamSqlExpression { + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + new Date(inputRow.getWindowEnd().getMillis())); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java new file mode 100644 index 0000000..b560ef8 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.util.Date; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation. + * + * <p>These functions don't change the timestamp field, instead it's used to indicate + * the event_timestamp field, and how the window is defined. + */ +public class BeamSqlWindowExpression extends BeamSqlExpression { + + public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override + public boolean accept() { + return operands.get(0).getOutputType().equals(SqlTypeName.DATE) + || operands.get(0).getOutputType().equals(SqlTypeName.TIME) + || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP); + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + (Date) operands.get(0).evaluate(inputRow).getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java new file mode 100644 index 0000000..e2c1b34 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator; + +import java.util.Date; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START}, + * {@code SESSION_START} operation. + * + * <p>These operators returns the <em>start</em> timestamp of window. + */ +public class BeamSqlWindowStartExpression extends BeamSqlExpression { + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) { + return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, + new Date(inputRow.getWindowStart().getMillis())); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java new file mode 100644 index 0000000..b07b28f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Base class for all arithmetic operators. + */ +public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression { + private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>(); + static { + ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER); + ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT); + ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT); + ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE); + ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) { + super(operands, deduceOutputType(operands.get(0).getOutputType(), + operands.get(1).getOutputType())); + } + + protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { + super(operands, outputType); + } + + @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) { + BigDecimal left = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(0, inputRow).toString())); + BigDecimal right = BigDecimal.valueOf( + Double.valueOf(opValueEvaluated(1, inputRow).toString())); + + BigDecimal result = calc(left, right); + return getCorrectlyTypedResult(result); + } + + protected abstract BigDecimal calc(BigDecimal left, BigDecimal right); + + protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) { + int leftIndex = ORDERED_APPROX_TYPES.indexOf(left); + int rightIndex = ORDERED_APPROX_TYPES.indexOf(right); + if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT) + && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) { + return SqlTypeName.DOUBLE; + } + + if (leftIndex < rightIndex) { + return right; + } else if (leftIndex > rightIndex) { + return left; + } else { + return left; + } + } + + @Override public boolean accept() { + if (operands.size() != 2) { + return false; + } + + for (BeamSqlExpression operand : operands) { + if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) { + return false; + } + } + return true; + } + + protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) { + Number actualValue; + switch (outputType) { + case TINYINT: + actualValue = rawResult.byteValue(); + break; + case SMALLINT: + actualValue = rawResult.shortValue(); + break; + case INTEGER: + actualValue = rawResult.intValue(); + break; + case BIGINT: + actualValue = rawResult.longValue(); + break; + case FLOAT: + actualValue = rawResult.floatValue(); + break; + case DOUBLE: + actualValue = rawResult.doubleValue(); + break; + case DECIMAL: + default: + actualValue = rawResult; + } + return BeamSqlPrimitive.of(outputType, actualValue); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java new file mode 100644 index 0000000..d62a3f8 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression; + +/** + * '/' operator. + */ +public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression { + public BeamSqlDivideExpression(List<BeamSqlExpression> operands) { + super(operands); + } + + @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) { + return left.divide(right, 10, RoundingMode.HALF_EVEN); + } +}