Repository: beam Updated Branches: refs/heads/DSL_SQL be01be5cb -> 2a1377e1c
[BEAM-2744] rename BeamRecordType#size() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/affb8f6e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/affb8f6e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/affb8f6e Branch: refs/heads/DSL_SQL Commit: affb8f6e9f1fb3e4df79c787528b7af726100d29 Parents: be01be5 Author: James Xu <xumingmi...@gmail.com> Authored: Tue Aug 8 15:15:59 2017 +0800 Committer: Tyler Akidau <taki...@apache.org> Committed: Thu Aug 10 13:06:15 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/BeamRecordCoder.java | 12 +++++------ .../org/apache/beam/sdk/values/BeamRecord.java | 22 +++++--------------- .../apache/beam/sdk/values/BeamRecordType.java | 7 ++++++- .../extensions/sql/impl/rel/BeamJoinRel.java | 2 +- .../extensions/sql/impl/rel/BeamValuesRel.java | 2 +- .../sql/impl/transform/BeamJoinTransforms.java | 4 ++-- .../extensions/sql/schema/BeamTableUtils.java | 10 ++++----- .../sql/BeamSqlDslAggregationTest.java | 16 +++++++------- .../beam/sdk/extensions/sql/TestUtils.java | 6 +++--- 9 files changed, 38 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java index 4e24b82..cbed87d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BeamRecordCoder.java @@ -43,7 +43,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { } public static BeamRecordCoder of(BeamRecordType recordType, List<Coder> coderArray){ - if (recordType.size() != coderArray.size()) { + if (recordType.getFieldCount() != coderArray.size()) { throw new IllegalArgumentException("Coder size doesn't match with field size"); } return new BeamRecordCoder(recordType, coderArray); @@ -57,7 +57,7 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { public void encode(BeamRecord value, OutputStream outStream) throws CoderException, IOException { nullListCoder.encode(scanNullFields(value), outStream); - for (int idx = 0; idx < value.size(); ++idx) { + for (int idx = 0; idx < value.getFieldCount(); ++idx) { if (value.getFieldValue(idx) == null) { continue; } @@ -70,8 +70,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { public BeamRecord decode(InputStream inStream) throws CoderException, IOException { BitSet nullFields = nullListCoder.decode(inStream); - List<Object> fieldValues = new ArrayList<>(recordType.size()); - for (int idx = 0; idx < recordType.size(); ++idx) { + List<Object> fieldValues = new ArrayList<>(recordType.getFieldCount()); + for (int idx = 0; idx < recordType.getFieldCount(); ++idx) { if (nullFields.get(idx)) { fieldValues.add(null); } else { @@ -87,8 +87,8 @@ public class BeamRecordCoder extends CustomCoder<BeamRecord> { * Scan {@link BeamRecord} to find fields with a NULL value. */ private BitSet scanNullFields(BeamRecord record){ - BitSet nullFields = new BitSet(record.size()); - for (int idx = 0; idx < record.size(); ++idx) { + BitSet nullFields = new BitSet(record.getFieldCount()); + for (int idx = 0; idx < record.getFieldCount(); ++idx) { if (record.getFieldValue(idx) == null) { nullFields.set(idx); } http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java index a3ede3c..fa3b574 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecord.java @@ -52,13 +52,13 @@ public class BeamRecord implements Serializable { } this.dataType = dataType; - this.dataValues = new ArrayList<>(dataType.size()); + this.dataValues = new ArrayList<>(dataType.getFieldCount()); - for (int idx = 0; idx < dataType.size(); ++idx) { + for (int idx = 0; idx < dataType.getFieldCount(); ++idx) { dataValues.add(null); } - for (int idx = 0; idx < dataType.size(); ++idx) { + for (int idx = 0; idx < dataType.getFieldCount(); ++idx) { addField(idx, rawDataValues.get(idx)); } } @@ -168,7 +168,7 @@ public class BeamRecord implements Serializable { return (Boolean) getFieldValue(idx); } - public int size() { + public int getFieldCount() { return dataValues.size(); } @@ -182,19 +182,7 @@ public class BeamRecord implements Serializable { @Override public String toString() { - return "BeamSqlRow [dataValues=" + dataValues + ", dataType=" + dataType + "]"; - } - - /** - * Return data fields as key=value. - */ - public String valueInString() { - StringBuilder sb = new StringBuilder(); - for (int idx = 0; idx < size(); ++idx) { - sb.append( - String.format(",%s=%s", getDataType().getFieldNames().get(idx), getFieldValue(idx))); - } - return sb.substring(1); + return "BeamRecord [dataValues=" + dataValues + ", dataType=" + dataType + "]"; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java index 6ab783c..29cc80d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/BeamRecordType.java @@ -65,7 +65,12 @@ public class BeamRecordType implements Serializable{ return fieldNames.indexOf(fieldName); } - public int size(){ + public int getFieldCount(){ return fieldNames.size(); } + + @Override + public String toString() { + return "BeamRecordType [fieldsName=" + fieldNames + "]"; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 9dceb25..5ac9575 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -256,7 +256,7 @@ public class BeamJoinRel extends Join implements BeamRelNode { private BeamRecord buildNullRow(BeamRelNode relNode) { BeamRecordSqlType leftType = CalciteUtils.toBeamRowType(relNode.getRowType()); - return new BeamRecord(leftType, Collections.nCopies(leftType.size(), null)); + return new BeamRecord(leftType, Collections.nCopies(leftType.getFieldCount(), null)); } private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) { http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java index fde002e..c4caff3 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamValuesRel.java @@ -65,7 +65,7 @@ public class BeamValuesRel extends Values implements BeamRelNode { BeamRecordSqlType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType()); for (ImmutableList<RexLiteral> tuple : tuples) { - List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.size()); + List<Object> fieldsValue = new ArrayList<>(beamSQLRowType.getFieldCount()); for (int i = 0; i < tuple.size(); i++) { fieldsValue.add(BeamTableUtils.autoCastField( beamSQLRowType.getFieldTypeByIndex(i), tuple.get(i).getValue())); http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java index 9a48c53..7a8d10d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java @@ -145,11 +145,11 @@ public class BeamJoinTransforms { private static BeamRecord combineTwoRowsIntoOneHelper(BeamRecord leftRow, BeamRecord rightRow) { // build the type - List<String> names = new ArrayList<>(leftRow.size() + rightRow.size()); + List<String> names = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount()); names.addAll(leftRow.getDataType().getFieldNames()); names.addAll(rightRow.getDataType().getFieldNames()); - List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size()); + List<Integer> types = new ArrayList<>(leftRow.getFieldCount() + rightRow.getFieldCount()); types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldTypes()); types.addAll(BeamSqlRecordHelper.getSqlRecordType(rightRow).getFieldTypes()); BeamRecordSqlType type = BeamRecordSqlType.create(names, types); http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java index 99f9522..687a082 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/schema/BeamTableUtils.java @@ -41,18 +41,18 @@ public final class BeamTableUtils { CSVFormat csvFormat, String line, BeamRecordSqlType beamRecordSqlType) { - List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.size()); + List<Object> fieldsValue = new ArrayList<>(beamRecordSqlType.getFieldCount()); try (StringReader reader = new StringReader(line)) { CSVParser parser = csvFormat.parse(reader); CSVRecord rawRecord = parser.getRecords().get(0); - if (rawRecord.size() != beamRecordSqlType.size()) { + if (rawRecord.size() != beamRecordSqlType.getFieldCount()) { throw new IllegalArgumentException(String.format( "Expect %d fields, but actually %d", - beamRecordSqlType.size(), rawRecord.size() + beamRecordSqlType.getFieldCount(), rawRecord.size() )); } else { - for (int idx = 0; idx < beamRecordSqlType.size(); idx++) { + for (int idx = 0; idx < beamRecordSqlType.getFieldCount(); idx++) { String raw = rawRecord.get(idx); fieldsValue.add(autoCastField(beamRecordSqlType.getFieldTypeByIndex(idx), raw)); } @@ -66,7 +66,7 @@ public final class BeamTableUtils { public static String beamSqlRow2CsvLine(BeamRecord row, CSVFormat csvFormat) { StringWriter writer = new StringWriter(); try (CSVPrinter printer = csvFormat.print(writer)) { - for (int i = 0; i < row.size(); i++) { + for (int i = 0; i < row.getFieldCount(); i++) { printer.print(row.getFieldValue(i).toString()); } printer.println(); http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java index 4e74dbb..db562da 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java @@ -49,7 +49,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } private void runAggregationWithoutWindow(PCollection<BeamRecord> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION GROUP BY f_int2"; + String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamRecord> result = input.apply("testAggregationWithoutWindow", BeamSql.simpleQuery(sql)); @@ -57,6 +57,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { BeamRecordSqlType resultType = BeamRecordSqlType.create(Arrays.asList("f_int2", "size"), Arrays.asList(Types.INTEGER, Types.BIGINT)); + BeamRecord record = new BeamRecord(resultType, 0, 4L); PAssert.that(result).containsInAnyOrder(record); @@ -81,7 +82,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } private void runAggregationFunctions(PCollection<BeamRecord> input) throws Exception{ - String sql = "select f_int2, count(*) as size, " + String sql = "select f_int2, count(*) as getFieldCount, " + "sum(f_long) as sum1, avg(f_long) as avg1, max(f_long) as max1, min(f_long) as min1," + "sum(f_short) as sum2, avg(f_short) as avg2, max(f_short) as max2, min(f_short) as min2," + "sum(f_byte) as sum3, avg(f_byte) as avg3, max(f_byte) as max3, min(f_byte) as min3," @@ -171,7 +172,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } private void runTumbleWindow(PCollection<BeamRecord> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," + String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`," + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" + " FROM TABLE_A" + " GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; @@ -208,7 +209,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } private void runHopWindow(PCollection<BeamRecord> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," + String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`," + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" + " FROM PCOLLECTION" + " GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; @@ -246,7 +247,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { } private void runSessionWindow(PCollection<BeamRecord> input) throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size`," + String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount`," + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" + " FROM TABLE_A" + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; @@ -273,7 +274,7 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { "Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL HOUR>)'"); pipeline.enableAbandonedNodeEnforcement(false); - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + String sql = "SELECT f_int2, COUNT(*) AS `getFieldCount` FROM TABLE_A " + "GROUP BY f_int2, TUMBLE(f_long, INTERVAL '1' HOUR)"; PCollection<BeamRecord> result = PCollectionTuple.of(new TupleTag<BeamRecord>("TABLE_A"), boundedInput1) @@ -288,7 +289,8 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { exceptions.expectMessage("Encountered \"*\""); pipeline.enableAbandonedNodeEnforcement(false); - String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` FROM PCOLLECTION GROUP BY f_int2"; + String sql = "SELECT f_int2, COUNT(DISTINCT *) AS `size` " + + "FROM PCOLLECTION GROUP BY f_int2"; PCollection<BeamRecord> result = boundedInput1.apply("testUnsupportedDistinct", BeamSql.simpleQuery(sql)); http://git-wip-us.apache.org/repos/asf/beam/blob/affb8f6e/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index aa1fc29..373deb7 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -35,7 +35,7 @@ public class TestUtils { public static class BeamSqlRow2StringDoFn extends DoFn<BeamRecord, String> { @ProcessElement public void processElement(ProcessContext ctx) { - ctx.output(ctx.element().valueInString()); + ctx.output(ctx.element().toString()); } } @@ -45,7 +45,7 @@ public class TestUtils { public static List<String> beamSqlRows2Strings(List<BeamRecord> rows) { List<String> strs = new ArrayList<>(); for (BeamRecord row : rows) { - strs.add(row.valueInString()); + strs.add(row.toString()); } return strs; @@ -181,7 +181,7 @@ public class TestUtils { */ public static List<BeamRecord> buildRows(BeamRecordSqlType type, List args) { List<BeamRecord> rows = new ArrayList<>(); - int fieldCount = type.size(); + int fieldCount = type.getFieldCount(); for (int i = 0; i < args.size(); i += fieldCount) { rows.add(new BeamRecord(type, args.subList(i, i + fieldCount)));