Repository: beam
Updated Branches:
  refs/heads/DSL_SQL a1f7cf6de -> d4d615a72


http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 41a786f..9ed56b4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,8 +18,8 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -46,13 +46,13 @@ public class BeamTextCSVTable extends BeamTextTable {
   /**
    * CSV table with {@link CSVFormat#DEFAULT DEFAULT} format.
    */
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern)  {
-    this(beamSqlRecordType, filePattern, CSVFormat.DEFAULT);
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern)  {
+    this(beamSqlRowType, filePattern, CSVFormat.DEFAULT);
   }
 
-  public BeamTextCSVTable(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTable(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
-    super(beamSqlRecordType, filePattern);
+    super(beamSqlRowType, filePattern);
     this.csvFormat = csvFormat;
   }
 
@@ -60,11 +60,11 @@ public class BeamTextCSVTable extends BeamTextTable {
   public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
-            new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
+            new BeamTextCSVTableIOReader(beamSqlRowType, filePattern, csvFormat));
   }
 
   @Override
   public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
-    return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
+    return new BeamTextCSVTableIOWriter(beamSqlRowType, filePattern, csvFormat);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index ef0a465..874c3e4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -37,13 +36,13 @@ public class BeamTextCSVTableIOReader
     extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
-    this.beamSqlRecordType = beamSqlRecordType;
+    this.beamSqlRowType = beamSqlRowType;
     this.csvFormat = csvFormat;
   }
 
@@ -53,7 +52,7 @@ public class BeamTextCSVTableIOReader
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();
-            ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType));
+            ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRowType));
           }
         }));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 35a546c..f61bb71 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -21,9 +21,8 @@ package org.apache.beam.dsls.sql.schema.text;
 import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 
 import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,13 +37,13 @@ import org.apache.commons.csv.CSVFormat;
 public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSqlRecordType beamSqlRecordType;
+  protected BeamSqlRowType beamSqlRowType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamSqlRowType beamSqlRowType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
-    this.beamSqlRecordType = beamSqlRecordType;
+    this.beamSqlRowType = beamSqlRowType;
     this.csvFormat = csvFormat;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
index 525c210..6dc6cd0 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextTable.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 
 /**
  * {@code BeamTextTable} represents a text file/directory(backed by {@code TextIO}).
@@ -30,8 +30,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 public abstract class BeamTextTable extends BaseBeamTable implements Serializable {
   protected String filePattern;
 
-  protected BeamTextTable(BeamSqlRecordType beamSqlRecordType, String filePattern) {
-    super(beamSqlRecordType);
+  protected BeamTextTable(BeamSqlRowType beamSqlRowType, String filePattern) {
+    super(beamSqlRowType);
     this.filePattern = filePattern;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 34b169f..5b21765 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -27,8 +27,8 @@ import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
@@ -57,13 +57,13 @@ public class BeamAggregationTransforms implements Serializable{
    * Merge KV to single record.
    */
   public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
-    private BeamSqlRecordType outRecordType;
+    private BeamSqlRowType outRowType;
     private List<String> aggFieldNames;
     private int windowStartFieldIdx;
 
-    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList
+    public MergeAggregationRecord(BeamSqlRowType outRowType, List<AggregateCall> aggList
         , int windowStartFieldIdx) {
-      this.outRecordType = outRecordType;
+      this.outRowType = outRowType;
       this.aggFieldNames = new ArrayList<>();
       for (AggregateCall ac : aggList) {
         aggFieldNames.add(ac.getName());
@@ -73,7 +73,7 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSqlRow outRecord = new BeamSqlRow(outRecordType);
+      BeamSqlRow outRecord = new BeamSqlRow(outRowType);
       outRecord.updateWindowRange(c.element().getKey(), window);
 
       KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
@@ -109,7 +109,7 @@ public class BeamAggregationTransforms implements Serializable{
 
     @Override
     public BeamSqlRow apply(BeamSqlRow input) {
-      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSqlRowType typeOfKey = exTypeOfKeyRecord(input.getDataType());
       BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
       keyOfRecord.updateWindowRange(input, null);
 
@@ -119,14 +119,14 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+    private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
       List<String> fieldNames = new ArrayList<>();
       List<Integer> fieldTypes = new ArrayList<>();
       for (int idx : groupByKeys) {
         fieldNames.add(dataType.getFieldsName().get(idx));
         fieldTypes.add(dataType.getFieldsType().get(idx));
       }
-      return BeamSqlRecordType.create(fieldNames, fieldTypes);
+      return BeamSqlRowType.create(fieldNames, fieldTypes);
     }
   }
 
@@ -154,10 +154,10 @@ public class BeamAggregationTransforms implements Serializable{
     extends CombineFn<BeamSqlRow, AggregationAccumulator, BeamSqlRow> {
     private List<BeamSqlUdaf> aggregators;
     private List<BeamSqlExpression> sourceFieldExps;
-    private BeamSqlRecordType finalRecordType;
+    private BeamSqlRowType finalRowType;
 
     public AggregationAdaptor(List<AggregateCall> aggregationCalls,
-        BeamSqlRecordType sourceRowRecordType) {
+        BeamSqlRowType sourceRowType) {
       aggregators = new ArrayList<>();
       sourceFieldExps = new ArrayList<>();
       List<String> outFieldsName = new ArrayList<>();
@@ -165,7 +165,7 @@ public class BeamAggregationTransforms implements Serializable{
       for (AggregateCall call : aggregationCalls) {
         int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
         BeamSqlExpression sourceExp = new BeamSqlInputRefExpression(
-            CalciteUtils.getFieldType(sourceRowRecordType, refIndex), refIndex);
+            CalciteUtils.getFieldType(sourceRowType, refIndex), refIndex);
         sourceFieldExps.add(sourceExp);
 
         outFieldsName.add(call.name);
@@ -206,7 +206,7 @@ public class BeamAggregationTransforms implements Serializable{
           break;
         }
       }
-      finalRecordType = BeamSqlRecordType.create(outFieldsName, outFieldsType);
+      finalRowType = BeamSqlRowType.create(outFieldsName, outFieldsType);
     }
     @Override
     public AggregationAccumulator createAccumulator() {
@@ -241,7 +241,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
     @Override
     public BeamSqlRow extractOutput(AggregationAccumulator accumulator) {
-      BeamSqlRow result = new BeamSqlRow(finalRecordType);
+      BeamSqlRow result = new BeamSqlRow(finalRowType);
       for (int idx = 0; idx < aggregators.size(); ++idx) {
         result.addField(idx, aggregators.get(idx).result(accumulator.accumulatorElements.get(idx)));
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
index 8169b83..9ea4376 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
@@ -60,7 +60,7 @@ public class BeamJoinTransforms {
             ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
             input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
       }
-      BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+      BeamSqlRowType type = BeamSqlRowType.create(names, types);
 
       // build the row
       BeamSqlRow row = new BeamSqlRow(type);
@@ -149,7 +149,7 @@ public class BeamJoinTransforms {
     List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
     types.addAll(leftRow.getDataType().getFieldsType());
     types.addAll(rightRow.getDataType().getFieldsType());
-    BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+    BeamSqlRowType type = BeamSqlRowType.create(names, types);
 
     BeamSqlRow row = new BeamSqlRow(type);
     // build the row

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
index 2a3357c..886ddcf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.transform;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -34,14 +34,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
   private String stepName;
   private BeamSqlExpressionExecutor executor;
-  private BeamSqlRecordType outputRecordType;
+  private BeamSqlRowType outputRowType;
 
   public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
-      BeamSqlRecordType outputRecordType) {
+      BeamSqlRowType outputRowType) {
     super();
     this.stepName = stepName;
     this.executor = executor;
-    this.outputRecordType = outputRecordType;
+    this.outputRowType = outputRowType;
   }
 
   @Setup
@@ -51,11 +51,11 @@ public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) {
-    BeamSqlRow inputRecord = c.element();
-    List<Object> results = executor.execute(inputRecord);
+    BeamSqlRow inputRow = c.element();
+    List<Object> results = executor.execute(inputRow);
 
-    BeamSqlRow outRow = new BeamSqlRow(outputRecordType);
-    outRow.updateWindowRange(inputRecord, window);
+    BeamSqlRow outRow = new BeamSqlRow(outputRowType);
+    outRow.updateWindowRange(inputRow, window);
 
     for (int idx = 0; idx < results.size(); ++idx) {
       BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
index 919ae5f..4b8696b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/utils/CalciteUtils.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -78,27 +78,27 @@ public class CalciteUtils {
   /**
    * Get the {@code SqlTypeName} for the specified column of a table.
    */
-  public static SqlTypeName getFieldType(BeamSqlRecordType schema, int index) {
+  public static SqlTypeName getFieldType(BeamSqlRowType schema, int index) {
     return toCalciteType(schema.getFieldsType().get(index));
   }
 
   /**
-   * Generate {@code BeamSqlRecordType} from {@code RelDataType} which is used to create table.
+   * Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table.
    */
-  public static BeamSqlRecordType toBeamRecordType(RelDataType tableInfo) {
+  public static BeamSqlRowType toBeamRowType(RelDataType tableInfo) {
     List<String> fieldNames = new ArrayList<>();
     List<Integer> fieldTypes = new ArrayList<>();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
       fieldNames.add(f.getName());
       fieldTypes.add(toJavaType(f.getType().getSqlTypeName()));
     }
-    return BeamSqlRecordType.create(fieldNames, fieldTypes);
+    return BeamSqlRowType.create(fieldNames, fieldTypes);
   }
 
   /**
    * Create an instance of {@code RelDataType} so it can be used to create a table.
    */
-  public static RelProtoDataType toCalciteRecordType(final BeamSqlRecordType that) {
+  public static RelProtoDataType toCalciteRowType(final BeamSqlRowType that) {
     return new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
index 471a856..a142514 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -55,7 +55,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "size"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -98,7 +98,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testAggregationFunctions", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "sum1", "avg1", "max1", "min1", 
"sum2", "avg2", "max2",
             "min2", "sum3", "avg3", "max3", "min3", "sum4", "avg4", "max4", 
"min4", "sum5", "avg5",
             "max5", "min5", "max6", "min6"),
@@ -167,7 +167,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testDistinct", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -216,7 +216,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testTumbleWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -263,7 +263,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
     PCollection<BeamSqlRow> result =
         input.apply("testHopWindow", BeamSql.simpleQuery(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 
@@ -325,7 +325,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testSessionWindow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(
+    BeamSqlRowType resultType = BeamSqlRowType.create(
         Arrays.asList("f_int2", "size", "window_start"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
index 57fcbc3..24f1a0a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslBase.java
@@ -24,9 +24,9 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
@@ -52,7 +52,7 @@ public class BeamSqlDslBase {
   @Rule
   public ExpectedException exceptions = ExpectedException.none();
 
-  public static BeamSqlRecordType recordTypeInTableA;
+  public static BeamSqlRowType rowTypeInTableA;
   public static List<BeamSqlRow> recordsInTableA;
 
   //bounded PCollections
@@ -65,22 +65,22 @@ public class BeamSqlDslBase {
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
-    recordTypeInTableA = BeamSqlRecordType.create(
+    rowTypeInTableA = BeamSqlRowType.create(
         Arrays.asList("f_int", "f_long", "f_short", "f_byte", "f_float", 
"f_double", "f_string",
             "f_timestamp", "f_int2"),
         Arrays.asList(Types.INTEGER, Types.BIGINT, Types.SMALLINT, Types.TINYINT, Types.FLOAT,
             Types.DOUBLE, Types.VARCHAR, Types.TIMESTAMP, Types.INTEGER));
 
-    recordsInTableA = prepareInputRecordsInTableA();
+    recordsInTableA = prepareInputRowsInTableA();
   }
 
   @Before
   public void preparePCollections(){
     boundedInput1 = PBegin.in(pipeline).apply("boundedInput1",
-        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+        Create.of(recordsInTableA).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
 
     boundedInput2 = PBegin.in(pipeline).apply("boundedInput2",
-        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(recordTypeInTableA)));
+        Create.of(recordsInTableA.get(0)).withCoder(new BeamSqlRowCoder(rowTypeInTableA)));
 
     unboundedInput1 = prepareUnboundedPCollection1();
     unboundedInput2 = prepareUnboundedPCollection2();
@@ -88,7 +88,7 @@ public class BeamSqlDslBase {
 
   private PCollection<BeamSqlRow> prepareUnboundedPCollection1() {
     TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(recordTypeInTableA));
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
 
     for (BeamSqlRow row : recordsInTableA) {
       values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -100,7 +100,7 @@ public class BeamSqlDslBase {
 
   private PCollection<BeamSqlRow> prepareUnboundedPCollection2() {
     TestStream.Builder<BeamSqlRow> values = TestStream
-        .create(new BeamSqlRowCoder(recordTypeInTableA));
+        .create(new BeamSqlRowCoder(rowTypeInTableA));
 
     BeamSqlRow row = recordsInTableA.get(0);
     values = values.advanceWatermarkTo(new Instant(row.getDate("f_timestamp")));
@@ -109,10 +109,10 @@ public class BeamSqlDslBase {
     return PBegin.in(pipeline).apply("unboundedInput2", 
values.advanceWatermarkToInfinity());
   }
 
-  private static List<BeamSqlRow> prepareInputRecordsInTableA() throws ParseException{
+  private static List<BeamSqlRow> prepareInputRowsInTableA() throws ParseException{
     List<BeamSqlRow> rows = new ArrayList<>();
 
-    BeamSqlRow row1 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row1 = new BeamSqlRow(rowTypeInTableA);
     row1.addField(0, 1);
     row1.addField(1, 1000L);
     row1.addField(2, Short.valueOf("1"));
@@ -124,7 +124,7 @@ public class BeamSqlDslBase {
     row1.addField(8, 0);
     rows.add(row1);
 
-    BeamSqlRow row2 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row2 = new BeamSqlRow(rowTypeInTableA);
     row2.addField(0, 2);
     row2.addField(1, 2000L);
     row2.addField(2, Short.valueOf("2"));
@@ -136,7 +136,7 @@ public class BeamSqlDslBase {
     row2.addField(8, 0);
     rows.add(row2);
 
-    BeamSqlRow row3 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row3 = new BeamSqlRow(rowTypeInTableA);
     row3.addField(0, 3);
     row3.addField(1, 3000L);
     row3.addField(2, Short.valueOf("3"));
@@ -148,7 +148,7 @@ public class BeamSqlDslBase {
     row3.addField(8, 0);
     rows.add(row3);
 
-    BeamSqlRow row4 = new BeamSqlRow(recordTypeInTableA);
+    BeamSqlRow row4 = new BeamSqlRow(rowTypeInTableA);
     row4.addField(0, 4);
     row4.addField(1, 4000L);
     row4.addField(2, Short.valueOf("4"));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
index ae5f4e5..e010915 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslJoinTest.java
@@ -23,9 +23,9 @@ import static org.apache.beam.dsls.sql.rel.BeamJoinRelBoundedVsBoundedTest.ORDER
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -41,8 +41,8 @@ public class BeamSqlDslJoinTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
-  private static final BeamSqlRecordType SOURCE_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamSqlRowType SOURCE_RECORD_TYPE =
+      BeamSqlRowType.create(
           Arrays.asList(
               "order_id", "site_id", "price"
           ),
@@ -54,8 +54,8 @@ public class BeamSqlDslJoinTest {
   private static final BeamSqlRowCoder SOURCE_CODER =
       new BeamSqlRowCoder(SOURCE_RECORD_TYPE);
 
-  private static final BeamSqlRecordType RESULT_RECORD_TYPE =
-      BeamSqlRecordType.create(
+  private static final BeamSqlRowType RESULT_RECORD_TYPE =
+      BeamSqlRowType.create(
           Arrays.asList(
           "order_id", "site_id", "price", "order_id0", "site_id0", "price0"
           ),

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
index 10f61b0..ab5a639 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslProjectTest.java
@@ -19,8 +19,8 @@ package org.apache.beam.dsls.sql;
 
 import java.sql.Types;
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -81,7 +81,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFields", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -116,7 +116,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInMultipleRow", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -163,7 +163,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testPartialFieldsInRows", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "f_long"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "f_long"),
         Arrays.asList(Types.INTEGER, Types.BIGINT));
 
     BeamSqlRow record1 = new BeamSqlRow(resultType);
@@ -210,7 +210,7 @@ public class BeamSqlDslProjectTest extends BeamSqlDslBase {
         PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), input)
         .apply("testLiteralField", BeamSql.query(sql));
 
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("literal_field"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("literal_field"),
         Arrays.asList(Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
index 332a273..726f658 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslUdfUdafTest.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql;
 import java.sql.Types;
 import java.util.Arrays;
 import java.util.Iterator;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
 import org.apache.beam.dsls.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
@@ -39,7 +39,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdaf() throws Exception {
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "squaresum"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int2", "squaresum"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);
@@ -69,7 +69,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
    */
   @Test
   public void testUdf() throws Exception{
-    BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int", "cubicvalue"),
+    BeamSqlRowType resultType = BeamSqlRowType.create(Arrays.asList("f_int", "cubicvalue"),
         Arrays.asList(Types.INTEGER, Types.INTEGER));
 
     BeamSqlRow record = new BeamSqlRow(resultType);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
index 8c0a28d..a669635 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -21,8 +21,8 @@ package org.apache.beam.dsls.sql;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -69,7 +69,7 @@ public class TestUtils {
    * {@code}
    */
   public static class RowsBuilder {
-    private BeamSqlRecordType type;
+    private BeamSqlRowType type;
     private List<BeamSqlRow> rows = new ArrayList<>();
 
     /**
@@ -86,9 +86,9 @@ public class TestUtils {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      BeamSqlRecordType beamSQLRecordType = buildBeamSqlRecordType(args);
+      BeamSqlRowType beamSQLRowType = buildBeamSqlRowType(args);
       RowsBuilder builder = new RowsBuilder();
-      builder.type = beamSQLRecordType;
+      builder.type = beamSQLRowType;
 
       return builder;
     }
@@ -99,13 +99,13 @@ public class TestUtils {
      * <p>For example:
      * <pre>{@code
      * TestUtils.RowsBuilder.of(
-     *   beamSqlRecordType
+     *   beamSqlRowType
      * )}</pre>
-     * @beamSQLRecordType the record type.
+     * @beamSQLRowType the record type.
      */
-    public static RowsBuilder of(final BeamSqlRecordType beamSQLRecordType) {
+    public static RowsBuilder of(final BeamSqlRowType beamSQLRowType) {
       RowsBuilder builder = new RowsBuilder();
-      builder.type = beamSQLRecordType;
+      builder.type = beamSQLRowType;
 
       return builder;
     }
@@ -140,12 +140,12 @@ public class TestUtils {
   }
 
   /**
-   * Convenient way to build a {@code BeamSqlRecordType}.
+   * Convenient way to build a {@code BeamSqlRowType}.
    *
    * <p>e.g.
    *
    * <pre>{@code
-   *   buildBeamSqlRecordType(
+   *   buildBeamSqlRowType(
    *       Types.BIGINT, "order_id",
    *       Types.INTEGER, "site_id",
    *       Types.DOUBLE, "price",
@@ -153,7 +153,7 @@ public class TestUtils {
    *   )
    * }</pre>
    */
-  public static BeamSqlRecordType buildBeamSqlRecordType(Object... args) {
+  public static BeamSqlRowType buildBeamSqlRowType(Object... args) {
     List<Integer> types = new ArrayList<>();
     List<String> names = new ArrayList<>();
 
@@ -162,7 +162,7 @@ public class TestUtils {
       names.add((String) args[i + 1]);
     }
 
-    return BeamSqlRecordType.create(names, types);
+    return BeamSqlRowType.create(names, types);
   }
 
   /**
@@ -172,14 +172,14 @@ public class TestUtils {
    *
    * <pre>{@code
    *   buildRows(
-   *       recordType,
+   *       rowType,
    *       1, 1, 1, // the first row
    *       2, 2, 2, // the second row
    *       ...
    *   )
    * }</pre>
    */
-  public static List<BeamSqlRow> buildRows(BeamSqlRecordType type, List args) {
+  public static List<BeamSqlRow> buildRows(BeamSqlRowType type, List args) {
     List<BeamSqlRow> rows = new ArrayList<>();
     int fieldCount = type.size();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index ddbc3d8..b9ce9b4 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -32,9 +32,9 @@ import java.util.TimeZone;
 import org.apache.beam.dsls.sql.BeamSql;
 import org.apache.beam.dsls.sql.TestUtils;
 import org.apache.beam.dsls.sql.mock.MockedBoundedTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -63,7 +63,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
   public final TestPipeline pipeline = TestPipeline.create();
 
   protected PCollection<BeamSqlRow> getTestPCollection() {
-    BeamSqlRecordType type = BeamSqlRecordType.create(
+    BeamSqlRowType type = BeamSqlRowType.create(
         Arrays.asList("ts", "c_tinyint", "c_smallint",
             "c_integer", "c_bigint", "c_float", "c_double", "c_decimal",
             "c_tinyint_max", "c_smallint_max", "c_integer_max", 
"c_bigint_max"),
@@ -156,7 +156,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
         PCollection<BeamSqlRow> rows = inputCollection.apply(BeamSql.simpleQuery(getSql()));
         PAssert.that(rows).containsInAnyOrder(
             TestUtils.RowsBuilder
-                .of(BeamSqlRecordType.create(names, types))
+                .of(BeamSqlRowType.create(names, types))
                 .addRows(values)
                 .getRows()
         );

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
index 5afd273..d7b54c7 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -23,8 +23,8 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
 import org.apache.beam.dsls.sql.planner.BeamRuleSets;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
@@ -57,7 +57,7 @@ public class BeamSqlFnExecutorTestBase {
       RelDataTypeSystem.DEFAULT);
   public static RelDataType relDataType;
 
-  public static BeamSqlRecordType beamRecordType;
+  public static BeamSqlRowType beamRowType;
   public static BeamSqlRow record;
 
   public static RelBuilder relBuilder;
@@ -70,8 +70,8 @@ public class BeamSqlFnExecutorTestBase {
         .add("price", SqlTypeName.DOUBLE)
         .add("order_time", SqlTypeName.BIGINT).build();
 
-    beamRecordType = CalciteUtils.toBeamRecordType(relDataType);
-    record = new BeamSqlRow(beamRecordType);
+    beamRowType = CalciteUtils.toBeamRowType(relDataType);
+    record = new BeamSqlRow(beamRowType);
 
     record.addField(0, 1234567L);
     record.addField(1, 0);

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
index 84f49a9..6c1dcb2 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedBoundedTable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.dsls.sql.mock;
 
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
@@ -25,8 +25,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,8 +45,8 @@ public class MockedBoundedTable extends MockedTable {
   /** rows flow out from this table. */
   private final List<BeamSqlRow> rows = new ArrayList<>();
 
-  public MockedBoundedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  public MockedBoundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   /**
@@ -63,13 +63,13 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public static MockedBoundedTable of(final Object... args){
-    return new MockedBoundedTable(buildBeamSqlRecordType(args));
+    return new MockedBoundedTable(buildBeamSqlRowType(args));
   }
 
   /**
    * Build a mocked bounded table with the specified type.
    */
-  public static MockedBoundedTable of(final BeamSqlRecordType type) {
+  public static MockedBoundedTable of(final BeamSqlRowType type) {
     return new MockedBoundedTable(type);
   }
 
@@ -88,7 +88,7 @@ public class MockedBoundedTable extends MockedTable {
    * }</pre>
    */
   public MockedBoundedTable addRows(Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
     this.rows.addAll(rows);
     return this;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
index eed740a..858ae88 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedTable.java
@@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.mock;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -31,8 +31,8 @@ import org.apache.beam.sdk.values.PDone;
  */
 public abstract class MockedTable extends BaseBeamTable {
   public static final AtomicInteger COUNTER = new AtomicInteger();
-  public MockedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  public MockedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
index 0f8c912..ee6eb22 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/mock/MockedUnboundedTable.java
@@ -18,16 +18,16 @@
 
 package org.apache.beam.dsls.sql.mock;
 
-import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRecordType;
+import static org.apache.beam.dsls.sql.TestUtils.buildBeamSqlRowType;
 import static org.apache.beam.dsls.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,8 +44,8 @@ public class MockedUnboundedTable extends MockedTable {
   private final List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
   /** specify the index of column in the row which stands for the event time field. */
   private int timestampField;
-  private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
-    super(beamSqlRecordType);
+  private MockedUnboundedTable(BeamSqlRowType beamSqlRowType) {
+    super(beamSqlRowType);
   }
 
   /**
@@ -62,7 +62,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public static MockedUnboundedTable of(final Object... args){
-    return new MockedUnboundedTable(buildBeamSqlRecordType(args));
+    return new MockedUnboundedTable(buildBeamSqlRowType(args));
   }
 
   public MockedUnboundedTable timestampColumnIndex(int idx) {
@@ -85,7 +85,7 @@ public class MockedUnboundedTable extends MockedTable {
    * }</pre>
    */
   public MockedUnboundedTable addRows(Duration duration, Object... args) {
-    List<BeamSqlRow> rows = buildRows(getRecordType(), Arrays.asList(args));
+    List<BeamSqlRow> rows = buildRows(getRowType(), Arrays.asList(args));
     // record the watermark + rows
     this.timestampedRows.add(Pair.of(duration, rows));
     return this;
@@ -97,7 +97,7 @@ public class MockedUnboundedTable extends MockedTable {
 
   @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     TestStream.Builder<BeamSqlRow> values = TestStream.create(
-        new BeamSqlRowCoder(beamSqlRecordType));
+        new BeamSqlRowCoder(beamSqlRowType));
 
     for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
       values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index cf1d714..e41e341 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -58,10 +58,10 @@ public class BeamSqlRowCoderTest {
       }
     };
 
-    BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
+    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(
         protoRowType.apply(new JavaTypeFactoryImpl(
             RelDataTypeSystem.DEFAULT)));
-    BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
+    BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
     row.addField("col_tinyint", Byte.valueOf("1"));
     row.addField("col_smallint", Short.valueOf("1"));
     row.addField("col_integer", 1);
@@ -77,7 +77,7 @@ public class BeamSqlRowCoderTest {
     row.addField("col_boolean", true);
 
 
-    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
+    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRowType);
     CoderProperties.coderDecodeEncodeEqual(coder, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
index 9cd0915..01cd960 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java
@@ -19,10 +19,9 @@
 package org.apache.beam.dsls.sql.schema.kafka;
 
 import java.io.Serializable;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -91,8 +90,8 @@ public class BeamKafkaCSVTableTest {
     pipeline.run();
   }
 
-  private static BeamSqlRecordType genRowType() {
-    return CalciteUtils.toBeamRecordType(new RelProtoDataType() {
+  private static BeamSqlRowType genRowType() {
+    return CalciteUtils.toBeamRowType(new RelProtoDataType() {
 
       @Override public RelDataType apply(RelDataTypeFactory a0) {
         return a0.builder().add("order_id", SqlTypeName.BIGINT)

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
index 176df46..b6e11e5 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java
@@ -31,10 +31,9 @@ import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -81,20 +80,20 @@ public class BeamTextCSVTableTest {
   private static File writerTargetFile;
 
   @Test public void testBuildIOReader() {
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline);
     PAssert.that(rows).containsInAnyOrder(testDataRows);
     pipeline.run();
   }
 
   @Test public void testBuildIOWriter() {
-    new BeamTextCSVTable(buildBeamSqlRecordType(),
+    new BeamTextCSVTable(buildBeamSqlRowType(),
         readerSourceFile.getAbsolutePath()).buildIOReader(pipeline)
-        .apply(new BeamTextCSVTable(buildBeamSqlRecordType(), writerTargetFile.getAbsolutePath())
+        .apply(new BeamTextCSVTable(buildBeamSqlRowType(), writerTargetFile.getAbsolutePath())
             .buildIOWriter());
     pipeline.run();
 
-    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRecordType(),
+    PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildBeamSqlRowType(),
         writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2);
 
     // confirm the two reads match
@@ -167,11 +166,11 @@ public class BeamTextCSVTableTest {
         .add("amount", SqlTypeName.DOUBLE).add("user_name", 
SqlTypeName.VARCHAR).build();
   }
 
-  private static BeamSqlRecordType buildBeamSqlRecordType() {
-    return CalciteUtils.toBeamRecordType(buildRelDataType());
+  private static BeamSqlRowType buildBeamSqlRowType() {
+    return CalciteUtils.toBeamRowType(buildRelDataType());
   }
 
   private static BeamSqlRow buildRow(Object[] data) {
-    return new BeamSqlRow(buildBeamSqlRecordType(), Arrays.asList(data));
+    return new BeamSqlRow(buildBeamSqlRowType(), Arrays.asList(data));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index a0fed22..5d5d4fc 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -21,11 +21,10 @@ import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -64,9 +63,9 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 
   private List<AggregateCall> aggCalls;
 
-  private BeamSqlRecordType keyType;
-  private BeamSqlRecordType aggPartType;
-  private BeamSqlRecordType outputType;
+  private BeamSqlRowType keyType;
+  private BeamSqlRowType aggPartType;
+  private BeamSqlRowType outputType;
 
   private BeamSqlRowCoder inRecordCoder;
   private BeamSqlRowCoder keyCoder;
@@ -405,7 +404,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * Row type of final output row.
    */
-  private BeamSqlRecordType prepareFinalRowType() {
+  private BeamSqlRowType prepareFinalRowType() {
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     List<KV<String, SqlTypeName>> columnMetadata =
         Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", 
SqlTypeName.BIGINT),
@@ -433,7 +432,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return CalciteUtils.toBeamRecordType(builder.build());
+    return CalciteUtils.toBeamRowType(builder.build());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9c8a8a1/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
index 2e91405..4045bc8 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java
@@ -23,8 +23,8 @@ import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
@@ -38,7 +38,7 @@ import org.junit.BeforeClass;
 public class BeamTransformBaseTest {
   public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
-  public static  BeamSqlRecordType inputRowType;
+  public static BeamSqlRowType inputRowType;
   public static List<BeamSqlRow> inputRows;
 
   @BeforeClass
@@ -66,14 +66,14 @@ public class BeamTransformBaseTest {
   }
 
   /**
-   * create a {@code BeamSqlRecordType} for given column metadata.
+   * create a {@code BeamSqlRowType} for given column metadata.
    */
-  public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
+  public static BeamSqlRowType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return CalciteUtils.toBeamRecordType(builder.build());
+    return CalciteUtils.toBeamRowType(builder.build());
   }
 
   /**
@@ -89,7 +89,7 @@ public class BeamTransformBaseTest {
    */
   public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata,
       List<Object> rowValues){
-    BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata);
+    BeamSqlRowType rowType = initTypeOfSqlRow(columnMetadata);
 
     return new BeamSqlRow(rowType, rowValues);
   }
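
Note: the change is a mechanical rename (BeamSqlRecordType -> BeamSqlRowType, with matching field and local names); the factory and row-population pattern exercised by the tests above is unchanged. A minimal usage sketch, assuming the org.apache.beam.dsls.sql.schema package layout shown in this patch (field names and values are illustrative, taken from BeamSqlDslBase):

    import java.sql.Types;
    import java.util.Arrays;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowType;

    public class RowTypeSketch {
      public static void main(String[] args) {
        // Build a two-column row type with the renamed factory
        // (formerly BeamSqlRecordType.create).
        BeamSqlRowType rowType = BeamSqlRowType.create(
            Arrays.asList("f_int", "f_long"),
            Arrays.asList(Types.INTEGER, Types.BIGINT));

        // Populate a row against that type, as the DSL tests in this patch do.
        BeamSqlRow row = new BeamSqlRow(rowType);
        row.addField(0, 1);
        row.addField(1, 1000L);
      }
    }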
