Repository: beam Updated Branches: refs/heads/DSL_SQL 5fea74638 -> a452b8020
cleanup BeamSqlRow Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bdea7a6b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bdea7a6b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bdea7a6b Branch: refs/heads/DSL_SQL Commit: bdea7a6b62f9d131efca04266b3adbec15d66543 Parents: a976ec0 Author: James Xu <xumingmi...@gmail.com> Authored: Thu Jul 13 18:42:02 2017 +0800 Committer: James Xu <xumingmi...@gmail.com> Committed: Fri Jul 14 11:15:53 2017 +0800 ---------------------------------------------------------------------- .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 185 ++++--------------- 1 file changed, 37 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bdea7a6b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java index b21a018..082d92a 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java @@ -19,10 +19,13 @@ package org.apache.beam.dsls.sql.schema; import java.io.Serializable; import java.math.BigDecimal; +import java.sql.Types; import java.util.ArrayList; import java.util.Date; import java.util.GregorianCalendar; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.beam.dsls.sql.utils.CalciteUtils; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -35,6 +38,25 @@ import org.joda.time.Instant; * */ public class BeamSqlRow implements Serializable { + private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>(); + static { + SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class); + + SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class); + SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class); + } + private List<Integer> nullFields = new ArrayList<>(); private List<Object> dataValues; private BeamSqlRecordType dataType; @@ -82,78 +104,23 @@ public class BeamSqlRow implements Serializable { } } - SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index); - switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case DECIMAL: - if (!(fieldValue instanceof BigDecimal)) { - throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case VARCHAR: - case CHAR: - if (!(fieldValue instanceof String)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - case TIMESTAMP: - case DATE: - if (!(fieldValue instanceof Date)) { - throw new IllegalArgumentException( - getTypeMismatchErrorMessage(fieldValue, fieldType)); - } - break; - default: - throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!"); - } + validateValueType(index, fieldValue); dataValues.set(index, fieldValue); } - private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) { - return String.format("[%s](%s) doesn't match type [%s]", - fieldValue, fieldValue.getClass(), fieldType); + private void validateValueType(int index, Object fieldValue) { + SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index); + Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType)); + if (javaClazz == null) { + throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!"); + } + + if (!fieldValue.getClass().equals(javaClazz)) { + throw new IllegalArgumentException( + String.format("[%s](%s) doesn't match type [%s]", + fieldValue, fieldValue.getClass(), fieldType) + ); + } } public Object getFieldValue(String fieldName) { @@ -205,85 +172,7 @@ public class BeamSqlRow implements Serializable { return null; } - Object fieldValue = dataValues.get(fieldIdx); - SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, fieldIdx); - - switch (fieldType) { - case INTEGER: - if (!(fieldValue instanceof Integer)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case SMALLINT: - if (!(fieldValue instanceof Short)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TINYINT: - if (!(fieldValue instanceof Byte)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case DOUBLE: - if (!(fieldValue instanceof Double)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case DECIMAL: - if (!(fieldValue instanceof BigDecimal)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case BIGINT: - if (!(fieldValue instanceof Long)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case FLOAT: - if (!(fieldValue instanceof Float)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case VARCHAR: - case CHAR: - if (!(fieldValue instanceof String)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIME: - if (!(fieldValue instanceof GregorianCalendar)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - case TIMESTAMP: - case DATE: - if (!(fieldValue instanceof Date)) { - throw new IllegalArgumentException( - String.format("[%s] doesn't match type [%s]", fieldValue, fieldType)); - } else { - return fieldValue; - } - default: - throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!"); - } + return dataValues.get(fieldIdx); } public byte getByte(int idx) {