Repository: beam Updated Branches: refs/heads/DSL_SQL a1f7cf6de -> d4d615a72
http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java index 41a786f..9ed56b4 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java @@ -18,8 +18,8 @@ package org.apache.beam.dsls.sql.schema.text; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.PTransform; @@ -46,13 +46,13 @@ public class BeamTextCSVTable extends BeamTextTable { /** * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format. */ - public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern) { - this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT); + public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern) { + this(beamSqlRowType, filePattern, CSVFormat.DEFAULT); } - public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern, + public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) { - super(beamSqlRecordType, filePattern); + super(beamSqlRowType, filePattern); this.csvFormat = csvFormat; } @@ -60,11 +60,11 @@ public class BeamTextCSVTable extends BeamTextTable { public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern)) .apply("parseCSVLine", - new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat)); + new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat)); } @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { - return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat); + return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java index ef0a465..874c3e4 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java @@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text; import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow; import java.io.Serializable; - -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -37,13 +36,13 @@ public class BeamTextCSVTableIOReader extends PTransform<PCollection<String>, PCollection<BeamSqlRow>> implements Serializable { private String filePattern; - protected BeamSqlRecordType beamSqlRecordType; + protected BeamSqlRowType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern, + public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; - this.beamSqlRecordType = beamSqlRecordType; + this.beamSqlRowType = beamSqlRowType; this.csvFormat = csvFormat; } @@ -53,7 +52,7 @@ public class BeamTextCSVTableIOReader @ProcessElement public void processElement(ProcessContext ctx) { String str = ctx.element(); - ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType)); + ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType)); } })); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java index 35a546c..f61bb71 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java @@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text; import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine; import java.io.Serializable; - -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -38,13 +37,13 @@ import org.apache.commons.csv.CSVFormat; public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone> implements Serializable { private String filePattern; - protected BeamSqlRecordType beamSqlRecordType; + protected BeamSqlRowType beamSqlRowType; protected CSVFormat csvFormat; - public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern, + public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern, CSVFormat csvFormat) { this.filePattern = filePattern; - this.beamSqlRecordType = beamSqlRecordType; + this.beamSqlRowType = beamSqlRowType; this.csvFormat = csvFormat; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java index 525c210..6dc6cd0 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java @@ -22,7 +22,7 @@ import java.io.Serializable; import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; /** * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}). @@ -30,8 +30,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; public abstract class BeamTextTable extends BaseBeamTable implements Serializable { protected String filePattern; - protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) { - super(beamSqlRecordType); + protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) { + super(beamSqlRowType); this.filePattern = filePattern; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java index 34b169f..5b21765 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java @@ -27,8 +27,8 @@ import java.util.Iterator; import java.util.List; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.BigDecimalCoder; @@ -57,13 +57,13 @@ public class BeamAggregationTransforms implements Serializable{ * Merge KV to single record. */ public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { - private BeamSqlRecordType outRecordType; + private BeamSqlRowType outRowType; private List<String> aggFieldNames; private int windowStartFieldIdx; - public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList + public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList , int windowStartFieldIdx) { - this.outRecordType = outRecordType; + this.outRowType = outRowType; this.aggFieldNames = new ArrayList<>(); for (AggregateCall ac : aggList) { aggFieldNames.add(ac.getName()); @@ -73,7 +73,7 @@ public class BeamAggregationTransforms implements Serializable{ @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow outRecord = new BeamSqlRow(outRecordType); + BeamSqlRow outRecord = new BeamSqlRow(outRowType); outRecord.updateWindowRange(c.element().getKey(), window); KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element(); @@ -109,7 +109,7 @@ public class BeamAggregationTransforms implements Serializable{ @Override public BeamSqlRow apply(BeamSqlRow input) { - BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType()); + BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType()); BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey); keyOfRecord.updateWindowRange(input, null); @@ -119,14 +119,14 @@ public class BeamAggregationTransforms implements Serializable{ return keyOfRecord; } - private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) { + private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (int idx : groupByKeys) { fieldNames.add(dataType.getFieldsName().get(idx)); fieldTypes.add(dataType.getFieldsType().get(idx)); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamSqlRowType.create(fieldNames, fieldTypes); } } @@ -154,10 +154,10 @@ public class BeamAggregationTransforms implements Serializable{ extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> { private List<BeamSqlUdaf> aggregators; private List<BeamSqlExpression> sourceFieldExps; - private BeamSqlRecordType finalRecordType; + private BeamSqlRowType finalRowType; public AggregationAdaptor(List<AggregateCall> aggregationCalls, - BeamSqlRecordType sourceRowRecordType) { + BeamSqlRowType sourceRowType) { aggregators = new ArrayList<>(); sourceFieldExps = new ArrayList<>(); List<String> outFieldsName = new ArrayList<>(); @@ -165,7 +165,7 @@ public class BeamAggregationTransforms implements Serializable{ for (AggregateCall call : aggregationCalls) { int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0; BeamSqlExpression sourceExp = new BeamSqlInputRefExpression( - CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex); + CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex); sourceFieldExps.add(sourceExp); outFieldsName.add(call.name); @@ -206,7 +206,7 @@ public class BeamAggregationTransforms implements Serializable{ break; } } - finalRecordType = BeamSqlRecordType.create(outFieldsName, outFieldsType); + finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType); } @Override public AggregationAccumulator createAccumulator() { @@ -241,7 +241,7 @@ public class BeamAggregationTransforms implements Serializable{ } @Override public BeamSqlRow extractOutput(AggregationAccumulator accumulator) { - BeamSqlRow result = new BeamSqlRow(finalRecordType); + BeamSqlRow result = new BeamSqlRow(finalRowType); for (int idx = 0; idx < aggregators.size(); ++idx) { result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx))); } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java index 8169b83..9ea4376 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; @@ -60,7 +60,7 @@ public class BeamJoinTransforms { ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) : input.getDataType().getFieldsType().get(joinColumns.get(i).getValue())); } - BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + BeamSqlRowType type = BeamSqlRowType.create(names, types); // build the row BeamSqlRow row = new BeamSqlRow(type); @@ -149,7 +149,7 @@ public class BeamJoinTransforms { List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); types.addAll(leftRow.getDataType().getFieldsType()); types.addAll(rightRow.getDataType().getFieldsType()); - BeamSqlRecordType type = BeamSqlRecordType.create(names, types); + BeamSqlRowType type = BeamSqlRowType.create(names, types); BeamSqlRow row = new BeamSqlRow(type); // build the row http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java index 2a3357c..886ddcf 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java @@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.transform; import java.util.List; import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor; import org.apache.beam.dsls.sql.rel.BeamProjectRel; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.schema.BeamTableUtils; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -34,14 +34,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { private String stepName; private BeamSqlExpressionExecutor executor; - private BeamSqlRecordType outputRecordType; + private BeamSqlRowType outputRowType; public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor, - BeamSqlRecordType outputRecordType) { + BeamSqlRowType outputRowType) { super(); this.stepName = stepName; this.executor = executor; - this.outputRecordType = outputRecordType; + this.outputRowType = outputRowType; } @Setup @@ -51,11 +51,11 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> { @ProcessElement public void processElement(ProcessContext c, BoundedWindow window) { - BeamSqlRow inputRecord = c.element(); - List<Object> results = executor.execute(inputRecord); + BeamSqlRow inputRow = c.element(); + List<Object> results = executor.execute(inputRow); - BeamSqlRow outRow = new BeamSqlRow(outputRecordType); - outRow.updateWindowRange(inputRecord, window); + BeamSqlRow outRow = new BeamSqlRow(outputRowType); + outRow.updateWindowRange(inputRow, window); for (int idx = 0; idx < results.size(); ++idx) { BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx)); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java index 919ae5f..4b8696b 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java @@ -23,7 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; @@ -78,27 +78,27 @@ public class CalciteUtils { /** * Get the {@code SqlTypeName} for the specified column of a table. */ - public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) { + public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) { return toCalciteType(schema.getFieldsType().get(index)); } /** - * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table. + * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. */ - public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) { + public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) { List<String> fieldNames = new ArrayList<>(); List<Integer> fieldTypes = new ArrayList<>(); for (RelDataTypeField f : tableInfo.getFieldList()) { fieldNames.add(f.getName()); fieldTypes.add(toJavaType(f.getType().getSqlTypeName())); } - return BeamSqlRecordType.create(fieldNames, fieldTypes); + return BeamSqlRowType.create(fieldNames, fieldTypes); } /** * Create an instance of {@code RelDataType} so it can be used to create a table. */ - public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) { + public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) { return new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a) { http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index 471a856..a142514 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -55,7 +55,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamSqlRow> result = input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamSqlRow record = new BeamSqlRow(resultType); @@ -98,7 +98,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testAggregationFunctions", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamSqlRowType resultType = BeamSqlRowType.create( Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", "sum2", "avg2", "max2", "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", "min4", "sum5", "avg5", "max5", "min5", "max6", "min6"), @@ -167,7 +167,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamSqlRow> result = input.apply("testDistinct", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamSqlRow record1 = new BeamSqlRow(resultType); @@ -216,7 +216,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testTumbleWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamSqlRowType resultType = BeamSqlRowType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); @@ -263,7 +263,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollection<BeamSqlRow> result = input.apply("testHopWindow", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamSqlRowType resultType = BeamSqlRowType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); @@ -325,7 +325,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testSessionWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create( + BeamSqlRowType resultType = BeamSqlRowType.create( Arrays.asList("f_int2", "size", "window_start"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java index 57fcbc3..24f1a0a 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java @@ -24,9 +24,9 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; @@ -52,7 +52,7 @@ public class BeamSqlDslBase { @Rule public ExpectedException exceptions = ExpectedException.none(); - public static BeamSqlRecordType recordTypeInTableA; + public static BeamSqlRowType rowTypeInTableA; public static List<BeamSqlRow> recordsInTableA; //bounded PCollections @@ -65,22 +65,22 @@ public class BeamSqlDslBase { @BeforeClass public static void prepareClass() throws ParseException { - recordTypeInTableA = BeamSqlRecordType.create( + rowTypeInTableA = BeamSqlRowType.create( Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", "f_double", "f_string", "f_timestamp", "f_int2"), Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT, Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER)); - recordsInTableA = prepareInputRecordsInTableA(); + recordsInTableA = prepareInputRowsInTableA(); } @Before public void preparePCollections(){ boundedInput1 = PBegin.in(pipeline).apply("boundedInput1", - Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); boundedInput2 = PBegin.in(pipeline).apply("boundedInput2", - Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA))); + Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA))); unboundedInput1 = prepareUnboundedPCollection1(); unboundedInput2 = prepareUnboundedPCollection2(); @@ -88,7 +88,7 @@ public class BeamSqlDslBase { private PCollection<BeamSqlRow> prepareUnboundedPCollection1() { TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(recordTypeInTableA)); + .create(new BeamSqlRowCoder(rowTypeInTableA)); for (BeamSqlRow row : recordsInTableA) { values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); @@ -100,7 +100,7 @@ public class BeamSqlDslBase { private PCollection<BeamSqlRow> prepareUnboundedPCollection2() { TestStream.Builder<BeamSqlRow> values = TestStream - .create(new BeamSqlRowCoder(recordTypeInTableA)); + .create(new BeamSqlRowCoder(rowTypeInTableA)); BeamSqlRow row = recordsInTableA.get(0); values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp"))); @@ -109,10 +109,10 @@ public class BeamSqlDslBase { return PBegin.in(pipeline).apply("unboundedInput2", values.advanceWatermarkToInfinity()); } - private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{ + private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{ List<BeamSqlRow> rows = new ArrayList<>(); - BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA); + BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA); row1.addField(0, 1); row1.addField(1, 1000L); row1.addField(2, Short.valueOf("1")); @@ -124,7 +124,7 @@ public class BeamSqlDslBase { row1.addField(8, 0); rows.add(row1); - BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA); + BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA); row2.addField(0, 2); row2.addField(1, 2000L); row2.addField(2, Short.valueOf("2")); @@ -136,7 +136,7 @@ public class BeamSqlDslBase { row2.addField(8, 0); rows.add(row2); - BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA); + BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA); row3.addField(0, 3); row3.addField(1, 3000L); row3.addField(2, Short.valueOf("3")); @@ -148,7 +148,7 @@ public class BeamSqlDslBase { row3.addField(8, 0); rows.add(row3); - BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA); + BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA); row4.addField(0, 4); row4.addField(1, 4000L); row4.addField(2, Short.valueOf("4")); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java index ae5f4e5..e010915 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java @@ -23,9 +23,9 @@ import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER import java.sql.Types; import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static final BeamSqlRecordType SOURCE_RECORD_TYPE = - BeamSqlRecordType.create( + private static final BeamSqlRowType SOURCE_RECORD_TYPE = + BeamSqlRowType.create( Arrays.asList( "order_id", "site_id", "price" ), @@ -54,8 +54,8 @@ public class BeamSqlDslJoinTest { private static final BeamSqlRowCoder SOURCE_CODER = new BeamSqlRowCoder(SOURCE_RECORD_TYPE); - private static final BeamSqlRecordType RESULT_RECORD_TYPE = - BeamSqlRecordType.create( + private static final BeamSqlRowType RESULT_RECORD_TYPE = + BeamSqlRowType.create( Arrays.asList( "order_id", "site_id", "price", "order_id0", "site_id0", "price0" ), http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java index 10f61b0..ab5a639 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java @@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql; import java.sql.Types; import java.util.Arrays; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFields", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamSqlRow record = new BeamSqlRow(resultType); @@ -116,7 +116,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamSqlRow record1 = new BeamSqlRow(resultType); @@ -163,7 +163,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testPartialFieldsInRows", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"), Arrays.asList(Types.INTEGER, Types.BIGINT)); BeamSqlRow record1 = new BeamSqlRow(resultType); @@ -210,7 +210,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase { PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input) .apply("testLiteralField", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"), Arrays.asList(Types.INTEGER)); BeamSqlRow record = new BeamSqlRow(resultType); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java index 332a273..726f658 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java @@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql; import java.sql.Types; import java.util.Arrays; import java.util.Iterator; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.schema.BeamSqlUdaf; import org.apache.beam.dsls.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; @@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdaf() throws Exception { - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"), Arrays.asList(Types.INTEGER, Types.INTEGER)); BeamSqlRow record = new BeamSqlRow(resultType); @@ -69,7 +69,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { */ @Test public void testUdf() throws Exception{ - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"), + BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"), Arrays.asList(Types.INTEGER, Types.INTEGER)); BeamSqlRow record = new BeamSqlRow(resultType); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java index 8c0a28d..a669635 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java @@ -21,8 +21,8 @@ package org.apache.beam.dsls.sql; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.transforms.DoFn; /** @@ -69,7 +69,7 @@ public class TestUtils { * {@code} */ public static class RowsBuilder { - private BeamSqlRecordType type; + private BeamSqlRowType type; private List<BeamSqlRow> rows = new ArrayList<>(); /** @@ -86,9 +86,9 @@ public class TestUtils { * @args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { - BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args); + BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args); RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRecordType; + builder.type = beamSQLRowType; return builder; } @@ -99,13 +99,13 @@ public class TestUtils { * <p>For example: * <pre>{@code * TestUtils.RowsBuilder.of( - * beamSqlRecordType + * beamSqlRowType * )}</pre> - * @beamSQLRecordType the record type. + * @beamSQLRowType the record type. */ - public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) { + public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) { RowsBuilder builder = new RowsBuilder(); - builder.type = beamSQLRecordType; + builder.type = beamSQLRowType; return builder; } @@ -140,12 +140,12 @@ public class TestUtils { } /** - * Convenient way to build a {@code BeamSqlRecordType}. + * Convenient way to build a {@code BeamSqlRowType}. * * <p>e.g. * * <pre>{@code - * buildBeamSqlRecordType( + * buildBeamSqlRowType( * Types.BIGINT, "order_id", * Types.INTEGER, "site_id", * Types.DOUBLE, "price", @@ -153,7 +153,7 @@ public class TestUtils { * ) * }</pre> */ - public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) { + public static BeamSqlRowType buildBeamSqlRowType(Object... args) { List<Integer> types = new ArrayList<>(); List<String> names = new ArrayList<>(); @@ -162,7 +162,7 @@ public class TestUtils { names.add((String) args[i + 1]); } - return BeamSqlRecordType.create(names, types); + return BeamSqlRowType.create(names, types); } /** @@ -172,14 +172,14 @@ public class TestUtils { * * <pre>{@code * buildRows( - * recordType, + * rowType, * 1, 1, 1, // the first row * 2, 2, 2, // the second row * ... * ) * }</pre> */ - public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) { + public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) { List<BeamSqlRow> rows = new ArrayList<>(); int fieldCount = type.size(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java index ddbc3d8..b9ce9b4 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java @@ -32,9 +32,9 @@ import java.util.TimeZone; import org.apache.beam.dsls.sql.BeamSql; import org.apache.beam.dsls.sql.TestUtils; import org.apache.beam.dsls.sql.mock.MockedBoundedTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -63,7 +63,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { public final TestPipeline pipeline = TestPipeline.create(); protected PCollection<BeamSqlRow> getTestPCollection() { - BeamSqlRecordType type = BeamSqlRecordType.create( + BeamSqlRowType type = BeamSqlRowType.create( Arrays.asList("ts", "c_tinyint", "c_smallint", "c_integer", "c_bigint", "c_float", "c_double", "c_decimal", "c_tinyint_max", "c_smallint_max", "c_integer_max", "c_bigint_max"), @@ -156,7 +156,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase { PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql())); PAssert.that(rows).containsInAnyOrder( TestUtils.RowsBuilder - .of(BeamSqlRecordType.create(names, types)) + .of(BeamSqlRowType.create(names, types)) .addRows(values) .getRows() ); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java index 5afd273..d7b54c7 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java @@ -23,8 +23,8 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem; import org.apache.beam.dsls.sql.planner.BeamRuleSets; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.config.Lex; @@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase { RelDataTypeSystem.DEFAULT); public static RelDataType relDataType; - public static BeamSqlRecordType beamRecordType; + public static BeamSqlRowType beamRowType; public static BeamSqlRow record; public static RelBuilder relBuilder; @@ -70,8 +70,8 @@ public class BeamSqlFnExecutorTestBase { .add("price", SqlTypeName.DOUBLE) .add("order_time", SqlTypeName.BIGINT).build(); - beamRecordType = CalciteUtils.toBeamRecordType(relDataType); - record = new BeamSqlRow(beamRecordType); + beamRowType = CalciteUtils.toBeamRowType(relDataType); + record = new BeamSqlRow(beamRowType); record.addField(0, 1234567L); record.addField(1, 0); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java index 84f49a9..6c1dcb2 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java @@ -17,7 +17,7 @@ */ package org.apache.beam.dsls.sql.mock; -import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; +import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType; import static org.apache.beam.dsls.sql.TestUtils.buildRows; import java.util.ArrayList; @@ -25,8 +25,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -45,8 +45,8 @@ public class MockedBoundedTable extends MockedTable { /** rows flow out from this table. */ private final List<BeamSqlRow> rows = new ArrayList<>(); - public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); + public MockedBoundedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); } /** @@ -63,13 +63,13 @@ public class MockedBoundedTable extends MockedTable { * }</pre> */ public static MockedBoundedTable of(final Object... args){ - return new MockedBoundedTable(buildBeamSqlRecordType(args)); + return new MockedBoundedTable(buildBeamSqlRowType(args)); } /** * Build a mocked bounded table with the specified type. */ - public static MockedBoundedTable of(final BeamSqlRecordType type) { + public static MockedBoundedTable of(final BeamSqlRowType type) { return new MockedBoundedTable(type); } @@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable { * }</pre> */ public MockedBoundedTable addRows(Object... args) { - List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args)); + List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); this.rows.addAll(rows); return this; } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java index eed740a..858ae88 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java @@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.mock; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; @@ -31,8 +31,8 @@ import org.apache.beam.sdk.values.PDone; */ public abstract class MockedTable extends BaseBeamTable { public static final AtomicInteger COUNTER = new AtomicInteger(); - public MockedTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); + public MockedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java index 0f8c912..ee6eb22 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java @@ -18,16 +18,16 @@ package org.apache.beam.dsls.sql.mock; -import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType; +import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType; import static org.apache.beam.dsls.sql.TestUtils.buildRows; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.values.PCollection; @@ -44,8 +44,8 @@ public class MockedUnboundedTable extends MockedTable { private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>(); /** specify the index of column in the row which stands for the event time field. */ private int timestampField; - private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) { - super(beamSqlRecordType); + private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) { + super(beamSqlRowType); } /** @@ -62,7 +62,7 @@ public class MockedUnboundedTable extends MockedTable { * }</pre> */ public static MockedUnboundedTable of(final Object... args){ - return new MockedUnboundedTable(buildBeamSqlRecordType(args)); + return new MockedUnboundedTable(buildBeamSqlRowType(args)); } public MockedUnboundedTable timestampColumnIndex(int idx) { @@ -85,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable { * }</pre> */ public MockedUnboundedTable addRows(Duration duration, Object... args) { - List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args)); + List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args)); // record the watermark + rows this.timestampedRows.add(Pair.of(duration, rows)); return this; @@ -97,7 +97,7 @@ public class MockedUnboundedTable extends MockedTable { @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { TestStream.Builder<BeamSqlRow> values = TestStream.create( - new BeamSqlRowCoder(beamSqlRecordType)); + new BeamSqlRowCoder(beamSqlRowType)); for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) { values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey())); http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index cf1d714..e41e341 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -58,10 +58,10 @@ public class BeamSqlRowCoderTest { } }; - BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType( + BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType( protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); - BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + BeamSqlRow row = new BeamSqlRow(beamSQLRowType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1")); row.addField("col_integer", 1); @@ -77,7 +77,7 @@ public class BeamSqlRowCoderTest { row.addField("col_boolean", true); - BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); + BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType); CoderProperties.coderDecodeEncodeEqual(coder, row); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java index 9cd0915..01cd960 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -19,10 +19,9 @@ package org.apache.beam.dsls.sql.schema.kafka; import java.io.Serializable; - import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -91,8 +90,8 @@ public class BeamKafkaCSVTableTest { pipeline.run(); } - private static BeamSqlRecordType genRowType() { - return CalciteUtils.toBeamRecordType(new RelProtoDataType() { + private static BeamSqlRowType genRowType() { + return CalciteUtils.toBeamRowType(new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { return a0.builder().add("order_id", SqlTypeName.BIGINT) http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java index 176df46..b6e11e5 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -31,10 +31,9 @@ import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -81,20 +80,20 @@ public class BeamTextCSVTableTest { private static File writerTargetFile; @Test public void testBuildIOReader() { - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); } @Test public void testBuildIOWriter() { - new BeamTextCSVTable(buildBeamSqlRecordType(), + new BeamTextCSVTable(buildBeamSqlRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline) - .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath()) + .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()) .buildIOWriter()); pipeline.run(); - PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); // confirm the two reads match @@ -167,11 +166,11 @@ public class BeamTextCSVTableTest { .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); } - private static BeamSqlRecordType buildBeamSqlRecordType() { - return CalciteUtils.toBeamRecordType(buildRelDataType()); + private static BeamSqlRowType buildBeamSqlRowType() { + return CalciteUtils.toBeamRowType(buildRelDataType()); } private static BeamSqlRow buildRow(Object[] data) { - return new BeamSqlRow(buildBeamSqlRecordType(), Arrays.asList(data)); + return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java index a0fed22..5d5d4fc 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java @@ -21,11 +21,10 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.coders.IterableCoder; @@ -64,9 +63,9 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ private List<AggregateCall> aggCalls; - private BeamSqlRecordType keyType; - private BeamSqlRecordType aggPartType; - private BeamSqlRecordType outputType; + private BeamSqlRowType keyType; + private BeamSqlRowType aggPartType; + private BeamSqlRowType outputType; private BeamSqlRowCoder inRecordCoder; private BeamSqlRowCoder keyCoder; @@ -405,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * Row type of final output row. */ - private BeamSqlRecordType prepareFinalRowType() { + private BeamSqlRowType prepareFinalRowType() { FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), @@ -433,7 +432,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return CalciteUtils.toBeamRecordType(builder.build()); + return CalciteUtils.toBeamRowType(builder.build()); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java index 2e91405..4045bc8 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java @@ -23,8 +23,8 @@ import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRowType; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; @@ -38,7 +38,7 @@ import org.junit.BeforeClass; public class BeamTransformBaseTest { public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public static BeamSqlRecordType inputRowType; + public static BeamSqlRowType inputRowType; public static List<BeamSqlRow> inputRows; @BeforeClass @@ -66,14 +66,14 @@ public class BeamTransformBaseTest { } /** - * create a {@code BeamSqlRecordType} for given column metadata. + * create a {@code BeamSqlRowType} for given column metadata. */ - public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ + public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return CalciteUtils.toBeamRecordType(builder.build()); + return CalciteUtils.toBeamRowType(builder.build()); } /** @@ -89,7 +89,7 @@ public class BeamTransformBaseTest { */ public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, List<Object> rowValues){ - BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata); + BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata); return new BeamSqlRow(rowType, rowValues); }