http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java new file mode 100644 index 0000000..fb0a8e2 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * BeamSQL-specific {@link org.apache.calcite.rel.RelNode} implementations, used in place of Calcite's logical nodes. + * + */ +package org.apache.beam.sdk.extensions.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java new file mode 100644 index 0000000..17e3f80 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.rule; + +import com.google.common.collect.ImmutableList; +import java.util.GregorianCalendar; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.ImmutableBitSet; +import org.joda.time.Duration; +
/**
 * Planner rule that rewrites an {@link Aggregate} whose input is a {@link Project} into a
 * {@link BeamAggregationRel}.
 *
 * <p>The GROUP BY keys are inspected for {@code TUMBLE}, {@code HOP} and {@code SESSION} calls.
 * The last such key determines the window function, the trigger and the allowed lateness. When no
 * window call is present, the aggregation uses {@link GlobalWindows}.
 */
public class BeamAggregationRule extends RelOptRule {
  public static final BeamAggregationRule INSTANCE =
      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);

  /** Creates a rule matching {@code aggregateClass} directly on top of {@code projectClass}. */
  public BeamAggregationRule(
      Class<? extends Aggregate> aggregateClass,
      Class<? extends Project> projectClass,
      RelBuilderFactory relBuilderFactory) {
    super(
        operand(aggregateClass,
            operand(projectClass, any())),
        relBuilderFactory, null);
  }

  /** Creates a rule with an explicit operand tree and description. */
  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
    super(operand, description);
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    final Aggregate aggregate = call.rel(0);
    final Project project = call.rel(1);
    updateWindowTrigger(call, aggregate, project);
  }

  /**
   * Derives the window/trigger settings from the GROUP BY keys of {@code aggregate}, resolving
   * each key through {@code project}. It then registers an equivalent
   * {@link BeamAggregationRel} with {@code call}.
   *
   * @throws IllegalArgumentException if a window size, period or gap argument is not a numeric
   *     literal
   */
  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
      Project project) {
    ImmutableBitSet groupByFields = aggregate.getGroupSet();
    List<RexNode> projectMapping = project.getProjects();

    WindowFn windowFn = new GlobalWindows();
    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
    int windowFieldIdx = -1;
    Duration allowedLateness = Duration.ZERO;

    for (int groupField : groupByFields.asList()) {
      RexNode projNode = projectMapping.get(groupField);
      if (!(projNode instanceof RexCall)) {
        continue;
      }
      SqlOperator op = ((RexCall) projNode).op;
      ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;

      // Position of the optional trailing delay argument of the matched window function,
      // or -1 when this group key is not a window function call.
      final int delayParamIdx;
      switch (op.getName()) {
        case "TUMBLE":
          // TUMBLE(ts, size [, delay])
          windowFn = FixedWindows
              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
          delayParamIdx = 2;
          break;
        case "HOP":
          // Parameter 1 is used as the window size and parameter 2 as the period.
          // NOTE(review): Calcite documents HOP(dateTime, slide, size); confirm the intended order.
          windowFn = SlidingWindows
              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
          delayParamIdx = 3;
          break;
        case "SESSION":
          // SESSION(ts, gap [, delay])
          windowFn = Sessions
              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
          delayParamIdx = 2;
          break;
        default:
          delayParamIdx = -1;
          break;
      }
      if (delayParamIdx < 0) {
        continue;
      }
      windowFieldIdx = groupField;
      if (parameters.size() == delayParamIdx + 1) {
        // The delay argument is a TIME literal; its millisecond value is the lateness allowance.
        GregorianCalendar delayTime =
            (GregorianCalendar) ((RexLiteral) parameters.get(delayParamIdx)).getValue();
        triggerFn = createTriggerWithDelay(delayTime);
        allowedLateness = Duration.millis(delayTime.getTimeInMillis());
      }
    }

    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        convert(aggregate.getInput(),
            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
        aggregate.indicator,
        aggregate.getGroupSet(),
        aggregate.getGroupSets(),
        aggregate.getAggCallList(),
        windowFn,
        triggerFn,
        windowFieldIdx,
        allowedLateness);
    call.transformTo(newAggregator);
  }

  /**
   * Builds a repeating trigger that fires when the watermark passes the end of the window. Late
   * firings happen {@code delayTime} after the first element of each late pane.
   */
  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
  }

  /**
   * Returns the numeric value of a window size/period/gap literal, to be used as milliseconds.
   *
   * <p>The value is read as a {@code long} so that intervals longer than
   * {@code Integer.MAX_VALUE} ms (about 24.8 days) are not silently truncated.
   *
   * @throws IllegalArgumentException if {@code parameterNode} is not a numeric literal
   */
  private long getWindowParameterAsMillis(RexNode parameterNode) {
    if (parameterNode instanceof RexLiteral) {
      Object value = ((RexLiteral) parameterNode).getValue();
      if (value instanceof Number) {
        return ((Number) value).longValue();
      }
    }
    throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java new file mode 100644 index 0000000..b30a9d9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.logical.LogicalFilter; + +/** + * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}. 
+ * + */
public class BeamFilterRule extends ConverterRule {
  /** Shared singleton; the constructor is private. */
  public static final BeamFilterRule INSTANCE = new BeamFilterRule();

  private BeamFilterRule() {
    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
  }

  /** Re-creates the matched filter as a {@link BeamFilterRel} over a Beam-convention input. */
  @Override
  public RelNode convert(RelNode rel) {
    final Filter logicalFilter = (Filter) rel;
    final RelNode child = logicalFilter.getInput();
    final RelNode beamChild =
        convert(child, child.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    return new BeamFilterRel(
        logicalFilter.getCluster(),
        logicalFilter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamChild,
        logicalFilter.getCondition());
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java new file mode 100644 index 0000000..54079b0 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.prepare.Prepare; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.logical.LogicalTableModify; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; +
/**
 * Converter rule that turns a {@link TableModify} into a {@link BeamIOSinkRel}.
 *
 * <p>Only INSERT into a TABLE or STREAM is accepted.
 */
public class BeamIOSinkRule extends ConverterRule {
  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();

  private BeamIOSinkRule() {
    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
        "BeamIOSinkRule");
  }

  /**
   * Rebuilds {@code rel} as a {@link BeamIOSinkRel} over a Beam-convention input.
   *
   * @throws IllegalArgumentException if the target table is neither a TABLE nor a STREAM
   * @throws UnsupportedOperationException if the modify operation is not INSERT
   */
  @Override
  public RelNode convert(RelNode rel) {
    final TableModify modify = (TableModify) rel;
    final RelNode logicalInput = modify.getInput();

    final RelOptCluster cluster = modify.getCluster();
    final RelTraitSet beamTraits = modify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
    final RelOptTable targetTable = modify.getTable();
    final Prepare.CatalogReader reader = modify.getCatalogReader();
    final RelNode beamInput =
        convert(logicalInput, logicalInput.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    final TableModify.Operation op = modify.getOperation();
    final List<String> updateColumns = modify.getUpdateColumnList();
    final List<RexNode> sourceExpressions = modify.getSourceExpressionList();
    final boolean isFlattened = modify.isFlattened();

    final Table table = targetTable.unwrap(Table.class);

    // Reject unsupported table kinds first, then anything other than INSERT.
    switch (table.getJdbcTableType()) {
      case TABLE:
      case STREAM:
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Unsupported table type: %s", table.getJdbcTableType()));
    }
    if (op != TableModify.Operation.INSERT) {
      throw new UnsupportedOperationException(
          String.format("Streams doesn't support %s modify operation", op));
    }
    return new BeamIOSinkRel(cluster, beamTraits, targetTable, reader, beamInput, op,
        updateColumns, sourceExpressions, isFlattened);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java new file mode 100644 index 0000000..496b977 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalTableScan; +
/**
 * Converter rule that turns a {@link TableScan} into a {@link BeamIOSourceRel} reading the same
 * table.
 */
public class BeamIOSourceRule extends ConverterRule {
  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();

  private BeamIOSourceRule() {
    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
        "BeamIOSourceRule");
  }

  /** Creates a Beam-convention source node for the scanned table. */
  @Override
  public RelNode convert(RelNode rel) {
    final TableScan tableScan = (TableScan) rel;
    return new BeamIOSourceRel(
        tableScan.getCluster(),
        tableScan.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        tableScan.getTable());
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java new file mode 100644 index 0000000..6fdbd9b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.logical.LogicalIntersect; + +/** + * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}. 
+ */
public class BeamIntersectRule extends ConverterRule {
  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();

  private BeamIntersectRule() {
    super(LogicalIntersect.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
        "BeamIntersectRule");
  }

  /** Rebuilds the INTERSECT as a {@link BeamIntersectRel} over Beam-convention inputs. */
  @Override
  public RelNode convert(RelNode rel) {
    final Intersect logicalIntersect = (Intersect) rel;
    final List<RelNode> beamInputs =
        convertList(logicalIntersect.getInputs(), BeamLogicalConvention.INSTANCE);
    final boolean keepAll = logicalIntersect.all;
    return new BeamIntersectRel(
        logicalIntersect.getCluster(),
        logicalIntersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamInputs,
        keepAll);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java new file mode 100644 index 0000000..147932e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.logical.LogicalJoin; +
/**
 * {@code ConverterRule} that turns a {@code Join} into a {@code BeamJoinRel}.
 */
public class BeamJoinRule extends ConverterRule {
  public static final BeamJoinRule INSTANCE = new BeamJoinRule();

  private BeamJoinRule() {
    super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamJoinRule");
  }

  /**
   * Converts the left input and then the right input to the Beam convention. Rebuilds the join
   * with the original condition, variables set and join type.
   */
  @Override
  public RelNode convert(RelNode rel) {
    final Join logicalJoin = (Join) rel;
    final RelNode leftChild = logicalJoin.getLeft();
    final RelNode rightChild = logicalJoin.getRight();
    final RelNode beamLeft =
        convert(leftChild, leftChild.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    final RelNode beamRight =
        convert(rightChild, rightChild.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    return new BeamJoinRel(
        logicalJoin.getCluster(),
        logicalJoin.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamLeft,
        beamRight,
        logicalJoin.getCondition(),
        logicalJoin.getVariablesSet(),
        logicalJoin.getJoinType());
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java new file mode 100644 index 0000000..363cf3b --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamMinusRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rule; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.rel.BeamMinusRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.logical.LogicalMinus; + +/** + * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}. 
+ */
public class BeamMinusRule extends ConverterRule {
  public static final BeamMinusRule INSTANCE = new BeamMinusRule();

  private BeamMinusRule() {
    super(LogicalMinus.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamMinusRule");
  }

  /** Rebuilds the MINUS as a {@link BeamMinusRel} over Beam-convention inputs. */
  @Override
  public RelNode convert(RelNode rel) {
    final Minus logicalMinus = (Minus) rel;
    final List<RelNode> beamInputs =
        convertList(logicalMinus.getInputs(), BeamLogicalConvention.INSTANCE);
    final boolean keepAll = logicalMinus.all;
    return new BeamMinusRel(
        logicalMinus.getCluster(),
        logicalMinus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamInputs,
        keepAll);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java new file mode 100644 index 0000000..4f2f8c9 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamProjectRule.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.logical.LogicalProject; +
/**
 * Converter rule that turns a {@link Project} into a {@link BeamProjectRel}.
 */
public class BeamProjectRule extends ConverterRule {
  public static final BeamProjectRule INSTANCE = new BeamProjectRule();

  private BeamProjectRule() {
    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
  }

  /**
   * Rebuilds the projection over a Beam-convention input. The expressions and row type are kept
   * unchanged.
   */
  @Override
  public RelNode convert(RelNode rel) {
    final Project logicalProject = (Project) rel;
    final RelNode child = logicalProject.getInput();
    final RelNode beamChild =
        convert(child, child.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    return new BeamProjectRel(
        logicalProject.getCluster(),
        logicalProject.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamChild,
        logicalProject.getProjects(),
        logicalProject.getRowType());
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java new file mode 100644 index 0000000..e104d37 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamSortRule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.rel.BeamSortRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.logical.LogicalSort; + +/** + * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}. 
+ */
public class BeamSortRule extends ConverterRule {
  public static final BeamSortRule INSTANCE = new BeamSortRule();

  private BeamSortRule() {
    super(LogicalSort.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamSortRule");
  }

  /**
   * Rebuilds the sort over a Beam-convention input. The collation, offset and fetch are kept
   * unchanged.
   */
  @Override
  public RelNode convert(RelNode rel) {
    final Sort logicalSort = (Sort) rel;
    final RelNode child = logicalSort.getInput();
    final RelNode beamChild =
        convert(child, child.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
    return new BeamSortRel(
        logicalSort.getCluster(),
        logicalSort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        beamChild,
        logicalSort.getCollation(),
        logicalSort.offset,
        logicalSort.fetch);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java new file mode 100644 index 0000000..975ccbc --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamUnionRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.sql.rule; + +import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention; +import org.apache.beam.sdk.extensions.sql.rel.BeamUnionRel; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.logical.LogicalUnion; +
/**
 * A {@code ConverterRule} to replace {@link Union} with {@link BeamUnionRel}.
 */
public class BeamUnionRule extends ConverterRule {
  public static final BeamUnionRule INSTANCE = new BeamUnionRule();

  private BeamUnionRule() {
    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
        "BeamUnionRule");
  }

  /**
   * Rebuilds the matched union as a {@link BeamUnionRel}. Every input is converted to the Beam
   * convention, and the union's {@code all} flag is preserved.
   */
  @Override
  public RelNode convert(RelNode rel) {
    Union union = (Union) rel;

    return new BeamUnionRel(
        union.getCluster(),
        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
        union.all
    );
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java new file mode 100644 index 0000000..86a8f72 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamValuesRule.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.rule;
+
+import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.rel.BeamValuesRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.logical.LogicalValues;
+
+/**
+ * {@code ConverterRule} that turns a {@link LogicalValues} literal relation into a
+ * {@code BeamValuesRel}.
+ */
+public class BeamValuesRule extends ConverterRule {
+  /** The shared rule instance; the constructor is private. */
+  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
+
+  private BeamValuesRule() {
+    super(LogicalValues.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamValuesRule");
+  }
+
+  /**
+   * Rebuilds the matched {@code Values} node as a {@link BeamValuesRel} with the same cluster,
+   * row type and tuples, carrying {@link BeamLogicalConvention} instead of its original trait.
+   */
+  @Override public RelNode convert(RelNode rel) {
+    final Values valuesNode = (Values) rel;
+    return new BeamValuesRel(
+        valuesNode.getCluster(),
+        valuesNode.getRowType(),
+        valuesNode.getTuples(),
+        valuesNode.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
new file mode 100644
index 0000000..f57cdee
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +/** + * {@link org.apache.calcite.plan.RelOptRule} to generate + * {@link org.apache.beam.sdk.extensions.sql.rel.BeamRelNode}. + */ +package org.apache.beam.sdk.extensions.sql.rule; http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java new file mode 100644 index 0000000..bf41c95 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BaseBeamTable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.schema; + +import java.io.Serializable; + +/** + * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}. 
+ */
+public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
+  /** Schema of this table's rows, as returned by {@link #getRowType()}. */
+  protected BeamSqlRowType beamSqlRowType;
+
+  /** Creates a table whose rows follow {@code beamSqlRowType}. */
+  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
+    this.beamSqlRowType = beamSqlRowType;
+  }
+
+  @Override public BeamSqlRowType getRowType() {
+    return beamSqlRowType;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
new file mode 100644
index 0000000..bda3ca1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamIOType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Boundedness of a table's source IO: {@code BOUNDED} for batch (bounded) input,
+ * {@code UNBOUNDED} for streaming (unbounded) input.
+ */
+public enum BeamIOType implements Serializable {
+  BOUNDED, UNBOUNDED;
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
new file mode 100644
index 0000000..5bbb8fd
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamPCollectionTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * {@code BeamPCollectionTable} exposes a {@code PCollection<BeamSqlRow>} as a virtual table,
+ * so that a downstream query can read from it directly.
+ */
+public class BeamPCollectionTable extends BaseBeamTable {
+  private BeamIOType ioType;
+  private transient PCollection<BeamSqlRow> upstream;
+
+  /** Creates a table that carries only a schema; no source collection is attached. */
+  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  /**
+   * Wraps {@code upstream} as a table whose source type mirrors the collection's boundedness.
+   */
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
+      BeamSqlRowType beamSqlRowType) {
+    this(beamSqlRowType);
+    if (upstream.isBounded().equals(IsBounded.BOUNDED)) {
+      ioType = BeamIOType.BOUNDED;
+    } else {
+      ioType = BeamIOType.UNBOUNDED;
+    }
+    this.upstream = upstream;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return ioType;
+  }
+
+  /** Returns the wrapped collection as-is; {@code pipeline} is not consulted. */
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    return upstream;
+  }
+
+  /** Always throws: a wrapped {@link PCollection} cannot be used as a write target. */
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..616e9f3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRow.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Represents a generic ROW record in Beam SQL.
+ *
+ * <p>A row holds one value slot per field of its {@link BeamSqlRowType}, the list of field
+ * indices that are currently NULL, and the start/end of the window the row belongs to.
+ */
+public class BeamSqlRow implements Serializable {
+  /** Maps a {@link Types} code to the exact Java class accepted for fields of that type. */
+  private static final Map<Integer, Class<?>> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamSqlRowType dataType;
+
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  /** Creates a row of type {@code dataType} in which every field is NULL. */
+  public BeamSqlRow(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  /**
+   * Creates a row of type {@code dataType} and sets field {@code i} to {@code dataValues.get(i)};
+   * {@code null} entries leave the corresponding field NULL.
+   */
+  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  /**
+   * Copies the window range of {@code upstreamRecord}, then overrides it with the bounds of
+   * {@code window} when that window is an {@link IntervalWindow}.
+   */
+  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  /** Sets the field named {@code fieldName}; see {@link #addField(int, Object)}. */
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  /**
+   * Sets the value of field {@code index}.
+   *
+   * <p>The value's class must be exactly the Java class mapped from the field's SQL type;
+   * otherwise {@link IllegalArgumentException} is thrown (or
+   * {@link UnsupportedOperationException} for an unmapped SQL type) and the row is unchanged.
+   * A {@code null} value is ignored: it neither marks the field NULL nor clears an earlier value.
+   */
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      return;
+    }
+    // Validate and store before un-flagging the field, so a rejected value can never leave
+    // the field marked non-null while its stored value is still null (or stale).
+    validateValueType(index, fieldValue);
+    dataValues.set(index, fieldValue);
+    nullFields.remove(Integer.valueOf(index));
+  }
+
+  private void validateValueType(int index, Object fieldValue) {
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+    Class<?> javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+    if (javaClazz == null) {
+      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
+  }
+
+  // Typed accessors by field name. A NULL field reads as null, so the accessors that return a
+  // primitive throw NullPointerException when the field is NULL.
+
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  public byte getByte(String fieldName) {
+    return (Byte) getFieldValue(fieldName);
+  }
+
+  public short getShort(String fieldName) {
+    return (Short) getFieldValue(fieldName);
+  }
+
+  public int getInteger(String fieldName) {
+    return (Integer) getFieldValue(fieldName);
+  }
+
+  public float getFloat(String fieldName) {
+    return (Float) getFieldValue(fieldName);
+  }
+
+  public double getDouble(String fieldName) {
+    return (Double) getFieldValue(fieldName);
+  }
+
+  public long getLong(String fieldName) {
+    return (Long) getFieldValue(fieldName);
+  }
+
+  public String getString(String fieldName) {
+    return (String) getFieldValue(fieldName);
+  }
+
+  public Date getDate(String fieldName) {
+    return (Date) getFieldValue(fieldName);
+  }
+
+  public GregorianCalendar getGregorianCalendar(String fieldName) {
+    return (GregorianCalendar) getFieldValue(fieldName);
+  }
+
+  public BigDecimal getBigDecimal(String fieldName) {
+    return (BigDecimal) getFieldValue(fieldName);
+  }
+
+  public boolean getBoolean(String fieldName) {
+    return (boolean) getFieldValue(fieldName);
+  }
+
+  // Typed accessors by field index, with the same NULL behavior as the by-name accessors.
+
+  /** Returns the value of field {@code fieldIdx}, or {@code null} if the field is NULL. */
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    return dataValues.get(fieldIdx);
+  }
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public boolean getBoolean(int idx) {
+    return (boolean) getFieldValue(idx);
+  }
+
+  /** Number of value slots in this row. */
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSqlRowType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSqlRowType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * Returns whether the field at {@code idx} is NULL.
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
+  }
+
+  /**
+   * Returns the data fields as comma-separated {@code name=value} pairs; an empty string for a
+   * row with no fields.
+   */
+  public String valueInString() {
+    StringBuilder sb = new StringBuilder();
+    for (int idx = 0; idx < size(); ++idx) {
+      if (idx > 0) {
+        sb.append(',');
+      }
+      sb.append(dataType.getFieldsName().get(idx)).append('=').append(getFieldValue(idx));
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Two rows are equal when they have equal types, values, NULL-field lists and window bounds.
+   * Fields are compared directly rather than via {@link #toString()}: string forms can coincide
+   * for unequal values (e.g. {@link Date}s differing only in milliseconds), which would make
+   * equal rows hash differently.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSqlRow other = (BeamSqlRow) obj;
+    return Objects.equals(dataType, other.dataType)
+        && Objects.equals(dataValues, other.dataValues)
+        && Objects.equals(nullFields, other.nullFields)
+        && Objects.equals(windowStart, other.windowStart)
+        && Objects.equals(windowEnd, other.windowEnd);
+  }
+
+  /** Hashes a subset of the fields compared by {@link #equals}, so equal rows hash equally. */
+  @Override public int hashCode() {
+    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
new file mode 100644
index 0000000..39e2fd3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowCoder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigDecimalCoder;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.ByteCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+
+/**
+ * A {@link Coder} for {@link BeamSqlRow}.
+ *
+ * <p>Wire format: the list of NULL field indices, then every non-NULL field in index order
+ * (encoded according to its SQL type), then the row's window start and window end.
+ */
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+  private BeamSqlRowType tableSchema;
+
+  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
+  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
+  private static final DoubleCoder doubleCoder = DoubleCoder.of();
+  private static final InstantCoder instantCoder = InstantCoder.of();
+  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
+  private static final ByteCoder byteCoder = ByteCoder.of();
+
+  /** Creates a coder for rows of type {@code tableSchema}; decoding relies on this schema. */
+  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
+    this.tableSchema = tableSchema;
+  }
+
+  @Override
+  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
+    List<Integer> nullIndices = value.getNullFields();
+    listCoder.encode(nullIndices, outStream);
+    for (int idx = 0; idx < value.size(); ++idx) {
+      if (!nullIndices.contains(idx)) {
+        encodeField(value, idx, outStream);
+      }
+    }
+
+    instantCoder.encode(value.getWindowStart(), outStream);
+    instantCoder.encode(value.getWindowEnd(), outStream);
+  }
+
+  /** Writes the non-NULL field {@code idx} of {@code row} according to its SQL type. */
+  private static void encodeField(BeamSqlRow row, int idx, OutputStream out) throws IOException {
+    switch (CalciteUtils.getFieldType(row.getDataType(), idx)) {
+      case INTEGER:
+        intCoder.encode(row.getInteger(idx), out);
+        break;
+      case SMALLINT:
+        // SMALLINT is written as a 4-byte int and narrowed back to short on decode.
+        intCoder.encode((int) row.getShort(idx), out);
+        break;
+      case TINYINT:
+        byteCoder.encode(row.getByte(idx), out);
+        break;
+      case DOUBLE:
+        doubleCoder.encode(row.getDouble(idx), out);
+        break;
+      case FLOAT:
+        // FLOAT is written as a double and narrowed back to float on decode.
+        doubleCoder.encode((double) row.getFloat(idx), out);
+        break;
+      case DECIMAL:
+        bigDecimalCoder.encode(row.getBigDecimal(idx), out);
+        break;
+      case BIGINT:
+        longCoder.encode(row.getLong(idx), out);
+        break;
+      case VARCHAR:
+      case CHAR:
+        stringCoder.encode(row.getString(idx), out);
+        break;
+      case TIME:
+        longCoder.encode(row.getGregorianCalendar(idx).getTime().getTime(), out);
+        break;
+      case DATE:
+      case TIMESTAMP:
+        longCoder.encode(row.getDate(idx).getTime(), out);
+        break;
+      case BOOLEAN:
+        byteCoder.encode((byte) (row.getBoolean(idx) ? 1 : 0), out);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Data type: " + row.getDataType().getFieldsType().get(idx) + " not supported yet!");
+    }
+  }
+
+  @Override
+  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
+    List<Integer> nullIndices = listCoder.decode(inStream);
+
+    BeamSqlRow row = new BeamSqlRow(tableSchema);
+    row.setNullFields(nullIndices);
+    for (int idx = 0; idx < tableSchema.size(); ++idx) {
+      if (!nullIndices.contains(idx)) {
+        row.addField(idx, decodeField(idx, inStream));
+      }
+    }
+
+    row.setWindowStart(instantCoder.decode(inStream));
+    row.setWindowEnd(instantCoder.decode(inStream));
+    return row;
+  }
+
+  /** Reads one non-NULL value for field {@code idx}, typed per {@link #tableSchema}. */
+  private Object decodeField(int idx, InputStream in) throws IOException {
+    switch (CalciteUtils.getFieldType(tableSchema, idx)) {
+      case INTEGER:
+        return intCoder.decode(in);
+      case SMALLINT:
+        return intCoder.decode(in).shortValue();
+      case TINYINT:
+        return byteCoder.decode(in);
+      case DOUBLE:
+        return doubleCoder.decode(in);
+      case FLOAT:
+        return doubleCoder.decode(in).floatValue();
+      case BIGINT:
+        return longCoder.decode(in);
+      case DECIMAL:
+        return bigDecimalCoder.decode(in);
+      case VARCHAR:
+      case CHAR:
+        return stringCoder.decode(in);
+      case TIME:
+        GregorianCalendar calendar = new GregorianCalendar();
+        calendar.setTime(new Date(longCoder.decode(in)));
+        return calendar;
+      case DATE:
+      case TIMESTAMP:
+        return new Date(longCoder.decode(in));
+      case BOOLEAN:
+        return byteCoder.decode(in) == 1;
+      default:
+        throw new UnsupportedOperationException("Data type: "
+            + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
+            + " not supported yet!");
+    }
+  }
+
+  public BeamSqlRowType getTableSchema() {
+    return tableSchema;
+  }
+
+  /**
+   * Intentionally empty, so this coder is always reported as deterministic.
+   * NOTE(review): FLOAT/DOUBLE fields are written with {@link DoubleCoder}; confirm its
+   * determinism guarantees before relying on this coder for grouping keys.
+   */
+  @Override
+  public void verifyDeterministic()
+      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
new file mode 100644
index 0000000..018fe81
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlRowType.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Field names and field types of a {@link BeamSqlRow}.
+ *
+ * <p>The two lists are index-aligned: entry {@code i} of each describes field {@code i}.
+ * Field types are integer type codes that {@code CalciteUtils} translates to Calcite SQL types.
+ */
+@AutoValue
+public abstract class BeamSqlRowType implements Serializable {
+  /** Names of the fields, in field order. */
+  public abstract List<String> getFieldsName();
+
+  /** Type codes of the fields, in field order. */
+  public abstract List<Integer> getFieldsType();
+
+  /**
+   * Creates a row type from index-aligned field names and field types.
+   *
+   * @throws NullPointerException if either list is {@code null}
+   * @throws IllegalArgumentException if the two lists have different sizes
+   */
+  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
+    BeamSqlRowType rowType = new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
+    // A size mismatch would otherwise surface later as an index error far from the cause.
+    if (rowType.getFieldsName().size() != rowType.getFieldsType().size()) {
+      throw new IllegalArgumentException(
+          "Field names and field types must have the same size, but got "
+              + rowType.getFieldsName().size() + " names and "
+              + rowType.getFieldsType().size() + " types");
+    }
+    return rowType;
+  }
+
+  /** Number of fields. */
+  public int size() {
+    return getFieldsName().size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
new file mode 100644
index 0000000..c179935
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+ * A table that Beam SQL can read rows from and, where the implementation supports it, write
+ * rows to.
+ */
+public interface BeamSqlTable {
+  /**
+   * Returns the schema of this table's rows.
+   */
+  BeamSqlRowType getRowType();
+
+  /**
+   * Returns whether the source is bounded or unbounded. Beam SQL treats batch and streaming
+   * queries alike; this value is used to validate the sources.
+   */
+  BeamIOType getSourceType();
+
+  /**
+   * Builds a {@code PCollection<BeamSqlRow>} in {@code pipeline} that reads this table's source.
+   */
+  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
+
+  /**
+   * Builds the write transform (an {@code IO.write()}-style {@link PTransform}) that stores
+   * rows into this table.
+   */
+  PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
new file mode 100644
index 0000000..2f78586
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdaf.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * Abstract class of aggregation functions in Beam SQL.
+ *
+ * <p>There are several constraints for a UDAF:<br>
+ * 1. A constructor with an empty argument list is required;<br>
+ * 2. The type of {@code InputT} and {@code OutputT} can only be Integer/Long/Short/Byte/Double
+ * /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINT/DOUBLE/FLOAT
+ * /TIMESTAMP/DECIMAL;<br>
+ * 3. Keep intermediate data in {@code AccumT}, and do not rely on instance fields;<br>
+ */
+public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
+  public BeamSqlUdaf() {}
+
+  /**
+   * Creates an initial accumulator; the equivalent of {@link CombineFn#createAccumulator()}.
+   */
+  public abstract AccumT init();
+
+  /**
+   * Adds an input value to the accumulator; the equivalent of
+   * {@link CombineFn#addInput(Object, Object)}.
+   */
+  public abstract AccumT add(AccumT accumulator, InputT input);
+
+  /**
+   * Merges accumulators produced by parallel tasks; the equivalent of
+   * {@link CombineFn#mergeAccumulators(Iterable)}.
+   */
+  public abstract AccumT merge(Iterable<AccumT> accumulators);
+
+  /**
+   * Extracts the output value from an accumulator; the equivalent of
+   * {@link CombineFn#extractOutput(Object)}.
+   */
+  public abstract OutputT result(AccumT accumulator);
+
+  /**
+   * Returns the coder for {@code AccumT}, which stores the intermediate result.
+   *
+   * <p>By default the coder is fetched from the {@link CoderRegistry} using the concrete class
+   * bound to {@code AccumT} where this UDAF's class hierarchy parameterizes
+   * {@link BeamSqlUdaf}. Override this method when {@code AccumT} is a generic type
+   * (e.g. {@code List<Long>}) or is left as a type variable.
+   *
+   * @throws CannotProvideCoderException if {@code AccumT} does not resolve to a concrete class,
+   *     or if the registry has no coder for it
+   */
+  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
+      throws CannotProvideCoderException {
+    Type accumulatorType = findAccumulatorType();
+    if (!(accumulatorType instanceof Class)) {
+      throw new CannotProvideCoderException(
+          "Cannot infer a coder for accumulator type " + accumulatorType + " of "
+              + getClass().getName() + "; override getAccumulatorCoder(CoderRegistry).");
+    }
+    // Safe: the type argument bound to AccumT is exactly this Class.
+    @SuppressWarnings("unchecked")
+    Class<AccumT> accumulatorClass = (Class<AccumT>) accumulatorType;
+    return registry.getCoder(accumulatorClass);
+  }
+
+  /**
+   * Walks up the superclass chain to the parameterization of {@link BeamSqlUdaf} and returns
+   * its second type argument ({@code AccumT}), or {@code null} if none is found (raw subclass).
+   */
+  private Type findAccumulatorType() {
+    Class<?> clazz = getClass();
+    while (clazz != null && clazz != BeamSqlUdaf.class) {
+      Type superType = clazz.getGenericSuperclass();
+      if (superType instanceof ParameterizedType
+          && ((ParameterizedType) superType).getRawType() == BeamSqlUdaf.class) {
+        return ((ParameterizedType) superType).getActualTypeArguments()[1];
+      }
+      clazz = clazz.getSuperclass();
+    }
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
new file mode 100644
index 0000000..191b78e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamSqlUdf.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.Serializable;
+
+/**
+ * Interface to create a UDF in Beam SQL.
+ *
+ * <p>A static method {@code eval} is required. Here is an example:
+ *
+ * <blockquote><pre>
+ * public static class MyLeftFunction implements BeamSqlUdf {
+ *   public static String eval(
+ *       &#64;Parameter(name = "s") String s,
+ *       &#64;Parameter(name = "n", optional = true) Integer n) {
+ *     return s.substring(0, n == null ? 1 : n);
+ *   }
+ * }</pre></blockquote>
+ *
+ * <p>The first parameter is named "s" and is mandatory,
+ * and the second parameter is named "n" and is optional.
+ */
+public interface BeamSqlUdf extends Serializable {
+  String UDF_METHOD = "eval"; // name of the method every UDF must declare
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
new file mode 100644
index 0000000..53e8483
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.schema;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+
+/**
+ * Utility methods for working with {@code BeamTable}.
+ */
+public final class BeamTableUtils {
+  /**
+   * Parses a single CSV line into a {@link BeamSqlRow} of the given row type.
+   *
+   * <p>Only the first CSV record in {@code line} is used; each of its fields is added to the
+   * row through {@link #addFieldWithAutoTypeCasting(BeamSqlRow, int, Object)}.
+   *
+   * @throws IllegalArgumentException if {@code line} contains no CSV record, if the record's
+   *     field count differs from {@code beamSqlRowType.size()}, or if parsing fails
+   */
+  public static BeamSqlRow csvLine2BeamSqlRow(
+      CSVFormat csvFormat,
+      String line,
+      BeamSqlRowType beamSqlRowType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRowType);
+    // CSVParser is Closeable too, so it is managed alongside the reader.
+    try (StringReader reader = new StringReader(line);
+        CSVParser parser = csvFormat.parse(reader)) {
+      List<CSVRecord> records = parser.getRecords();
+      if (records.isEmpty()) {
+        // An empty line yields no record; report it instead of failing on get(0).
+        throw new IllegalArgumentException(String.format(
+            "Expect %d fields, but the line contains no CSV record",
+            beamSqlRowType.size()));
+      }
+      CSVRecord rawRecord = records.get(0);
+
+      if (rawRecord.size() != beamSqlRowType.size()) {
+        throw new IllegalArgumentException(String.format(
+            "Expect %d fields, but actually %d",
+            beamSqlRowType.size(), rawRecord.size()
+        ));
+      } else {
+        for (int idx = 0; idx < beamSqlRowType.size(); idx++) {
+          String raw = rawRecord.get(idx);
+          addFieldWithAutoTypeCasting(row, idx, raw);
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalArgumentException("decodeRecord failed!", e);
+    }
+    return row;
+  }
+
+  /**
+   * Formats a {@link BeamSqlRow} as one CSV line, terminated by {@code csvFormat}'s record
+   * separator. Null fields are printed the way {@code csvFormat} prints null values.
+   *
+   * @throws IllegalArgumentException if printing fails
+   */
+  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
+    StringWriter writer = new StringWriter();
+    try (CSVPrinter printer = csvFormat.print(writer)) {
+      for (int i = 0; i < row.size(); i++) {
+        Object value = row.getFieldValue(i);
+        // CSVPrinter accepts null (using the format's null string); calling toString() on a
+        // null field would throw a NullPointerException instead.
+        printer.print(value == null ? null : value.toString());
+      }
+      printer.println();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("encodeRecord failed!", e);
+    }
+    return writer.toString();
+  }
+
+  /**
+   * Adds {@code rawObj} as field {@code idx} of {@code row}, converting it to the column's
+   * declared SQL type where needed.
+   *
+   * <ul>
+   *   <li>{@code null} is added as-is;</li>
+   *   <li>a {@link String} for a numeric column, or a {@link BigDecimal} for a non-DECIMAL
+   *       column, is parsed into the Java type of that column;</li>
+   *   <li>an {@link NlsString} for a character column is unwrapped to its string value;</li>
+   *   <li>any other value is added unchanged.</li>
+   * </ul>
+   *
+   * @throws UnsupportedOperationException if a numeric conversion is needed but the column
+   *     type has none
+   * @throws NumberFormatException if a numeric value cannot be parsed
+   */
+  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
+    if (rawObj == null) {
+      row.addField(idx, null);
+      return;
+    }
+
+    SqlTypeName columnType = CalciteUtils.getFieldType(row.getDataType(), idx);
+    // auto-casting for numerics
+    if ((rawObj instanceof String && SqlTypeName.NUMERIC_TYPES.contains(columnType))
+        || (rawObj instanceof BigDecimal && columnType != SqlTypeName.DECIMAL)) {
+      String raw = rawObj.toString();
+      switch (columnType) {
+        case TINYINT:
+          row.addField(idx, Byte.valueOf(raw));
+          break;
+        case SMALLINT:
+          row.addField(idx, Short.valueOf(raw));
+          break;
+        case INTEGER:
+          row.addField(idx, Integer.valueOf(raw));
+          break;
+        case BIGINT:
+          row.addField(idx, Long.valueOf(raw));
+          break;
+        case FLOAT:
+          row.addField(idx, Float.valueOf(raw));
+          break;
+        case DOUBLE:
+          row.addField(idx, Double.valueOf(raw));
+          break;
+        case DECIMAL:
+          // Reached for String input only; DECIMAL is one of SqlTypeName.NUMERIC_TYPES.
+          row.addField(idx, new BigDecimal(raw));
+          break;
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Column type %s is not supported yet!", columnType));
+      }
+    } else if (SqlTypeName.CHAR_TYPES.contains(columnType)) {
+      // convert NlsString to String
+      if (rawObj instanceof NlsString) {
+        row.addField(idx, ((NlsString) rawObj).getValue());
+      } else {
+        row.addField(idx, rawObj);
+      }
+    } else {
+      // keep the origin
+      row.addField(idx, rawObj);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
new file mode 100644
index 0000000..2a50947
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.csv.CSVFormat;
+
+/**
+ * A Kafka topic that stores records in CSV format.
+ *
+ * <p>Record values are decoded from and encoded to UTF-8 text. The record key is ignored when
+ * reading and written as an empty byte array.
+ */
+public class BeamKafkaCSVTable extends BeamKafkaTable {
+  private final CSVFormat csvFormat;
+
+  /** Creates a table whose records use {@link CSVFormat#DEFAULT}. */
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    this(beamSqlRowType, bootstrapServers, topics, CSVFormat.DEFAULT);
+  }
+
+  /** Creates a table whose records are parsed and printed with {@code format}. */
+  public BeamKafkaCSVTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics, CSVFormat format) {
+    super(beamSqlRowType, bootstrapServers, topics);
+    this.csvFormat = format;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+      getPTransformForInput() {
+    return new CsvRecorderDecoder(beamSqlRowType, csvFormat);
+  }
+
+  @Override
+  public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput() {
+    return new CsvRecorderEncoder(beamSqlRowType, csvFormat);
+  }
+
+  /**
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
+   *
+   * <p>Only the value is used; it is decoded as UTF-8 and parsed as one CSV line.
+   */
+  public static class CsvRecorderDecoder
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+    private final BeamSqlRowType rowType;
+    private final CSVFormat format;
+    public CsvRecorderDecoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          // Explicit charset: the platform default can differ between producer and workers.
+          String rowInString = new String(c.element().getValue(), StandardCharsets.UTF_8);
+          c.output(BeamTableUtils.csvLine2BeamSqlRow(format, rowInString, rowType));
+        }
+      }));
+    }
+  }
+
+  /**
+   * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
+   *
+   * <p>The key is an empty byte array; the value is the CSV line encoded as UTF-8.
+   */
+  public static class CsvRecorderEncoder
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+    private final BeamSqlRowType rowType;
+    private final CSVFormat format;
+    public CsvRecorderEncoder(BeamSqlRowType rowType, CSVFormat format) {
+      this.rowType = rowType;
+      this.format = format;
+    }
+
+    @Override
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+          BeamSqlRow in = c.element();
+          byte[] value = BeamTableUtils.beamSqlRow2CsvLine(in, format)
+              .getBytes(StandardCharsets.UTF_8);
+          c.output(KV.of(new byte[] {}, value));
+        }
+      }));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
new file mode 100644
index 0000000..2cc664f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/BeamKafkaTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamIOType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+
+/**
+ * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Subclasses convert
+ * between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
+ */
+public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
+
+  private String bootstrapServers;
+  private List<String> topics;
+  // Extra Kafka consumer properties; stays null unless updateConsumerProperties is called.
+  private Map<String, Object> configUpdates;
+
+  protected BeamKafkaTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
+  }
+
+  public BeamKafkaTable(BeamSqlRowType beamSqlRowType, String bootstrapServers,
+      List<String> topics) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topics = topics;
+  }
+
+  /**
+   * Sets additional Kafka consumer properties that are applied when reading.
+   *
+   * <p>The map is copied, so later changes to {@code configUpdates} do not affect this table
+   * and the stored copy is a serializable {@link HashMap}. Passing {@code null} clears them.
+   *
+   * @return this table, for chaining
+   */
+  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
+    this.configUpdates = configUpdates == null ? null : new HashMap<>(configUpdates);
+    return this;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
+      getPTransformForInput();
+
+  public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
+      getPTransformForOutput();
+
+  /**
+   * Reads raw key/value byte arrays from the configured topics and converts them to rows with
+   * {@link #getPTransformForInput()}.
+   */
+  @Override
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    KafkaIO.Read<byte[], byte[]> kafkaRead = KafkaIO.<byte[], byte[]>read()
+        .withBootstrapServers(bootstrapServers)
+        .withTopics(topics);
+    // configUpdates is null unless updateConsumerProperties was called, and passing null to
+    // KafkaIO.Read#updateConsumerProperties fails with a NullPointerException.
+    if (configUpdates != null) {
+      kafkaRead = kafkaRead.updateConsumerProperties(configUpdates);
+    }
+    kafkaRead = kafkaRead
+        .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+        .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    return PBegin.in(pipeline)
+        .apply("read", kafkaRead.withoutMetadata())
+        .apply("in_format", getPTransformForInput());
+  }
+
+  /**
+   * Builds a transform that converts rows with {@link #getPTransformForOutput()} and writes them
+   * to the single configured topic.
+   *
+   * @throws IllegalArgumentException if this table is not configured with exactly one topic
+   */
+  @Override
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    checkArgument(topics != null && topics.size() == 1,
+        "Only one topic can be acceptable as output.");
+
+    return new PTransform<PCollection<BeamSqlRow>, PDone>() {
+      @Override
+      public PDone expand(PCollection<BeamSqlRow> input) {
+        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
+            KafkaIO.<byte[], byte[]>write()
+                .withBootstrapServers(bootstrapServers)
+                .withTopic(topics.get(0))
+                .withKeySerializer(ByteArraySerializer.class)
+                .withValueSerializer(ByteArraySerializer.class));
+      }
+    };
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
new file mode 100644
index 0000000..f0ddeb6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Beam SQL tables backed by Kafka topics, read and written with {@code KafkaIO}.
+ */
+package org.apache.beam.sdk.extensions.sql.schema.kafka;