Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 5fea74638 -> a452b8020


cleanup BeamSqlRow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bdea7a6b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bdea7a6b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bdea7a6b

Branch: refs/heads/DSL_SQL
Commit: bdea7a6b62f9d131efca04266b3adbec15d66543
Parents: a976ec0
Author: James Xu <xumingmi...@gmail.com>
Authored: Thu Jul 13 18:42:02 2017 +0800
Committer: James Xu <xumingmi...@gmail.com>
Committed: Fri Jul 14 11:15:53 2017 +0800

----------------------------------------------------------------------
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 185 ++++---------------
 1 file changed, 37 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bdea7a6b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index b21a018..082d92a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -19,10 +19,13 @@ package org.apache.beam.dsls.sql.schema;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.GregorianCalendar;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -35,6 +38,25 @@ import org.joda.time.Instant;
  *
  */
 public class BeamSqlRow implements Serializable {
+  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
+  static {
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
+
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
+    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
+  }
+
   private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
   private BeamSqlRecordType dataType;
@@ -82,78 +104,23 @@ public class BeamSqlRow implements Serializable {
       }
     }
 
-    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
-    switch (fieldType) {
-      case INTEGER:
-        if (!(fieldValue instanceof Integer)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case SMALLINT:
-        if (!(fieldValue instanceof Short)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case TINYINT:
-        if (!(fieldValue instanceof Byte)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case DOUBLE:
-        if (!(fieldValue instanceof Double)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case BIGINT:
-        if (!(fieldValue instanceof Long)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case FLOAT:
-        if (!(fieldValue instanceof Float)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case DECIMAL:
-        if (!(fieldValue instanceof BigDecimal)) {
-          throw new IllegalArgumentException(getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case VARCHAR:
-      case CHAR:
-        if (!(fieldValue instanceof String)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case TIME:
-        if (!(fieldValue instanceof GregorianCalendar)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      case TIMESTAMP:
-      case DATE:
-        if (!(fieldValue instanceof Date)) {
-          throw new IllegalArgumentException(
-              getTypeMismatchErrorMessage(fieldValue, fieldType));
-        }
-        break;
-      default:
-        throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
-    }
+    validateValueType(index, fieldValue);
     dataValues.set(index, fieldValue);
   }
 
-  private String getTypeMismatchErrorMessage(Object fieldValue, SqlTypeName fieldType) {
-    return String.format("[%s](%s) doesn't match type [%s]",
-        fieldValue, fieldValue.getClass(), fieldType);
+  private void validateValueType(int index, Object fieldValue) {
+    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
+    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
+    if (javaClazz == null) {
+      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
+    }
+
+    if (!fieldValue.getClass().equals(javaClazz)) {
+      throw new IllegalArgumentException(
+          String.format("[%s](%s) doesn't match type [%s]",
+              fieldValue, fieldValue.getClass(), fieldType)
+      );
+    }
   }
 
   public Object getFieldValue(String fieldName) {
@@ -205,85 +172,7 @@ public class BeamSqlRow implements Serializable {
       return null;
     }
 
-    Object fieldValue = dataValues.get(fieldIdx);
-    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, fieldIdx);
-
-    switch (fieldType) {
-      case INTEGER:
-        if (!(fieldValue instanceof Integer)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case SMALLINT:
-        if (!(fieldValue instanceof Short)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TINYINT:
-        if (!(fieldValue instanceof Byte)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case DOUBLE:
-        if (!(fieldValue instanceof Double)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case DECIMAL:
-        if (!(fieldValue instanceof BigDecimal)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case BIGINT:
-        if (!(fieldValue instanceof Long)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case FLOAT:
-        if (!(fieldValue instanceof Float)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case VARCHAR:
-      case CHAR:
-        if (!(fieldValue instanceof String)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TIME:
-        if (!(fieldValue instanceof GregorianCalendar)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TIMESTAMP:
-      case DATE:
-        if (!(fieldValue instanceof Date)) {
-          throw new IllegalArgumentException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      default:
-        throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
-    }
+    return dataValues.get(fieldIdx);
   }
 
   public byte getByte(int idx) {

Reply via email to