Enhance exception handling.

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/16d4ab89
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/16d4ab89
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/16d4ab89

Branch: refs/heads/master
Commit: 16d4ab896df5cb368988220bc90f2834d9651d02
Parents: 48d195c
Author: Aditya <adi...@mapr.com>
Authored: Wed Feb 3 17:46:44 2016 -0800
Committer: Aditya Kishore <a...@apache.org>
Committed: Fri Sep 9 10:08:34 2016 -0700

----------------------------------------------------------------------
 .../maprdb/json/MaprDBJsonRecordReader.java     | 163 +++++++++++--------
 1 file changed, 91 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
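
The change replaces the generic DrillRuntimeException, IllegalStateException and
UnsupportedOperationException throws in this reader with Drill's UserException
builders, so read failures are logged and reported back to the user together with
the table path and document id. A minimal sketch of that builder pattern follows;
the sketch class and helper signatures are illustrative, not part of the commit:

    import org.apache.drill.common.exceptions.UserException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class UserExceptionSketch {
      private static final Logger logger =
          LoggerFactory.getLogger(UserExceptionSketch.class);

      // Builds, logs, and returns a typed "unsupported operation" error.
      static UserException unsupportedError(String format, Object... args) {
        return UserException.unsupportedError()
            .message(String.format(format, args))
            .build(logger);
      }

      // Wraps a low-level read failure and attaches human-readable context
      // (for example, the table path and document id) before it is rethrown.
      static UserException dataReadError(Throwable cause, String context) {
        return UserException.dataReadError(cause)
            .addContext(context)
            .build(logger);
      }
    }

The same pattern appears in the diff below as the reader's private
unsupportedError() and dataReadError() helpers and in the catch block of the
scan loop.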


http://git-wip-us.apache.org/repos/asf/drill/blob/16d4ab89/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
index f033de4..2e0e0c1 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java
@@ -28,8 +28,8 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
@@ -85,8 +85,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
   private boolean includeId;
 
-  private String currentFieldName;
-
   public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) {
     buffer = context.getManagedBuffer();
@@ -144,20 +142,24 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     writer.reset();
 
     int recordCount = 0;
+    DBDocumentReaderBase reader = null;
 
     while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) {
-      DBDocumentReaderBase reader = nextDocumentReader();
-      if (reader == null) break;
-      writer.setPosition(recordCount);
-      if (reader.next() != EventType.START_MAP) {
-      if (reader.next() != EventType.START_MAP) {
-        throw new IllegalStateException("The document did not start with START_MAP!");
-      }
       try {
+        reader = nextDocumentReader();
+        if (reader == null) break;
+        writer.setPosition(recordCount);
+        if (reader.next() != EventType.START_MAP) {
+          throw dataReadError("The document did not start with START_MAP!");
+        }
         writeToMap(reader, writer.rootAsMap());
         recordCount++;
-      } catch (IllegalArgumentException e) {
-        logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'",
-            IdCodec.asString(reader.getId()), currentFieldName), e);
+      } catch (UserException e) {
+        throw UserException.unsupportedError(e)
+            .addContext(String.format("Table: %s, document id: '%s'",
+                table.getPath(),
+                reader == null ? null : IdCodec.asString(reader.getId())))
+            .build(logger);
       }
     }
 
@@ -172,60 +174,65 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
       EventType event = reader.next();
       if (event == null || event == EventType.END_MAP) break outside;
 
-      currentFieldName = reader.getFieldName();
-      switch (event) {
-      case NULL:
-        map.varChar(currentFieldName).write(null); // treat as VARCHAR for now
-      case BINARY:
-        writeBinary(map.varBinary(currentFieldName), reader.getBinary());
-        break;
-      case BOOLEAN:
-        map.bit(currentFieldName).writeBit(reader.getBoolean() ? 1 : 0);
-        break;
-      case STRING:
-        writeString(map.varChar(currentFieldName), reader.getString());
-        break;
-      case BYTE:
-        map.tinyInt(currentFieldName).writeTinyInt(reader.getByte());
-        break;
-      case SHORT:
-        map.smallInt(currentFieldName).writeSmallInt(reader.getShort());
-        break;
-      case INT:
-        map.integer(currentFieldName).writeInt(reader.getInt());
-        break;
-      case LONG:
-        map.bigInt(currentFieldName).writeBigInt(reader.getLong());
-        break;
-      case FLOAT:
-        map.float4(currentFieldName).writeFloat4(reader.getFloat());
-        break;
-      case DOUBLE:
-        map.float8(currentFieldName).writeFloat8(reader.getDouble());
-        break;
-      case DECIMAL:
-        throw new UnsupportedOperationException("Decimals are currently not supported.");
-      case DATE:
-        map.date(currentFieldName).writeDate(reader.getDate().toDate().getTime());
-        break;
-      case TIME:
-        map.time(currentFieldName).writeTime(reader.getTimeInt());
-        break;
-      case TIMESTAMP:
-        map.timeStamp(currentFieldName).writeTimeStamp(reader.getTimestampLong());
-        break;
-      case INTERVAL:
-        throw new UnsupportedOperationException("Interval is currently not supported.");
-      case START_MAP:
-        writeToMap(reader, map.map(currentFieldName));
-        break;
-      case START_ARRAY:
-        writeToList(reader, map.list(currentFieldName));
-        break;
-      case END_ARRAY:
-        throw new IllegalStateException("Shouldn't get a END_ARRAY inside a map");
-      default:
-        throw new UnsupportedOperationException("Unsupported type: " + event);
+      String fieldName = reader.getFieldName();
+      try {
+        switch (event) {
+        case NULL:
+          break; // not setting the field will leave it as null
+        case BINARY:
+          writeBinary(map.varBinary(fieldName), reader.getBinary());
+          break;
+        case BOOLEAN:
+          map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0);
+          break;
+        case STRING:
+          writeString(map.varChar(fieldName), reader.getString());
+          break;
+        case BYTE:
+          map.tinyInt(fieldName).writeTinyInt(reader.getByte());
+          break;
+        case SHORT:
+          map.smallInt(fieldName).writeSmallInt(reader.getShort());
+          break;
+        case INT:
+          map.integer(fieldName).writeInt(reader.getInt());
+          break;
+        case LONG:
+          map.bigInt(fieldName).writeBigInt(reader.getLong());
+          break;
+        case FLOAT:
+          map.float4(fieldName).writeFloat4(reader.getFloat());
+          break;
+        case DOUBLE:
+          map.float8(fieldName).writeFloat8(reader.getDouble());
+          break;
+        case DECIMAL:
+          throw unsupportedError("Decimal type is currently not supported.");
+        case DATE:
+          map.date(fieldName).writeDate(reader.getDate().toDate().getTime());
+          break;
+        case TIME:
+          map.time(fieldName).writeTime(reader.getTimeInt());
+          break;
+        case TIMESTAMP:
+          map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong());
+          break;
+        case INTERVAL:
+          throw unsupportedError("Interval type is currently not supported.");
+        case START_MAP:
+          writeToMap(reader, map.map(fieldName));
+          break;
+        case START_ARRAY:
+          writeToList(reader, map.list(fieldName));
+          break;
+        case END_ARRAY:
+          throw dataReadError("Encountered an END_ARRAY event inside a map.");
+        default:
+          throw unsupportedError("Unsupported type: %s encountered during the query.", event);
+        }
+      } catch (IllegalArgumentException e) {
+        logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'",
+            IdCodec.asString(reader.getId()), fieldName), e);
       }
     }
     map.end();
@@ -239,7 +246,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
 
       switch (event) {
       case NULL:
-        list.varChar().write(null); // treat as VARCHAR for now
+        throw unsupportedError("Null values are not supported in lists.");
       case BINARY:
         writeBinary(list.varBinary(), reader.getBinary());
         break;
@@ -268,7 +275,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
         list.float8().writeFloat8(reader.getDouble());
         break;
       case DECIMAL:
-        throw new UnsupportedOperationException("Decimals are currently not supported.");
+        throw unsupportedError("Decimals are currently not supported.");
       case DATE:
         list.date().writeDate(reader.getDate().toDate().getTime());
         break;
@@ -279,17 +286,17 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
         list.timeStamp().writeTimeStamp(reader.getTimestampLong());
         break;
       case INTERVAL:
-        throw new UnsupportedOperationException("Interval is currently not supported.");
+        throw unsupportedError("Interval is currently not supported.");
       case START_MAP:
         writeToMap(reader, list.map());
         break;
       case END_MAP:
-        throw new IllegalStateException("Shouldn't get a END_MAP inside a list");
+        throw dataReadError("Encountered an END_MAP event inside a list.");
       case START_ARRAY:
         writeToList(reader, list.list());
         break;
       default:
-        throw new UnsupportedOperationException("Unsupported type: " + event);
+        throw unsupportedError("Unsupported type: %s encountered during the query.", event);
       }
     }
     list.endList();
@@ -308,6 +315,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
     varCharWriter.writeVarChar(0, strBytes.length, buffer);
   }
 
+  private UserException unsupportedError(String format, Object... args) {
+    return UserException.unsupportedError()
+        .message(String.format(format, args))
+        .build(logger);
+  }
+
+  private UserException dataReadError(String format, Object... args) {
+    return UserException.dataReadError()
+        .message(String.format(format, args))
+        .build(logger);
+  }
+
   private DBDocumentReaderBase nextDocumentReader() {
     final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats();
     try {
@@ -326,7 +345,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
         }
       }
     } catch (DBException e) {
-      throw new DrillRuntimeException(e);
+      throw UserException.dataReadError(e).build(logger);
     }
   }
 
