rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b5482d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b5482d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b5482d Branch: refs/heads/DSL_SQL Commit: c1b5482d3fb13a14926f0ffc23d0810b3105ed24 Parents: ba493f8 Author: James Xu <xumingmi...@gmail.com> Authored: Sun Jul 30 23:58:02 2017 +0800 Committer: James Xu <xumingmi...@gmail.com> Committed: Sun Jul 30 23:58:02 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/beam/dsls/sql/BeamSql.java | 244 ---------- .../org/apache/beam/dsls/sql/BeamSqlCli.java | 65 --- .../org/apache/beam/dsls/sql/BeamSqlEnv.java | 120 ----- .../beam/dsls/sql/example/BeamSqlExample.java | 97 ---- .../beam/dsls/sql/example/package-info.java | 23 - .../interpreter/BeamSqlExpressionExecutor.java | 43 -- .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 442 ------------------ .../operator/BeamSqlCaseExpression.java | 64 --- .../operator/BeamSqlCastExpression.java | 132 ------ .../interpreter/operator/BeamSqlExpression.java | 78 ---- .../operator/BeamSqlInputRefExpression.java | 43 -- .../interpreter/operator/BeamSqlPrimitive.java | 152 ------- .../operator/BeamSqlReinterpretExpression.java | 55 --- .../operator/BeamSqlUdfExpression.java | 86 ---- .../operator/BeamSqlWindowEndExpression.java | 42 -- .../operator/BeamSqlWindowExpression.java | 50 -- .../operator/BeamSqlWindowStartExpression.java | 43 -- .../arithmetic/BeamSqlArithmeticExpression.java | 122 ----- .../arithmetic/BeamSqlDivideExpression.java | 37 -- .../arithmetic/BeamSqlMinusExpression.java | 36 -- .../arithmetic/BeamSqlModExpression.java | 36 -- .../arithmetic/BeamSqlMultiplyExpression.java | 36 -- .../arithmetic/BeamSqlPlusExpression.java | 36 -- .../operator/arithmetic/package-info.java | 22 - .../comparison/BeamSqlCompareExpression.java | 96 ---- .../comparison/BeamSqlEqualsExpression.java | 49 -- .../BeamSqlGreaterThanExpression.java | 49 -- .../BeamSqlGreaterThanOrEqualsExpression.java | 49 -- .../comparison/BeamSqlIsNotNullExpression.java | 53 --- .../comparison/BeamSqlIsNullExpression.java | 53 --- .../comparison/BeamSqlLessThanExpression.java | 49 -- .../BeamSqlLessThanOrEqualsExpression.java | 49 -- .../comparison/BeamSqlNotEqualsExpression.java | 49 -- .../operator/comparison/package-info.java | 22 - .../date/BeamSqlCurrentDateExpression.java | 45 -- .../date/BeamSqlCurrentTimeExpression.java | 53 --- .../date/BeamSqlCurrentTimestampExpression.java | 49 -- .../date/BeamSqlDateCeilExpression.java | 55 --- .../date/BeamSqlDateFloorExpression.java | 55 --- .../operator/date/BeamSqlExtractExpression.java | 101 ----- .../interpreter/operator/date/package-info.java | 22 - .../operator/logical/BeamSqlAndExpression.java | 48 -- .../logical/BeamSqlLogicalExpression.java | 47 -- .../operator/logical/BeamSqlNotExpression.java | 54 --- .../operator/logical/BeamSqlOrExpression.java | 48 -- .../operator/logical/package-info.java | 22 - .../operator/math/BeamSqlAbsExpression.java | 74 --- .../operator/math/BeamSqlAcosExpression.java | 41 -- .../operator/math/BeamSqlAsinExpression.java | 41 -- .../operator/math/BeamSqlAtan2Expression.java | 43 -- .../operator/math/BeamSqlAtanExpression.java | 41 -- .../operator/math/BeamSqlCeilExpression.java | 46 -- .../operator/math/BeamSqlCosExpression.java | 41 -- .../operator/math/BeamSqlCotExpression.java | 41 -- .../operator/math/BeamSqlDegreesExpression.java | 41 -- .../operator/math/BeamSqlExpExpression.java | 41 -- .../operator/math/BeamSqlFloorExpression.java | 46 -- .../operator/math/BeamSqlLnExpression.java | 41 -- .../operator/math/BeamSqlLogExpression.java | 41 -- .../math/BeamSqlMathBinaryExpression.java | 64 --- .../math/BeamSqlMathUnaryExpression.java | 58 --- .../operator/math/BeamSqlPiExpression.java | 42 -- .../operator/math/BeamSqlPowerExpression.java | 45 -- .../operator/math/BeamSqlRadiansExpression.java | 41 -- .../operator/math/BeamSqlRandExpression.java | 54 --- .../math/BeamSqlRandIntegerExpression.java | 58 --- .../operator/math/BeamSqlRoundExpression.java | 108 ----- .../operator/math/BeamSqlSignExpression.java | 72 --- .../operator/math/BeamSqlSinExpression.java | 41 -- .../operator/math/BeamSqlTanExpression.java | 41 -- .../math/BeamSqlTruncateExpression.java | 76 ---- .../interpreter/operator/math/package-info.java | 22 - .../sql/interpreter/operator/package-info.java | 22 - .../string/BeamSqlCharLengthExpression.java | 40 -- .../string/BeamSqlConcatExpression.java | 63 --- .../string/BeamSqlInitCapExpression.java | 56 --- .../operator/string/BeamSqlLowerExpression.java | 40 -- .../string/BeamSqlOverlayExpression.java | 77 ---- .../string/BeamSqlPositionExpression.java | 73 --- .../string/BeamSqlStringUnaryExpression.java | 45 -- .../string/BeamSqlSubstringExpression.java | 83 ---- .../operator/string/BeamSqlTrimExpression.java | 101 ----- .../operator/string/BeamSqlUpperExpression.java | 40 -- .../operator/string/package-info.java | 22 - .../beam/dsls/sql/interpreter/package-info.java | 22 - .../org/apache/beam/dsls/sql/package-info.java | 22 - .../beam/dsls/sql/planner/BeamQueryPlanner.java | 167 ------- .../dsls/sql/planner/BeamRelDataTypeSystem.java | 40 -- .../beam/dsls/sql/planner/BeamRuleSets.java | 75 --- .../beam/dsls/sql/planner/package-info.java | 24 - .../beam/dsls/sql/rel/BeamAggregationRel.java | 182 -------- .../apache/beam/dsls/sql/rel/BeamFilterRel.java | 70 --- .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java | 75 --- .../beam/dsls/sql/rel/BeamIOSourceRel.java | 63 --- .../beam/dsls/sql/rel/BeamIntersectRel.java | 58 --- .../apache/beam/dsls/sql/rel/BeamJoinRel.java | 302 ------------- .../dsls/sql/rel/BeamLogicalConvention.java | 72 --- .../apache/beam/dsls/sql/rel/BeamMinusRel.java | 56 --- .../beam/dsls/sql/rel/BeamProjectRel.java | 81 ---- .../apache/beam/dsls/sql/rel/BeamRelNode.java | 38 -- .../dsls/sql/rel/BeamSetOperatorRelBase.java | 98 ---- .../apache/beam/dsls/sql/rel/BeamSortRel.java | 247 ---------- .../beam/dsls/sql/rel/BeamSqlRelUtils.java | 73 --- .../apache/beam/dsls/sql/rel/BeamUnionRel.java | 88 ---- .../apache/beam/dsls/sql/rel/BeamValuesRel.java | 79 ---- .../apache/beam/dsls/sql/rel/package-info.java | 23 - .../beam/dsls/sql/rule/BeamAggregationRule.java | 162 ------- .../beam/dsls/sql/rule/BeamFilterRule.java | 49 -- .../beam/dsls/sql/rule/BeamIOSinkRule.java | 82 ---- .../beam/dsls/sql/rule/BeamIOSourceRule.java | 49 -- .../beam/dsls/sql/rule/BeamIntersectRule.java | 51 --- .../apache/beam/dsls/sql/rule/BeamJoinRule.java | 53 --- .../beam/dsls/sql/rule/BeamMinusRule.java | 51 --- .../beam/dsls/sql/rule/BeamProjectRule.java | 50 -- .../apache/beam/dsls/sql/rule/BeamSortRule.java | 52 --- .../beam/dsls/sql/rule/BeamUnionRule.java | 50 -- .../beam/dsls/sql/rule/BeamValuesRule.java | 48 -- .../apache/beam/dsls/sql/rule/package-info.java | 23 - .../beam/dsls/sql/schema/BaseBeamTable.java | 34 -- .../apache/beam/dsls/sql/schema/BeamIOType.java | 28 -- .../dsls/sql/schema/BeamPCollectionTable.java | 61 --- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 314 ------------- .../beam/dsls/sql/schema/BeamSqlRowCoder.java | 185 -------- .../beam/dsls/sql/schema/BeamSqlRowType.java | 40 -- .../beam/dsls/sql/schema/BeamSqlTable.java | 52 --- .../beam/dsls/sql/schema/BeamSqlUdaf.java | 72 --- .../apache/beam/dsls/sql/schema/BeamSqlUdf.java | 41 -- .../beam/dsls/sql/schema/BeamTableUtils.java | 122 ----- .../sql/schema/kafka/BeamKafkaCSVTable.java | 111 ----- .../dsls/sql/schema/kafka/BeamKafkaTable.java | 109 ----- .../dsls/sql/schema/kafka/package-info.java | 22 - .../beam/dsls/sql/schema/package-info.java | 22 - .../dsls/sql/schema/text/BeamTextCSVTable.java | 70 --- .../schema/text/BeamTextCSVTableIOReader.java | 59 --- .../schema/text/BeamTextCSVTableIOWriter.java | 59 --- .../dsls/sql/schema/text/BeamTextTable.java | 42 -- .../beam/dsls/sql/schema/text/package-info.java | 22 - .../transform/BeamAggregationTransforms.java | 300 ------------ .../sql/transform/BeamBuiltinAggregations.java | 412 ----------------- .../dsls/sql/transform/BeamJoinTransforms.java | 166 ------- .../transform/BeamSetOperatorsTransforms.java | 112 ----- .../dsls/sql/transform/BeamSqlFilterFn.java | 62 --- .../sql/transform/BeamSqlOutputToConsoleFn.java | 41 -- .../dsls/sql/transform/BeamSqlProjectFn.java | 72 --- .../beam/dsls/sql/transform/package-info.java | 22 - .../beam/dsls/sql/utils/CalciteUtils.java | 113 ----- .../beam/dsls/sql/utils/package-info.java | 22 - .../apache/beam/sdk/extensions/sql/BeamSql.java | 244 ++++++++++ .../beam/sdk/extensions/sql/BeamSqlCli.java | 65 +++ .../beam/sdk/extensions/sql/BeamSqlEnv.java | 119 +++++ .../extensions/sql/example/BeamSqlExample.java | 97 ++++ .../extensions/sql/example/package-info.java | 23 + .../interpreter/BeamSqlExpressionExecutor.java | 43 ++ .../sql/interpreter/BeamSqlFnExecutor.java | 442 ++++++++++++++++++ .../operator/BeamSqlCaseExpression.java | 63 +++ .../operator/BeamSqlCastExpression.java | 131 ++++++ .../interpreter/operator/BeamSqlExpression.java | 78 ++++ .../operator/BeamSqlInputRefExpression.java | 43 ++ .../interpreter/operator/BeamSqlPrimitive.java | 152 +++++++ .../operator/BeamSqlReinterpretExpression.java | 54 +++ .../operator/BeamSqlUdfExpression.java | 86 ++++ .../operator/BeamSqlWindowEndExpression.java | 42 ++ .../operator/BeamSqlWindowExpression.java | 50 ++ .../operator/BeamSqlWindowStartExpression.java | 43 ++ .../arithmetic/BeamSqlArithmeticExpression.java | 122 +++++ .../arithmetic/BeamSqlDivideExpression.java | 37 ++ .../arithmetic/BeamSqlMinusExpression.java | 36 ++ .../arithmetic/BeamSqlModExpression.java | 36 ++ .../arithmetic/BeamSqlMultiplyExpression.java | 36 ++ .../arithmetic/BeamSqlPlusExpression.java | 36 ++ .../operator/arithmetic/package-info.java | 22 + .../comparison/BeamSqlCompareExpression.java | 96 ++++ .../comparison/BeamSqlEqualsExpression.java | 49 ++ .../BeamSqlGreaterThanExpression.java | 49 ++ .../BeamSqlGreaterThanOrEqualsExpression.java | 49 ++ .../comparison/BeamSqlIsNotNullExpression.java | 53 +++ .../comparison/BeamSqlIsNullExpression.java | 53 +++ .../comparison/BeamSqlLessThanExpression.java | 49 ++ .../BeamSqlLessThanOrEqualsExpression.java | 49 ++ .../comparison/BeamSqlNotEqualsExpression.java | 49 ++ .../operator/comparison/package-info.java | 22 + .../date/BeamSqlCurrentDateExpression.java | 44 ++ .../date/BeamSqlCurrentTimeExpression.java | 52 +++ .../date/BeamSqlCurrentTimestampExpression.java | 48 ++ .../date/BeamSqlDateCeilExpression.java | 54 +++ .../date/BeamSqlDateFloorExpression.java | 54 +++ .../operator/date/BeamSqlExtractExpression.java | 101 +++++ .../interpreter/operator/date/package-info.java | 22 + .../operator/logical/BeamSqlAndExpression.java | 47 ++ .../logical/BeamSqlLogicalExpression.java | 46 ++ .../operator/logical/BeamSqlNotExpression.java | 53 +++ .../operator/logical/BeamSqlOrExpression.java | 47 ++ .../operator/logical/package-info.java | 22 + .../operator/math/BeamSqlAbsExpression.java | 74 +++ .../operator/math/BeamSqlAcosExpression.java | 40 ++ .../operator/math/BeamSqlAsinExpression.java | 40 ++ .../operator/math/BeamSqlAtan2Expression.java | 42 ++ .../operator/math/BeamSqlAtanExpression.java | 40 ++ .../operator/math/BeamSqlCeilExpression.java | 45 ++ .../operator/math/BeamSqlCosExpression.java | 40 ++ .../operator/math/BeamSqlCotExpression.java | 40 ++ .../operator/math/BeamSqlDegreesExpression.java | 40 ++ .../operator/math/BeamSqlExpExpression.java | 40 ++ .../operator/math/BeamSqlFloorExpression.java | 45 ++ .../operator/math/BeamSqlLnExpression.java | 40 ++ .../operator/math/BeamSqlLogExpression.java | 40 ++ .../math/BeamSqlMathBinaryExpression.java | 63 +++ .../math/BeamSqlMathUnaryExpression.java | 58 +++ .../operator/math/BeamSqlPiExpression.java | 42 ++ .../operator/math/BeamSqlPowerExpression.java | 44 ++ .../operator/math/BeamSqlRadiansExpression.java | 40 ++ .../operator/math/BeamSqlRandExpression.java | 54 +++ .../math/BeamSqlRandIntegerExpression.java | 58 +++ .../operator/math/BeamSqlRoundExpression.java | 107 +++++ .../operator/math/BeamSqlSignExpression.java | 72 +++ .../operator/math/BeamSqlSinExpression.java | 40 ++ .../operator/math/BeamSqlTanExpression.java | 40 ++ .../math/BeamSqlTruncateExpression.java | 75 +++ .../interpreter/operator/math/package-info.java | 22 + .../sql/interpreter/operator/package-info.java | 22 + .../string/BeamSqlCharLengthExpression.java | 39 ++ .../string/BeamSqlConcatExpression.java | 62 +++ .../string/BeamSqlInitCapExpression.java | 55 +++ .../operator/string/BeamSqlLowerExpression.java | 39 ++ .../string/BeamSqlOverlayExpression.java | 76 ++++ .../string/BeamSqlPositionExpression.java | 72 +++ .../string/BeamSqlStringUnaryExpression.java | 44 ++ .../string/BeamSqlSubstringExpression.java | 82 ++++ .../operator/string/BeamSqlTrimExpression.java | 101 +++++ .../operator/string/BeamSqlUpperExpression.java | 39 ++ .../operator/string/package-info.java | 22 + .../sql/interpreter/package-info.java | 22 + .../beam/sdk/extensions/sql/package-info.java | 22 + .../sql/planner/BeamQueryPlanner.java | 167 +++++++ .../sql/planner/BeamRelDataTypeSystem.java | 40 ++ .../extensions/sql/planner/BeamRuleSets.java | 75 +++ .../extensions/sql/planner/package-info.java | 24 + .../extensions/sql/rel/BeamAggregationRel.java | 182 ++++++++ .../sdk/extensions/sql/rel/BeamFilterRel.java | 70 +++ .../sdk/extensions/sql/rel/BeamIOSinkRel.java | 75 +++ .../sdk/extensions/sql/rel/BeamIOSourceRel.java | 63 +++ .../extensions/sql/rel/BeamIntersectRel.java | 58 +++ .../sdk/extensions/sql/rel/BeamJoinRel.java | 302 +++++++++++++ .../sql/rel/BeamLogicalConvention.java | 72 +++ .../sdk/extensions/sql/rel/BeamMinusRel.java | 56 +++ .../sdk/extensions/sql/rel/BeamProjectRel.java | 81 ++++ .../sdk/extensions/sql/rel/BeamRelNode.java | 38 ++ .../sql/rel/BeamSetOperatorRelBase.java | 98 ++++ .../sdk/extensions/sql/rel/BeamSortRel.java | 247 ++++++++++ .../sdk/extensions/sql/rel/BeamSqlRelUtils.java | 72 +++ .../sdk/extensions/sql/rel/BeamUnionRel.java | 88 ++++ .../sdk/extensions/sql/rel/BeamValuesRel.java | 79 ++++ .../sdk/extensions/sql/rel/package-info.java | 23 + .../sql/rule/BeamAggregationRule.java | 162 +++++++ .../sdk/extensions/sql/rule/BeamFilterRule.java | 49 ++ .../sdk/extensions/sql/rule/BeamIOSinkRule.java | 81 ++++ .../extensions/sql/rule/BeamIOSourceRule.java | 49 ++ .../extensions/sql/rule/BeamIntersectRule.java | 50 ++ .../sdk/extensions/sql/rule/BeamJoinRule.java | 53 +++ .../sdk/extensions/sql/rule/BeamMinusRule.java | 50 ++ .../extensions/sql/rule/BeamProjectRule.java | 50 ++ .../sdk/extensions/sql/rule/BeamSortRule.java | 51 +++ .../sdk/extensions/sql/rule/BeamUnionRule.java | 50 ++ .../sdk/extensions/sql/rule/BeamValuesRule.java | 48 ++ .../sdk/extensions/sql/rule/package-info.java | 23 + .../extensions/sql/schema/BaseBeamTable.java | 34 ++ .../sdk/extensions/sql/schema/BeamIOType.java | 28 ++ .../sql/schema/BeamPCollectionTable.java | 61 +++ .../sdk/extensions/sql/schema/BeamSqlRow.java | 314 +++++++++++++ .../extensions/sql/schema/BeamSqlRowCoder.java | 185 ++++++++ .../extensions/sql/schema/BeamSqlRowType.java | 40 ++ .../sdk/extensions/sql/schema/BeamSqlTable.java | 52 +++ .../sdk/extensions/sql/schema/BeamSqlUdaf.java | 72 +++ .../sdk/extensions/sql/schema/BeamSqlUdf.java | 41 ++ .../extensions/sql/schema/BeamTableUtils.java | 122 +++++ .../sql/schema/kafka/BeamKafkaCSVTable.java | 109 +++++ .../sql/schema/kafka/BeamKafkaTable.java | 109 +++++ .../sql/schema/kafka/package-info.java | 22 + .../sdk/extensions/sql/schema/package-info.java | 22 + .../sql/schema/text/BeamTextCSVTable.java | 70 +++ .../schema/text/BeamTextCSVTableIOReader.java | 58 +++ .../schema/text/BeamTextCSVTableIOWriter.java | 58 +++ .../sql/schema/text/BeamTextTable.java | 41 ++ .../sql/schema/text/package-info.java | 22 + .../transform/BeamAggregationTransforms.java | 300 ++++++++++++ .../sql/transform/BeamBuiltinAggregations.java | 412 +++++++++++++++++ .../sql/transform/BeamJoinTransforms.java | 166 +++++++ .../transform/BeamSetOperatorsTransforms.java | 111 +++++ .../sql/transform/BeamSqlFilterFn.java | 62 +++ .../sql/transform/BeamSqlOutputToConsoleFn.java | 41 ++ .../sql/transform/BeamSqlProjectFn.java | 72 +++ .../extensions/sql/transform/package-info.java | 22 + .../sdk/extensions/sql/utils/CalciteUtils.java | 113 +++++ .../sdk/extensions/sql/utils/package-info.java | 22 + .../beam/dsls/sql/BeamSqlApiSurfaceTest.java | 59 --- .../dsls/sql/BeamSqlDslAggregationTest.java | 380 ---------------- .../apache/beam/dsls/sql/BeamSqlDslBase.java | 170 ------- .../beam/dsls/sql/BeamSqlDslFilterTest.java | 155 ------- .../beam/dsls/sql/BeamSqlDslJoinTest.java | 191 -------- .../beam/dsls/sql/BeamSqlDslProjectTest.java | 238 ---------- .../beam/dsls/sql/BeamSqlDslUdfUdafTest.java | 138 ------ .../org/apache/beam/dsls/sql/TestUtils.java | 195 -------- ...amSqlArithmeticOperatorsIntegrationTest.java | 165 ------- ...mSqlBuiltinFunctionsIntegrationTestBase.java | 169 ------- ...amSqlComparisonOperatorsIntegrationTest.java | 330 -------------- ...mSqlConditionalFunctionsIntegrationTest.java | 60 --- .../BeamSqlDateFunctionsIntegrationTest.java | 88 ---- .../BeamSqlLogicalFunctionsIntegrationTest.java | 43 -- .../BeamSqlMathFunctionsIntegrationTest.java | 351 -------------- .../BeamSqlStringFunctionsIntegrationTest.java | 51 --- .../sql/interpreter/BeamSqlFnExecutorTest.java | 416 ----------------- .../interpreter/BeamSqlFnExecutorTestBase.java | 92 ---- .../operator/BeamNullExperssionTest.java | 55 --- .../operator/BeamSqlAndOrExpressionTest.java | 62 --- .../operator/BeamSqlCaseExpressionTest.java | 94 ---- .../operator/BeamSqlCastExpressionTest.java | 126 ------ .../operator/BeamSqlCompareExpressionTest.java | 115 ----- .../operator/BeamSqlInputRefExpressionTest.java | 57 --- .../operator/BeamSqlPrimitiveTest.java | 59 --- .../BeamSqlReinterpretExpressionTest.java | 77 ---- .../operator/BeamSqlUdfExpressionTest.java | 51 --- .../BeamSqlArithmeticExpressionTest.java | 237 ---------- .../date/BeamSqlCurrentDateExpressionTest.java | 35 -- .../date/BeamSqlCurrentTimeExpressionTest.java | 40 -- .../BeamSqlCurrentTimestampExpressionTest.java | 40 -- .../date/BeamSqlDateCeilExpressionTest.java | 49 -- .../date/BeamSqlDateExpressionTestBase.java | 52 --- .../date/BeamSqlDateFloorExpressionTest.java | 50 -- .../date/BeamSqlExtractExpressionTest.java | 96 ---- .../logical/BeamSqlNotExpressionTest.java | 48 -- .../math/BeamSqlMathBinaryExpressionTest.java | 203 --------- .../math/BeamSqlMathUnaryExpressionTest.java | 310 ------------- .../string/BeamSqlCharLengthExpressionTest.java | 45 -- .../string/BeamSqlConcatExpressionTest.java | 67 --- .../string/BeamSqlInitCapExpressionTest.java | 55 --- .../string/BeamSqlLowerExpressionTest.java | 45 -- .../string/BeamSqlOverlayExpressionTest.java | 88 ---- .../string/BeamSqlPositionExpressionTest.java | 85 ---- .../BeamSqlStringUnaryExpressionTest.java | 53 --- .../string/BeamSqlSubstringExpressionTest.java | 102 ----- .../string/BeamSqlTrimExpressionTest.java | 103 ----- .../string/BeamSqlUpperExpressionTest.java | 45 -- .../beam/dsls/sql/mock/MockedBoundedTable.java | 134 ------ .../apache/beam/dsls/sql/mock/MockedTable.java | 42 -- .../dsls/sql/mock/MockedUnboundedTable.java | 114 ----- .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 119 ----- .../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 --------- .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ---------- .../BeamJoinRelUnboundedVsUnboundedTest.java | 219 --------- .../beam/dsls/sql/rel/BeamMinusRelTest.java | 118 ----- .../sql/rel/BeamSetOperatorRelBaseTest.java | 106 ----- .../beam/dsls/sql/rel/BeamSortRelTest.java | 237 ---------- .../beam/dsls/sql/rel/BeamUnionRelTest.java | 104 ----- .../beam/dsls/sql/rel/BeamValuesRelTest.java | 105 ----- .../org/apache/beam/dsls/sql/rel/CheckSize.java | 41 -- .../dsls/sql/schema/BeamSqlRowCoderTest.java | 83 ---- .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 111 ----- .../sql/schema/text/BeamTextCSVTableTest.java | 176 ------- .../transform/BeamAggregationTransformTest.java | 453 ------------------- .../schema/transform/BeamTransformBaseTest.java | 97 ---- .../extensions/sql/BeamSqlApiSurfaceTest.java | 59 +++ .../sql/BeamSqlDslAggregationTest.java | 380 ++++++++++++++++ .../beam/sdk/extensions/sql/BeamSqlDslBase.java | 170 +++++++ .../extensions/sql/BeamSqlDslFilterTest.java | 155 +++++++ .../sdk/extensions/sql/BeamSqlDslJoinTest.java | 191 ++++++++ .../extensions/sql/BeamSqlDslProjectTest.java | 238 ++++++++++ .../extensions/sql/BeamSqlDslUdfUdafTest.java | 138 ++++++ .../beam/sdk/extensions/sql/TestUtils.java | 195 ++++++++ ...amSqlArithmeticOperatorsIntegrationTest.java | 165 +++++++ ...mSqlBuiltinFunctionsIntegrationTestBase.java | 169 +++++++ ...amSqlComparisonOperatorsIntegrationTest.java | 330 ++++++++++++++ ...mSqlConditionalFunctionsIntegrationTest.java | 60 +++ .../BeamSqlDateFunctionsIntegrationTest.java | 88 ++++ .../BeamSqlLogicalFunctionsIntegrationTest.java | 43 ++ .../BeamSqlMathFunctionsIntegrationTest.java | 351 ++++++++++++++ .../BeamSqlStringFunctionsIntegrationTest.java | 51 +++ .../sql/interpreter/BeamSqlFnExecutorTest.java | 416 +++++++++++++++++ .../interpreter/BeamSqlFnExecutorTestBase.java | 92 ++++ .../operator/BeamNullExperssionTest.java | 55 +++ .../operator/BeamSqlAndOrExpressionTest.java | 61 +++ .../operator/BeamSqlCaseExpressionTest.java | 93 ++++ .../operator/BeamSqlCastExpressionTest.java | 125 +++++ .../operator/BeamSqlCompareExpressionTest.java | 115 +++++ .../operator/BeamSqlInputRefExpressionTest.java | 57 +++ .../operator/BeamSqlPrimitiveTest.java | 59 +++ .../BeamSqlReinterpretExpressionTest.java | 75 +++ .../operator/BeamSqlUdfExpressionTest.java | 51 +++ .../BeamSqlArithmeticExpressionTest.java | 237 ++++++++++ .../date/BeamSqlCurrentDateExpressionTest.java | 38 ++ .../date/BeamSqlCurrentTimeExpressionTest.java | 39 ++ .../BeamSqlCurrentTimestampExpressionTest.java | 39 ++ .../date/BeamSqlDateCeilExpressionTest.java | 50 ++ .../date/BeamSqlDateExpressionTestBase.java | 51 +++ .../date/BeamSqlDateFloorExpressionTest.java | 49 ++ .../date/BeamSqlExtractExpressionTest.java | 103 +++++ .../logical/BeamSqlNotExpressionTest.java | 47 ++ .../math/BeamSqlMathBinaryExpressionTest.java | 201 ++++++++ .../math/BeamSqlMathUnaryExpressionTest.java | 309 +++++++++++++ .../string/BeamSqlCharLengthExpressionTest.java | 44 ++ .../string/BeamSqlConcatExpressionTest.java | 66 +++ .../string/BeamSqlInitCapExpressionTest.java | 54 +++ .../string/BeamSqlLowerExpressionTest.java | 44 ++ .../string/BeamSqlOverlayExpressionTest.java | 87 ++++ .../string/BeamSqlPositionExpressionTest.java | 84 ++++ .../BeamSqlStringUnaryExpressionTest.java | 52 +++ .../string/BeamSqlSubstringExpressionTest.java | 101 +++++ .../string/BeamSqlTrimExpressionTest.java | 103 +++++ .../string/BeamSqlUpperExpressionTest.java | 44 ++ .../extensions/sql/mock/MockedBoundedTable.java | 134 ++++++ .../sdk/extensions/sql/mock/MockedTable.java | 42 ++ .../sql/mock/MockedUnboundedTable.java | 112 +++++ .../sql/rel/BeamIntersectRelTest.java | 119 +++++ .../rel/BeamJoinRelBoundedVsBoundedTest.java | 204 +++++++++ .../rel/BeamJoinRelUnboundedVsBoundedTest.java | 241 ++++++++++ .../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++ .../extensions/sql/rel/BeamMinusRelTest.java | 118 +++++ .../sql/rel/BeamSetOperatorRelBaseTest.java | 106 +++++ .../sdk/extensions/sql/rel/BeamSortRelTest.java | 237 ++++++++++ .../extensions/sql/rel/BeamUnionRelTest.java | 104 +++++ .../extensions/sql/rel/BeamValuesRelTest.java | 105 +++++ .../beam/sdk/extensions/sql/rel/CheckSize.java | 41 ++ .../sql/schema/BeamSqlRowCoderTest.java | 82 ++++ .../sql/schema/kafka/BeamKafkaCSVTableTest.java | 111 +++++ .../sql/schema/text/BeamTextCSVTableTest.java | 176 +++++++ .../transform/BeamAggregationTransformTest.java | 453 +++++++++++++++++++ .../schema/transform/BeamTransformBaseTest.java | 97 ++++ 426 files changed, 19118 insertions(+), 19184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java deleted file mode 100644 index d902f42..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import com.google.auto.value.AutoValue; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BeamPCollectionTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.dsls.sql.schema.BeamSqlUdf; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.parser.SqlParseException; -import org.apache.calcite.tools.RelConversionException; -import org.apache.calcite.tools.ValidationException; - -/** - * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query as a - * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline. - * - * <h1>Beam SQL DSL usage:</h1> - * A typical pipeline with Beam SQL DSL is: - * <pre> - *{@code -PipelineOptions options = PipelineOptionsFactory.create(); -Pipeline p = Pipeline.create(options); - -//create table from TextIO; -PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha")) - .apply(...); -PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb")) - .apply(...); - -//run a simple query, and register the output as a table in BeamSql; -String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION"; -PCollection<BeamSqlRow> outputTableA = inputTableA.apply( - BeamSql.simpleQuery(sql1) - .withUdf("MY_FUNC", MY_FUNC.class, "FUNC")); - -//run a JOIN with one table from TextIO, and one table from another query -PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of( - new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA) - .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB) - .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ...")); - -//output the final result with TextIO -outputTableB.apply(...).apply(TextIO.write().to("/my/output/path")); - -p.run().waitUntilFinish(); - * } - * </pre> - */ -@Experimental -public class BeamSql { - /** - * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. - * - * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing - * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output - * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to - * {@code PCollection<BeamSqlRow>}, each representing an input table. - * - * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names} - * referenced within the query. - */ - public static QueryTransform query(String sqlQuery) { - return QueryTransform.builder() - .setSqlEnv(new BeamSqlEnv()) - .setSqlQuery(sqlQuery) - .build(); - } - - /** - * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan. - * - * <p>This is a simplified form of {@link #query(String)} where the query must reference - * a single input table. - * - * <p>Make sure to query it from a static table name <em>PCOLLECTION</em>. - */ - public static SimpleQueryTransform simpleQuery(String sqlQuery) throws Exception { - return SimpleQueryTransform.builder() - .setSqlEnv(new BeamSqlEnv()) - .setSqlQuery(sqlQuery) - .build(); - } - - /** - * A {@link PTransform} representing an execution plan for a SQL query. - */ - @AutoValue - public abstract static class QueryTransform extends - PTransform<PCollectionTuple, PCollection<BeamSqlRow>> { - abstract BeamSqlEnv getSqlEnv(); - abstract String getSqlQuery(); - - static Builder builder() { - return new AutoValue_BeamSql_QueryTransform.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSqlQuery(String sqlQuery); - abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); - abstract QueryTransform build(); - } - - /** - * register a UDF function used in this query. - */ - public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ - getSqlEnv().registerUdf(functionName, clazz); - return this; - } - - /** - * register a UDAF function used in this query. - */ - public QueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){ - getSqlEnv().registerUdaf(functionName, clazz); - return this; - } - - @Override - public PCollection<BeamSqlRow> expand(PCollectionTuple input) { - registerTables(input); - - BeamRelNode beamRelNode = null; - try { - beamRelNode = getSqlEnv().planner.convertToBeamRel(getSqlQuery()); - } catch (ValidationException | RelConversionException | SqlParseException e) { - throw new IllegalStateException(e); - } - - try { - return beamRelNode.buildBeamPipeline(input, getSqlEnv()); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - //register tables, related with input PCollections. - private void registerTables(PCollectionTuple input){ - for (TupleTag<?> sourceTag : input.getAll().keySet()) { - PCollection<BeamSqlRow> sourceStream = (PCollection<BeamSqlRow>) input.get(sourceTag); - BeamSqlRowCoder sourceCoder = (BeamSqlRowCoder) sourceStream.getCoder(); - - getSqlEnv().registerTable(sourceTag.getId(), - new BeamPCollectionTable(sourceStream, sourceCoder.getTableSchema())); - } - } - } - - /** - * A {@link PTransform} representing an execution plan for a SQL query referencing - * a single table. - */ - @AutoValue - public abstract static class SimpleQueryTransform - extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> { - private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION"; - abstract BeamSqlEnv getSqlEnv(); - abstract String getSqlQuery(); - - static Builder builder() { - return new AutoValue_BeamSql_SimpleQueryTransform.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setSqlQuery(String sqlQuery); - abstract Builder setSqlEnv(BeamSqlEnv sqlEnv); - abstract SimpleQueryTransform build(); - } - - /** - * register a UDF function used in this query. - */ - public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz){ - getSqlEnv().registerUdf(functionName, clazz); - return this; - } - - /** - * register a UDAF function used in this query. - */ - public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz){ - getSqlEnv().registerUdaf(functionName, clazz); - return this; - } - - private void validateQuery() { - SqlNode sqlNode; - try { - sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery()); - getSqlEnv().planner.getPlanner().close(); - } catch (SqlParseException e) { - throw new IllegalStateException(e); - } - - if (sqlNode instanceof SqlSelect) { - SqlSelect select = (SqlSelect) sqlNode; - String tableName = select.getFrom().toString(); - if (!tableName.equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) { - throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME); - } - } else { - throw new UnsupportedOperationException( - "Sql operation: " + sqlNode.toString() + " is not supported!"); - } - } - - @Override - public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) { - validateQuery(); - return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input) - .apply(QueryTransform.builder() - .setSqlEnv(getSqlEnv()) - .setSqlQuery(getSqlQuery()) - .build()); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java deleted file mode 100644 index 50da244..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.values.PCollection; -import org.apache.calcite.plan.RelOptUtil; - -/** - * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. - */ -@Experimental -public class BeamSqlCli { - /** - * Returns a human readable representation of the query execution plan. - */ - public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception { - BeamRelNode exeTree = sqlEnv.planner.convertToBeamRel(sqlString); - String beamPlan = RelOptUtil.toString(exeTree); - return beamPlan; - } - - /** - * compile SQL, and return a {@link Pipeline}. - */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv) - throws Exception{ - PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation() - .as(PipelineOptions.class); // FlinkPipelineOptions.class - options.setJobName("BeamPlanCreator"); - Pipeline pipeline = Pipeline.create(options); - - return compilePipeline(sqlStatement, pipeline, sqlEnv); - } - - /** - * compile SQL, and return a {@link Pipeline}. - */ - public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline - , BeamSqlEnv sqlEnv) throws Exception{ - PCollection<BeamSqlRow> resultStream = - sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv); - return resultStream; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java deleted file mode 100644 index 0e1ac98..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql; - -import java.io.Serializable; - -import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; -import org.apache.beam.dsls.sql.schema.BeamSqlUdf; -import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.SchemaPlus; -import org.apache.calcite.schema.Statistic; -import org.apache.calcite.schema.Statistics; -import org.apache.calcite.schema.impl.AggregateFunctionImpl; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.tools.Frameworks; - -/** - * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}. - * - * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and - * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries. - */ -public class BeamSqlEnv implements Serializable{ - transient SchemaPlus schema; - transient BeamQueryPlanner planner; - - public BeamSqlEnv() { - schema = Frameworks.createRootSchema(true); - planner = new BeamQueryPlanner(schema); - } - - /** - * Register a UDF function which can be used in SQL expression. - */ - public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) { - schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD)); - } - - /** - * Register a UDAF function which can be used in GROUP-BY expression. - * See {@link BeamSqlUdaf} on how to implement a UDAF. - */ - public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) { - schema.add(functionName, AggregateFunctionImpl.create(clazz)); - } - - /** - * Registers a {@link BaseBeamTable} which can be used for all subsequent queries. - * - */ - public void registerTable(String tableName, BaseBeamTable table) { - schema.add(tableName, new BeamCalciteTable(table.getRowType())); - planner.getSourceTables().put(tableName, table); - } - - /** - * Find {@link BaseBeamTable} by table name. - */ - public BaseBeamTable findTable(String tableName){ - return planner.getSourceTables().get(tableName); - } - - private static class BeamCalciteTable implements ScannableTable, Serializable { - private BeamSqlRowType beamSqlRowType; - public BeamCalciteTable(BeamSqlRowType beamSqlRowType) { - this.beamSqlRowType = beamSqlRowType; - } - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - return CalciteUtils.toCalciteRowType(this.beamSqlRowType) - .apply(BeamQueryPlanner.TYPE_FACTORY); - } - - @Override - public Enumerable<Object[]> scan(DataContext root) { - // not used as Beam SQL uses its own execution engine - return null; - } - - /** - * Not used {@link Statistic} to optimize the plan. - */ - @Override - public Statistic getStatistic() { - return Statistics.UNKNOWN; - } - - /** - * all sources are treated as TABLE in Beam SQL. - */ - @Override - public Schema.TableType getJdbcTableType() { - return Schema.TableType.TABLE; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java deleted file mode 100644 index 4e364e1..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.example; - -import java.sql.Types; -import java.util.Arrays; -import java.util.List; -import org.apache.beam.dsls.sql.BeamSql; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; -import org.apache.beam.dsls.sql.schema.BeamSqlRowType; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TupleTag; - -/** - * This is a quick example, which uses Beam SQL DSL to create a data pipeline. - * - * <p>Run the example with - * <pre> - * mvn -pl dsls/sql compile exec:java \ - * -Dexec.mainClass=org.apache.beam.dsls.sql.example.BeamSqlExample \ - * -Dexec.args="--runner=DirectRunner" -Pdirect-runner - * </pre> - * - */ -class BeamSqlExample { - public static void main(String[] args) throws Exception { - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); - Pipeline p = Pipeline.create(options); - - //define the input row format - List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); - List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); - BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes); - BeamSqlRow row = new BeamSqlRow(type); - row.addField(0, 1); - row.addField(1, "row"); - row.addField(2, 1.0); - - //create a source PCollection with Create.of(); - PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row) - .withCoder(new BeamSqlRowCoder(type))); - - //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; - PCollection<BeamSqlRow> outputStream = inputTable.apply( - BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); - - //print the output record of case 1; - outputStream.apply("log_result", - MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { - public Void apply(BeamSqlRow input) { - System.out.println("PCOLLECTION: " + input); - return null; - } - })); - - //Case 2. run the query with BeamSql.query over result PCollection of case 1. - PCollection<BeamSqlRow> outputStream2 = - PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream) - .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); - - //print the output record of case 2; - outputStream2.apply("log_result", - MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() { - @Override - public Void apply(BeamSqlRow input) { - System.out.println("TABLE_B: " + input); - return null; - } - })); - - p.run().waitUntilFinish(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java deleted file mode 100644 index 52a9fce..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/example/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * examples on how to use BeamSQL. - * - */ -package org.apache.beam.dsls.sql.example; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java deleted file mode 100644 index 3732933..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.interpreter; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; - -/** - * {@code BeamSqlExpressionExecutor} fills the gap between relational - * expressions in Calcite SQL and executable code. - * - */ -public interface BeamSqlExpressionExecutor extends Serializable { - - /** - * invoked before data processing. - */ - void prepare(); - - /** - * apply transformation to input record {@link BeamSqlRow}. - * - */ - List<Object> execute(BeamSqlRow inputRow); - - void close(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java deleted file mode 100644 index aee0e4a..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.interpreter; - -import java.math.BigDecimal; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCastExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression; -import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression; -import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression; -import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression; -import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression; -import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression; -import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlEqualsExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlIsNullExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression; -import org.apache.beam.dsls.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression; -import org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression; -import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlAndExpression; -import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlNotExpression; -import org.apache.beam.dsls.sql.interpreter.operator.logical.BeamSqlOrExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAcosExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAsinExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtan2Expression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAtanExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCeilExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCosExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlCotExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlDegreesExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlExpExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlFloorExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLnExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlLogExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPiExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlPowerExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRadiansExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRandIntegerExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlRoundExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSignExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSinExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTanExpression; -import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlTruncateExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression; -import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression; -import org.apache.beam.dsls.sql.rel.BeamFilterRel; -import org.apache.beam.dsls.sql.rel.BeamProjectRel; -import org.apache.beam.dsls.sql.rel.BeamRelNode; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.rex.RexCall; -import org.apache.calcite.rex.RexInputRef; -import org.apache.calcite.rex.RexLiteral; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.schema.impl.ScalarFunctionImpl; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.type.SqlTypeName; -import org.apache.calcite.sql.validate.SqlUserDefinedFunction; -import org.apache.calcite.util.NlsString; - -/** - * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. - * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression}, - * which can be evaluated against the {@link BeamSqlRow}. - * - */ -public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor { - protected List<BeamSqlExpression> exps; - - public BeamSqlFnExecutor(BeamRelNode relNode) { - this.exps = new ArrayList<>(); - if (relNode instanceof BeamFilterRel) { - BeamFilterRel filterNode = (BeamFilterRel) relNode; - RexNode condition = filterNode.getCondition(); - exps.add(buildExpression(condition)); - } else if (relNode instanceof BeamProjectRel) { - BeamProjectRel projectNode = (BeamProjectRel) relNode; - List<RexNode> projects = projectNode.getProjects(); - for (RexNode rexNode : projects) { - exps.add(buildExpression(rexNode)); - } - } else { - throw new UnsupportedOperationException( - String.format("%s is not supported yet!", relNode.getClass().toString())); - } - } - - /** - * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively, - * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}. - */ - static BeamSqlExpression buildExpression(RexNode rexNode) { - BeamSqlExpression ret = null; - if (rexNode instanceof RexLiteral) { - RexLiteral node = (RexLiteral) rexNode; - SqlTypeName type = node.getTypeName(); - Object value = node.getValue(); - - if (SqlTypeName.CHAR_TYPES.contains(type) - && node.getValue() instanceof NlsString) { - // NlsString is not serializable, we need to convert - // it to string explicitly. - return BeamSqlPrimitive.of(type, ((NlsString) value).getValue()); - } else if (type == SqlTypeName.DATE && value instanceof Calendar) { - // does this actually make sense? - // Calcite actually treat Calendar as the java type of Date Literal - return BeamSqlPrimitive.of(type, ((Calendar) value).getTime()); - } else { - // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different - // e.g. sql: "select 1" - // here the literal 1 will be parsed as a RexLiteral where: - // node.getType().getSqlTypeName() = INTEGER (the display type) - // node.getSqlTypeName() = DECIMAL (the actual internal storage format) - // So we need to do a convert here. - // check RexBuilder#makeLiteral for more information. - SqlTypeName realType = node.getType().getSqlTypeName(); - Object realValue = value; - if (type == SqlTypeName.DECIMAL) { - BigDecimal rawValue = (BigDecimal) value; - switch (realType) { - case TINYINT: - realValue = (byte) rawValue.intValue(); - break; - case SMALLINT: - realValue = (short) rawValue.intValue(); - break; - case INTEGER: - realValue = rawValue.intValue(); - break; - case BIGINT: - realValue = rawValue.longValue(); - break; - case DECIMAL: - realValue = rawValue; - break; - default: - throw new IllegalStateException("type/realType mismatch: " - + type + " VS " + realType); - } - } else if (type == SqlTypeName.DOUBLE) { - Double rawValue = (Double) value; - if (realType == SqlTypeName.FLOAT) { - realValue = rawValue.floatValue(); - } - } - return BeamSqlPrimitive.of(realType, realValue); - } - } else if (rexNode instanceof RexInputRef) { - RexInputRef node = (RexInputRef) rexNode; - ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex()); - } else if (rexNode instanceof RexCall) { - RexCall node = (RexCall) rexNode; - String opName = node.op.getName(); - List<BeamSqlExpression> subExps = new ArrayList<>(); - for (RexNode subNode : node.getOperands()) { - subExps.add(buildExpression(subNode)); - } - switch (opName) { - // logical operators - case "AND": - ret = new BeamSqlAndExpression(subExps); - break; - case "OR": - ret = new BeamSqlOrExpression(subExps); - break; - case "NOT": - ret = new BeamSqlNotExpression(subExps); - break; - case "=": - ret = new BeamSqlEqualsExpression(subExps); - break; - case "<>": - ret = new BeamSqlNotEqualsExpression(subExps); - break; - case ">": - ret = new BeamSqlGreaterThanExpression(subExps); - break; - case ">=": - ret = new BeamSqlGreaterThanOrEqualsExpression(subExps); - break; - case "<": - ret = new BeamSqlLessThanExpression(subExps); - break; - case "<=": - ret = new BeamSqlLessThanOrEqualsExpression(subExps); - break; - - // arithmetic operators - case "+": - ret = new BeamSqlPlusExpression(subExps); - break; - case "-": - ret = new BeamSqlMinusExpression(subExps); - break; - case "*": - ret = new BeamSqlMultiplyExpression(subExps); - break; - case "/": - case "/INT": - ret = new BeamSqlDivideExpression(subExps); - break; - case "MOD": - ret = new BeamSqlModExpression(subExps); - break; - - case "ABS": - ret = new BeamSqlAbsExpression(subExps); - break; - case "ROUND": - ret = new BeamSqlRoundExpression(subExps); - break; - case "LN": - ret = new BeamSqlLnExpression(subExps); - break; - case "LOG10": - ret = new BeamSqlLogExpression(subExps); - break; - case "EXP": - ret = new BeamSqlExpExpression(subExps); - break; - case "ACOS": - ret = new BeamSqlAcosExpression(subExps); - break; - case "ASIN": - ret = new BeamSqlAsinExpression(subExps); - break; - case "ATAN": - ret = new BeamSqlAtanExpression(subExps); - break; - case "COT": - ret = new BeamSqlCotExpression(subExps); - break; - case "DEGREES": - ret = new BeamSqlDegreesExpression(subExps); - break; - case "RADIANS": - ret = new BeamSqlRadiansExpression(subExps); - break; - case "COS": - ret = new BeamSqlCosExpression(subExps); - break; - case "SIN": - ret = new BeamSqlSinExpression(subExps); - break; - case "TAN": - ret = new BeamSqlTanExpression(subExps); - break; - case "SIGN": - ret = new BeamSqlSignExpression(subExps); - break; - case "POWER": - ret = new BeamSqlPowerExpression(subExps); - break; - case "PI": - ret = new BeamSqlPiExpression(); - break; - case "ATAN2": - ret = new BeamSqlAtan2Expression(subExps); - break; - case "TRUNCATE": - ret = new BeamSqlTruncateExpression(subExps); - break; - case "RAND": - ret = new BeamSqlRandExpression(subExps); - break; - case "RAND_INTEGER": - ret = new BeamSqlRandIntegerExpression(subExps); - break; - - // string operators - case "||": - ret = new BeamSqlConcatExpression(subExps); - break; - case "POSITION": - ret = new BeamSqlPositionExpression(subExps); - break; - case "CHAR_LENGTH": - case "CHARACTER_LENGTH": - ret = new BeamSqlCharLengthExpression(subExps); - break; - case "UPPER": - ret = new BeamSqlUpperExpression(subExps); - break; - case "LOWER": - ret = new BeamSqlLowerExpression(subExps); - break; - case "TRIM": - ret = new BeamSqlTrimExpression(subExps); - break; - case "SUBSTRING": - ret = new BeamSqlSubstringExpression(subExps); - break; - case "OVERLAY": - ret = new BeamSqlOverlayExpression(subExps); - break; - case "INITCAP": - ret = new BeamSqlInitCapExpression(subExps); - break; - - // date functions - case "Reinterpret": - return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName()); - case "CEIL": - if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { - return new BeamSqlCeilExpression(subExps); - } else { - return new BeamSqlDateCeilExpression(subExps); - } - case "FLOOR": - if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) { - return new BeamSqlFloorExpression(subExps); - } else { - return new BeamSqlDateFloorExpression(subExps); - } - case "EXTRACT_DATE": - case "EXTRACT": - return new BeamSqlExtractExpression(subExps); - - case "LOCALTIME": - case "CURRENT_TIME": - return new BeamSqlCurrentTimeExpression(subExps); - - case "CURRENT_TIMESTAMP": - case "LOCALTIMESTAMP": - return new BeamSqlCurrentTimestampExpression(subExps); - - case "CURRENT_DATE": - return new BeamSqlCurrentDateExpression(); - - - case "CASE": - ret = new BeamSqlCaseExpression(subExps); - break; - case "CAST": - ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName()); - break; - - case "IS NULL": - ret = new BeamSqlIsNullExpression(subExps.get(0)); - break; - case "IS NOT NULL": - ret = new BeamSqlIsNotNullExpression(subExps.get(0)); - break; - - case "HOP": - case "TUMBLE": - case "SESSION": - ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName()); - break; - case "HOP_START": - case "TUMBLE_START": - case "SESSION_START": - ret = new BeamSqlWindowStartExpression(); - break; - case "HOP_END": - case "TUMBLE_END": - case "SESSION_END": - ret = new BeamSqlWindowEndExpression(); - break; - default: - //handle UDF - if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) { - SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator(); - ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction(); - ret = new BeamSqlUdfExpression(fn.method, subExps, - ((RexCall) rexNode).type.getSqlTypeName()); - } else { - throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!"); - } - } - } else { - throw new UnsupportedOperationException( - String.format("%s is not supported yet!", rexNode.getClass().toString())); - } - - if (ret != null && !ret.accept()) { - throw new IllegalStateException(ret.getClass().getSimpleName() - + " does not accept the operands.(" + rexNode + ")"); - } - - return ret; - } - - @Override - public void prepare() { - } - - @Override - public List<Object> execute(BeamSqlRow inputRow) { - List<Object> results = new ArrayList<>(); - for (BeamSqlExpression exp : exps) { - results.add(exp.evaluate(inputRow).getValue()); - } - return results; - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java deleted file mode 100644 index a30916b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator; - -import java.util.List; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL. - */ -public class BeamSqlCaseExpression extends BeamSqlExpression { - public BeamSqlCaseExpression(List<BeamSqlExpression> operands) { - // the return type of CASE is the type of the `else` condition - super(operands, operands.get(operands.size() - 1).getOutputType()); - } - - @Override public boolean accept() { - // `when`-`then` pair + `else` - if (operands.size() % 2 != 1) { - return false; - } - - for (int i = 0; i < operands.size() - 1; i += 2) { - if (opType(i) != SqlTypeName.BOOLEAN) { - return false; - } else if (opType(i + 1) != outputType) { - return false; - } - } - - return true; - } - - @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - for (int i = 0; i < operands.size() - 1; i += 2) { - if (opValueEvaluated(i, inputRow)) { - return BeamSqlPrimitive.of( - outputType, - opValueEvaluated(i + 1, inputRow) - ); - } - } - return BeamSqlPrimitive.of(outputType, - opValueEvaluated(operands.size() - 1, inputRow)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java deleted file mode 100644 index 524d1df..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCastExpression.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.dsls.sql.interpreter.operator; - -import java.sql.Date; -import java.sql.Timestamp; -import java.util.List; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.runtime.SqlFunctions; -import org.apache.calcite.sql.type.SqlTypeName; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.DateTimeFormatterBuilder; -import org.joda.time.format.DateTimeParser; - -/** - * Base class to support 'CAST' operations for all {@link SqlTypeName}. - */ -public class BeamSqlCastExpression extends BeamSqlExpression { - - private static final int index = 0; - private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss"; - private static final String outputDateFormat = "yyyy-MM-dd"; - /** - * Date and Timestamp formats used to parse - * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}. - */ - private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder() - .append(null/*printer*/, new DateTimeParser[] { - // date formats - DateTimeFormat.forPattern("yy-MM-dd").getParser(), - DateTimeFormat.forPattern("yy/MM/dd").getParser(), - DateTimeFormat.forPattern("yy.MM.dd").getParser(), - DateTimeFormat.forPattern("yyMMdd").getParser(), - DateTimeFormat.forPattern("yyyyMMdd").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd").getParser(), - DateTimeFormat.forPattern("yyyy/MM/dd").getParser(), - DateTimeFormat.forPattern("yyyy.MM.dd").getParser(), - // datetime formats - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(), - DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter() - .withPivotYear(2020); - - public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) { - super(operands, castType); - } - - @Override - public boolean accept() { - return numberOfOperands() == 1; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - SqlTypeName castOutputType = getOutputType(); - switch (castOutputType) { - case INTEGER: - return BeamSqlPrimitive - .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow))); - case DOUBLE: - return BeamSqlPrimitive - .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow))); - case SMALLINT: - return BeamSqlPrimitive - .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow))); - case TINYINT: - return BeamSqlPrimitive - .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow))); - case BIGINT: - return BeamSqlPrimitive - .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow))); - case DECIMAL: - return BeamSqlPrimitive.of(SqlTypeName.DECIMAL, - SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow))); - case FLOAT: - return BeamSqlPrimitive - .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow))); - case CHAR: - case VARCHAR: - return BeamSqlPrimitive - .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString()); - case DATE: - return BeamSqlPrimitive - .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat)); - case TIMESTAMP: - return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, - toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat)); - } - throw new UnsupportedOperationException( - String.format("Cast to type %s not supported", castOutputType)); - } - - private Date toDate(Object inputDate, String outputFormat) { - try { - return Date - .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat)); - } catch (IllegalArgumentException | UnsupportedOperationException e) { - throw new UnsupportedOperationException("Can't be cast to type 'Date'"); - } - } - - private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) { - try { - return Timestamp.valueOf( - dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute() - .roundCeilingCopy().toString(outputFormat)); - } catch (IllegalArgumentException | UnsupportedOperationException e) { - throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java deleted file mode 100644 index 9d2815c..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.interpreter.operator; - -import java.io.Serializable; -import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite. - * - * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression} - * as its operands, and return a value with type {@link SqlTypeName}. - * - */ -public abstract class BeamSqlExpression implements Serializable { - protected List<BeamSqlExpression> operands; - protected SqlTypeName outputType; - - protected BeamSqlExpression(){} - - public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) { - this.operands = operands; - this.outputType = outputType; - } - - public BeamSqlExpression op(int idx) { - return operands.get(idx); - } - - public SqlTypeName opType(int idx) { - return op(idx).getOutputType(); - } - - public <T> T opValueEvaluated(int idx, BeamSqlRow row) { - return (T) op(idx).evaluate(row).getValue(); - } - - /** - * assertion to make sure the input and output are supported in this expression. - */ - public abstract boolean accept(); - - /** - * Apply input record {@link BeamSqlRow} to this expression, - * the output value is wrapped with {@link BeamSqlPrimitive}. - */ - public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow); - - public List<BeamSqlExpression> getOperands() { - return operands; - } - - public SqlTypeName getOutputType() { - return outputType; - } - - public int numberOfOperands() { - return operands.size(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java deleted file mode 100644 index 710460b..0000000 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.interpreter.operator; - -import org.apache.beam.dsls.sql.schema.BeamSqlRow; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * An primitive operation for direct field extraction. - */ -public class BeamSqlInputRefExpression extends BeamSqlExpression { - private int inputRef; - - public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) { - super(null, sqlTypeName); - this.inputRef = inputRef; - } - - @Override - public boolean accept() { - return true; - } - - @Override - public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) { - return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef)); - } -}