Enhance exception handling.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/16d4ab89 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/16d4ab89 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/16d4ab89 Branch: refs/heads/master Commit: 16d4ab896df5cb368988220bc90f2834d9651d02 Parents: 48d195c Author: Aditya <adi...@mapr.com> Authored: Wed Feb 3 17:46:44 2016 -0800 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:34 2016 -0700 ---------------------------------------------------------------------- .../maprdb/json/MaprDBJsonRecordReader.java | 163 +++++++++++-------- 1 file changed, 91 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/16d4ab89/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java index f033de4..2e0e0c1 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/maprdb/json/MaprDBJsonRecordReader.java @@ -28,8 +28,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; @@ -85,8 +85,6 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { private boolean includeId; - private String currentFieldName; - public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) { buffer = context.getManagedBuffer(); @@ -144,20 +142,24 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { writer.reset(); int recordCount = 0; + DBDocumentReaderBase reader = null; while(recordCount < BaseValueVector.INITIAL_VALUE_ALLOCATION) { - DBDocumentReaderBase reader = nextDocumentReader(); - if (reader == null) break; - writer.setPosition(recordCount); - if (reader.next() != EventType.START_MAP) { - throw new IllegalStateException("The document did not start with START_MAP!"); - } try { + reader = nextDocumentReader(); + if (reader == null) break; + writer.setPosition(recordCount); + if (reader.next() != EventType.START_MAP) { + throw dataReadError("The document did not start with START_MAP!"); + } writeToMap(reader, writer.rootAsMap()); recordCount++; - } catch (IllegalArgumentException e) { - logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'", - IdCodec.asString(reader.getId()), currentFieldName), e); + } catch (UserException e) { + throw UserException.unsupportedError(e) + .addContext(String.format("Table: %s, document id: '%s'", + table.getPath(), + reader == null ? null : IdCodec.asString(reader.getId()))) + .build(logger); } } @@ -172,60 +174,65 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { EventType event = reader.next(); if (event == null || event == EventType.END_MAP) break outside; - currentFieldName = reader.getFieldName(); - switch (event) { - case NULL: - map.varChar(currentFieldName).write(null); // treat as VARCHAR for now - case BINARY: - writeBinary(map.varBinary(currentFieldName), reader.getBinary()); - break; - case BOOLEAN: - map.bit(currentFieldName).writeBit(reader.getBoolean() ? 1 : 0); - break; - case STRING: - writeString(map.varChar(currentFieldName), reader.getString()); - break; - case BYTE: - map.tinyInt(currentFieldName).writeTinyInt(reader.getByte()); - break; - case SHORT: - map.smallInt(currentFieldName).writeSmallInt(reader.getShort()); - break; - case INT: - map.integer(currentFieldName).writeInt(reader.getInt()); - break; - case LONG: - map.bigInt(currentFieldName).writeBigInt(reader.getLong()); - break; - case FLOAT: - map.float4(currentFieldName).writeFloat4(reader.getFloat()); - break; - case DOUBLE: - map.float8(currentFieldName).writeFloat8(reader.getDouble()); - break; - case DECIMAL: - throw new UnsupportedOperationException("Decimals are currently not supported."); - case DATE: - map.date(currentFieldName).writeDate(reader.getDate().toDate().getTime()); - break; - case TIME: - map.time(currentFieldName).writeTime(reader.getTimeInt()); - break; - case TIMESTAMP: - map.timeStamp(currentFieldName).writeTimeStamp(reader.getTimestampLong()); - break; - case INTERVAL: - throw new UnsupportedOperationException("Interval is currently not supported."); - case START_MAP: - writeToMap(reader, map.map(currentFieldName)); - break; - case START_ARRAY: - writeToList(reader, map.list(currentFieldName)); - break; - case END_ARRAY: - throw new IllegalStateException("Shouldn't get a END_ARRAY inside a map"); - default: - throw new UnsupportedOperationException("Unsupported type: " + event); + String fieldName = reader.getFieldName(); + try { + switch (event) { + case NULL: + break; // not setting the field will leave it as null + case BINARY: + writeBinary(map.varBinary(fieldName), reader.getBinary()); + break; + case BOOLEAN: + map.bit(fieldName).writeBit(reader.getBoolean() ? 1 : 0); + break; + case STRING: + writeString(map.varChar(fieldName), reader.getString()); + break; + case BYTE: + map.tinyInt(fieldName).writeTinyInt(reader.getByte()); + break; + case SHORT: + map.smallInt(fieldName).writeSmallInt(reader.getShort()); + break; + case INT: + map.integer(fieldName).writeInt(reader.getInt()); + break; + case LONG: + map.bigInt(fieldName).writeBigInt(reader.getLong()); + break; + case FLOAT: + map.float4(fieldName).writeFloat4(reader.getFloat()); + break; + case DOUBLE: + map.float8(fieldName).writeFloat8(reader.getDouble()); + break; + case DECIMAL: + throw unsupportedError("Decimal type is currently not supported."); + case DATE: + map.date(fieldName).writeDate(reader.getDate().toDate().getTime()); + break; + case TIME: + map.time(fieldName).writeTime(reader.getTimeInt()); + break; + case TIMESTAMP: + map.timeStamp(fieldName).writeTimeStamp(reader.getTimestampLong()); + break; + case INTERVAL: + throw unsupportedError("Interval type is currently not supported."); + case START_MAP: + writeToMap(reader, map.map(fieldName)); + break; + case START_ARRAY: + writeToList(reader, map.list(fieldName)); + break; + case END_ARRAY: + throw dataReadError("Encountered an END_ARRAY event inside a map."); + default: + throw unsupportedError("Unsupported type: %s encountered during the query.", event); + } + } catch (IllegalArgumentException e) { + logger.warn(String.format("Possible schema change at _id: '%s', field: '%s'", + IdCodec.asString(reader.getId()), fieldName), e); } } map.end(); @@ -239,7 +246,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { switch (event) { case NULL: - list.varChar().write(null); // treat as VARCHAR for now + throw unsupportedError("Null values are not supported in lists."); case BINARY: writeBinary(list.varBinary(), reader.getBinary()); break; @@ -268,7 +275,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { list.float8().writeFloat8(reader.getDouble()); break; case DECIMAL: - throw new UnsupportedOperationException("Decimals are currently not supported."); + throw unsupportedError("Decimals are currently not supported."); case DATE: list.date().writeDate(reader.getDate().toDate().getTime()); break; @@ -279,17 +286,17 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { list.timeStamp().writeTimeStamp(reader.getTimestampLong()); break; case INTERVAL: - throw new UnsupportedOperationException("Interval is currently not supported."); + throw unsupportedError("Interval is currently not supported."); case START_MAP: writeToMap(reader, list.map()); break; case END_MAP: - throw new IllegalStateException("Shouldn't get a END_MAP inside a list"); + throw dataReadError("Encountered an END_MAP event inside a list."); case START_ARRAY: writeToList(reader, list.list()); break; default: - throw new UnsupportedOperationException("Unsupported type: " + event); + throw unsupportedError("Unsupported type: %s encountered during the query.%s", event); } } list.endList(); @@ -308,6 +315,18 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { varCharWriter.writeVarChar(0, strBytes.length, buffer); } + private UserException unsupportedError(String format, Object... args) { + return UserException.unsupportedError() + .message(String.format(format, args)) + .build(logger); + } + + private UserException dataReadError(String format, Object... args) { + return UserException.dataReadError() + .message(String.format(format, args)) + .build(logger); + } + private DBDocumentReaderBase nextDocumentReader() { final OperatorStats operatorStats = operatorContext == null ? null : operatorContext.getStats(); try { @@ -326,7 +345,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader { } } } catch (DBException e) { - throw new DrillRuntimeException(e); + throw UserException.dataReadError(e).build(logger); } }