DRILL-4653: Malformed JSON should not stop the entire query from progressing

This closes #518


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/db482989
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/db482989
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/db482989

Branch: refs/heads/master
Commit: db48298920575cb1c2283e03bdfc7b50e83ae217
Parents: 42948fe
Author: Subbu Srinivasan <ssriniva...@zscaler.com>
Authored: Fri Jun 10 15:58:49 2016 -0700
Committer: Parth Chandra <par...@apache.org>
Committed: Tue Oct 18 10:47:52 2016 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   5 +-
 .../server/options/SystemOptionManager.java     |   4 +
 .../exec/store/easy/json/JSONRecordReader.java  |  68 ++--
 .../exec/store/easy/json/JsonProcessor.java     |   5 +
 .../easy/json/reader/BaseJsonProcessor.java     |  69 +++-
 .../easy/json/reader/CountingJsonReader.java    |  44 ++-
 .../exec/vector/complex/fn/JsonReader.java      | 356 ++++++++++---------
 .../exec/store/json/TestJsonRecordReader.java   | 179 +++++++---
 .../resources/jsoninput/drill4653/file.json     |  10 +
 9 files changed, 476 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 0f2321b..027c942 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -152,7 +152,10 @@ public interface ExecConstants {
   BooleanValidator JSON_EXTENDED_TYPES = new 
BooleanValidator("store.json.extended_types", false);
   BooleanValidator JSON_WRITER_UGLIFY = new 
BooleanValidator("store.json.writer.uglify", false);
   BooleanValidator JSON_WRITER_SKIPNULLFIELDS = new 
BooleanValidator("store.json.writer.skip_null_fields", true);
-
+  String JSON_READER_SKIP_INVALID_RECORDS_FLAG = 
"store.json.reader.skip_invalid_records";
+  BooleanValidator JSON_SKIP_MALFORMED_RECORDS_VALIDATOR = new 
BooleanValidator(JSON_READER_SKIP_INVALID_RECORDS_FLAG, false);
+  String JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG = 
"store.json.reader.print_skipped_invalid_record_number";
+  BooleanValidator JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR = 
new BooleanValidator(JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG, false);
   DoubleValidator TEXT_ESTIMATED_ROW_SIZE = new RangeDoubleValidator(
       "store.text.estimated_row_size_bytes", 1, Long.MAX_VALUE, 100.0);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index ee94493..d43c868 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.LogicalPlanPersistence;
 import org.apache.drill.common.map.CaseInsensitiveMap;
@@ -35,6 +36,7 @@ import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
@@ -105,6 +107,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.JSON_WRITER_UGLIFY,
       ExecConstants.JSON_WRITER_SKIPNULLFIELDS,
       ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR,
+      ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR,
+      ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR,
       ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR,
       ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index dbbe6b0..7d929a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -20,8 +20,8 @@ package org.apache.drill.exec.store.easy.json;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
-
 import com.google.common.collect.Lists;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -29,7 +29,6 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
@@ -64,6 +63,10 @@ public class JSONRecordReader extends AbstractRecordReader {
   private final boolean enableAllTextMode;
   private final boolean readNumbersAsDouble;
   private final boolean unionEnabled;
+  private long parseErrorCount;
+  private final boolean skipMalformedJSONRecords;
+  private final boolean printSkippedMalformedJSONRecordLineNumber;
+  ReadState write = null;
 
   /**
    * Create a JSON Record Reader that uses a file based input stream.
@@ -109,11 +112,12 @@ public class JSONRecordReader extends AbstractRecordReader {
 
     this.fileSystem = fileSystem;
     this.fragmentContext = fragmentContext;
-
     // only enable all text mode if we aren't using embedded content mode.
     this.enableAllTextMode = embeddedContent == null && 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
     this.readNumbersAsDouble = embeddedContent == null && 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
     this.unionEnabled = embeddedContent == null && 
fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+    this.skipMalformedJSONRecords = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
+    this.printSkippedMalformedJSONRecordLineNumber = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
     setColumns(columns);
   }
 
@@ -122,7 +126,8 @@ public class JSONRecordReader extends AbstractRecordReader {
     return super.toString()
         + "[hadoopPath = " + hadoopPath
         + ", recordCount = " + recordCount
-        + ", runningRecordCount = " + runningRecordCount + ", ...]";
+        + ", parseErrorCount = " + parseErrorCount
+         + ", runningRecordCount = " + runningRecordCount + ", ...]";
   }
 
   @Override
@@ -154,6 +159,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     }else{
       jsonReader.setSource(embeddedContent);
     }
+    jsonReader.setIgnoreJSONParseErrors(skipMalformedJSONRecords);
   }
 
   protected void handleAndRaise(String suffix, Exception e) throws 
UserException {
@@ -189,39 +195,43 @@ public class JSONRecordReader extends AbstractRecordReader {
   public int next() {
     writer.allocate();
     writer.reset();
-
     recordCount = 0;
-    ReadState write = null;
-//    Stopwatch p = new Stopwatch().start();
-    try{
-      outside: while(recordCount < DEFAULT_ROWS_PER_BATCH) {
+    parseErrorCount = 0;
+    if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+      return recordCount;
+    }
+    outside: while(recordCount < DEFAULT_ROWS_PER_BATCH){
+      try{
         writer.setPosition(recordCount);
         write = jsonReader.write(writer);
-
-        if(write == ReadState.WRITE_SUCCEED) {
-//          logger.debug("Wrote record.");
+        if(write == ReadState.WRITE_SUCCEED){
           recordCount++;
-        }else{
-//          logger.debug("Exiting.");
+        }
+        else if(write == ReadState.JSON_RECORD_PARSE_ERROR || write == 
ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+          if(skipMalformedJSONRecords == false){
+            handleAndRaise("Error parsing JSON", new 
Exception(hadoopPath.getName() + " : line nos :" + (recordCount+1)));
+          }
+          ++parseErrorCount;
+          if(printSkippedMalformedJSONRecordLineNumber){
+            logger.debug("Error parsing JSON in " + hadoopPath.getName() + " : 
line nos :" + (recordCount+parseErrorCount));
+          }
+          if(write == ReadState.JSON_RECORD_PARSE_EOF_ERROR){
+            break outside;
+          }
+        }
+        else{
           break outside;
         }
-
       }
-
-      jsonReader.ensureAtLeastOneField(writer);
-
-      writer.setValueCount(recordCount);
-//      p.stop();
-//      System.out.println(String.format("Wrote %d records in %dms.", 
recordCount, p.elapsed(TimeUnit.MILLISECONDS)));
-
-      updateRunningCount();
-      return recordCount;
-
-    } catch (final Exception e) {
-      handleAndRaise("Error parsing JSON", e);
+      catch(IOException ex)
+        {
+           handleAndRaise("Error parsing JSON", ex);
+        }
     }
-    // this is never reached
-    return 0;
+    jsonReader.ensureAtLeastOneField(writer);
+    writer.setValueCount(recordCount);
+    updateRunningCount();
+    return recordCount;
   }
 
   private void updateRunningCount() {

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
index 4d8d4ba..179a134 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonProcessor.java
@@ -30,6 +30,8 @@ public interface JsonProcessor {
 
   public static enum ReadState {
     END_OF_STREAM,
+    JSON_RECORD_PARSE_ERROR,
+    JSON_RECORD_PARSE_EOF_ERROR,
     WRITE_SUCCEED
   }
 
@@ -50,4 +52,7 @@ public interface JsonProcessor {
                                                        String msg,
                                                        Object... args);
 
+  public boolean ignoreJSONParseError() ;
+
+  public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors);
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
index a89fa86..95ebe6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonProcessor.java
@@ -25,20 +25,38 @@ import java.io.InputStream;
 import org.apache.drill.exec.store.easy.json.JsonProcessor;
 
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.TreeTraversingParser;
 import com.google.common.base.Preconditions;
+
 import org.apache.drill.common.exceptions.UserException;
 
 public abstract class BaseJsonProcessor implements JsonProcessor {
 
-  private static final ObjectMapper MAPPER = new ObjectMapper()
-    .configure(JsonParser.Feature.ALLOW_COMMENTS, true)
-    .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+  private static final ObjectMapper MAPPER = new ObjectMapper().configure(
+      JsonParser.Feature.ALLOW_COMMENTS, true).configure(
+      JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
+
+  private static final String JACKSON_PARSER_EOF_FILE_MSG = "Unexpected 
end-of-input:";
+
+  public static enum JsonExceptionProcessingState {
+    END_OF_STREAM, PROC_SUCCEED
+  }
 
   protected JsonParser parser;
   protected DrillBuf workBuf;
+  protected JsonToken lastSeenJsonToken = null;
+  boolean ignoreJSONParseErrors = false; // default False
+
+  public boolean ignoreJSONParseError() {
+    return ignoreJSONParseErrors;
+  }
+
+  public void setIgnoreJSONParseErrors(boolean ignoreJSONParseErrors) {
+    this.ignoreJSONParseErrors = ignoreJSONParseErrors;
+  }
 
   public BaseJsonProcessor(DrillBuf workBuf) {
     workBuf = Preconditions.checkNotNull(workBuf);
@@ -55,27 +73,52 @@ public abstract class BaseJsonProcessor implements JsonProcessor {
   }
 
   @Override
-  public UserException.Builder getExceptionWithContext(UserException.Builder 
exceptionBuilder,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args) {
+  public UserException.Builder getExceptionWithContext(
+      UserException.Builder exceptionBuilder, String field, String msg,
+      Object... args) {
     if (msg != null) {
       exceptionBuilder.message(msg, args);
     }
-    if(field != null) {
+    if (field != null) {
       exceptionBuilder.pushContext("Field ", field);
     }
-    exceptionBuilder.pushContext("Column ", 
parser.getCurrentLocation().getColumnNr()+1)
-            .pushContext("Line ", parser.getCurrentLocation().getLineNr());
+    exceptionBuilder.pushContext("Column ",
+        parser.getCurrentLocation().getColumnNr() + 1).pushContext("Line ",
+        parser.getCurrentLocation().getLineNr());
     return exceptionBuilder;
   }
 
   @Override
   public UserException.Builder getExceptionWithContext(Throwable e,
-                                                       String field,
-                                                       String msg,
-                                                       Object... args) {
+      String field, String msg, Object... args) {
     UserException.Builder exceptionBuilder = UserException.dataReadError(e);
     return getExceptionWithContext(exceptionBuilder, field, msg, args);
   }
+
+  /*
+   * DRILL - 4653 This method processes JSON tokens until it reaches end of the
+   * current line when it processes start of a new JSON line { - return
+   * PROC_SUCCEED when it sees EOF the stream - there may not be a closing }
+   */
+
+  protected JsonExceptionProcessingState processJSONException()
+      throws IOException {
+    while (!parser.isClosed()) {
+      try {
+        JsonToken currentToken = parser.nextToken();
+        if(currentToken ==  JsonToken.START_OBJECT && (lastSeenJsonToken == 
JsonToken.END_OBJECT || lastSeenJsonToken == null))
+        {
+          lastSeenJsonToken =currentToken;
+          break;
+        }
+        lastSeenJsonToken =currentToken;
+        } catch (com.fasterxml.jackson.core.JsonParseException ex1) {
+        if (ex1.getOriginalMessage().startsWith(JACKSON_PARSER_EOF_FILE_MSG)) {
+          return JsonExceptionProcessingState.END_OF_STREAM;
+        }
+       continue;
+       }
+    }
+    return JsonExceptionProcessingState.PROC_SUCCEED;
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
index c4ab1ee..5f7a7a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java
@@ -20,8 +20,11 @@ package org.apache.drill.exec.store.easy.json.reader;
 import java.io.IOException;
 
 import com.fasterxml.jackson.core.JsonToken;
+
 import io.netty.buffer.DrillBuf;
-import org.apache.drill.exec.store.easy.json.JsonProcessor;
+
+import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
+import 
org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
 public class CountingJsonReader extends BaseJsonProcessor {
@@ -32,14 +35,39 @@ public class CountingJsonReader extends BaseJsonProcessor {
 
   @Override
   public ReadState write(BaseWriter.ComplexWriter writer) throws IOException {
-    final JsonToken token = parser.nextToken();
-    if (!parser.hasCurrentToken()) {
-      return ReadState.END_OF_STREAM;
-    } else if (token != JsonToken.START_OBJECT) {
-      throw new IllegalStateException(String.format("Cannot read from the 
middle of a record. Current token was %s", token));
+    try {
+      JsonToken token = lastSeenJsonToken;
+      if (token == null || token == JsonToken.END_OBJECT){
+        token = parser.nextToken();
+      }
+      lastSeenJsonToken = null;
+      if (!parser.hasCurrentToken()) {
+        return ReadState.END_OF_STREAM;
+      } else if (token != JsonToken.START_OBJECT) {
+        throw new com.fasterxml.jackson.core.JsonParseException(
+            parser,
+            String
+                .format(
+                    "Cannot read from the middle of a record. Current token 
was %s ",
+                    token));
+        // throw new
+        // IllegalStateException(String.format("Cannot read from the middle of 
a record. Current token was %s",
+        // token));
+      }
+      writer.rootAsMap().bit("count").writeBit(1);
+      parser.skipChildren();
+    } catch (com.fasterxml.jackson.core.JsonParseException ex) {
+      if (ignoreJSONParseError()) {
+        if (processJSONException() == 
JsonExceptionProcessingState.END_OF_STREAM){
+          return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
+        }
+        else{
+          return ReadState.JSON_RECORD_PARSE_ERROR;
+        }
+      } else {
+        throw ex;
+      }
     }
-    writer.rootAsMap().bit("count").writeBit(1);
-    parser.skipChildren();
     return ReadState.WRITE_SUCCEED;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 64ee449..1bc5eaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -28,7 +28,9 @@ import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.store.easy.json.JsonProcessor.ReadState;
 import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
+import 
org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor.JsonExceptionProcessingState;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput;
 import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
@@ -44,7 +46,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public class JsonReader extends BaseJsonProcessor {
-  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(JsonReader.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+      .getLogger(JsonReader.class);
   public final static int MAX_RECORD_SIZE = 128 * 1024;
 
   private final WorkingBuffer workingBuffer;
@@ -56,12 +59,14 @@ public class JsonReader extends BaseJsonProcessor {
   private final boolean readNumbersAsDouble;
 
   /**
-   * Describes whether or not this reader can unwrap a single root array 
record and treat it like a set of distinct records.
+   * Describes whether or not this reader can unwrap a single root array record
+   * and treat it like a set of distinct records.
    */
   private final boolean skipOuterList;
 
   /**
-   * Whether the reader is currently in a situation where we are unwrapping an 
outer list.
+   * Whether the reader is currently in a situation where we are unwrapping an
+   * outer list.
    */
   private boolean inOuterList;
   /**
@@ -71,11 +76,14 @@ public class JsonReader extends BaseJsonProcessor {
 
   private FieldSelection selection;
 
-  public JsonReader(DrillBuf managedBuf, boolean allTextMode, boolean 
skipOuterList, boolean readNumbersAsDouble) {
-    this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode, skipOuterList, 
readNumbersAsDouble);
+  public JsonReader(DrillBuf managedBuf, boolean allTextMode,
+      boolean skipOuterList, boolean readNumbersAsDouble) {
+    this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode, skipOuterList,
+        readNumbersAsDouble);
   }
 
-  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean 
allTextMode, boolean skipOuterList, boolean readNumbersAsDouble) {
+  public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns,
+      boolean allTextMode, boolean skipOuterList, boolean readNumbersAsDouble) 
{
     super(managedBuf);
     assert Preconditions.checkNotNull(columns).size() > 0 : "JSON record 
reader requires at least one column";
     this.selection = FieldSelection.getFieldSelection(columns);
@@ -85,7 +93,7 @@ public class JsonReader extends BaseJsonProcessor {
     this.columns = columns;
     this.mapOutput = new MapVectorOutput(workingBuffer);
     this.listOutput = new ListVectorOutput(workingBuffer);
-    this.currentFieldName="<none>";
+    this.currentFieldName = "<none>";
     this.readNumbersAsDouble = readNumbersAsDouble;
   }
 
@@ -100,7 +108,7 @@ public class JsonReader extends BaseJsonProcessor {
       SchemaPath sp = columns.get(i);
       PathSegment fieldPath = sp.getRootSegment();
       BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
-      while (fieldPath.getChild() != null && ! fieldPath.getChild().isArray()) 
{
+      while (fieldPath.getChild() != null && !fieldPath.getChild().isArray()) {
         fieldWriter = fieldWriter.map(fieldPath.getNameSegment().getPath());
         fieldPath = fieldPath.getChild();
       }
@@ -110,12 +118,18 @@ public class JsonReader extends BaseJsonProcessor {
         emptyStatus.set(i, true);
       }
       if (i == 0 && !allTextMode) {
-        // when allTextMode is false, there is not much benefit to producing 
all the empty
-        // fields; just produce 1 field.  The reason is that the type of the 
fields is
-        // unknown, so if we produce multiple Integer fields by default, a 
subsequent batch
-        // that contains non-integer fields will error out in any case.  
Whereas, with
-        // allTextMode true, we are sure that all fields are going to be 
treated as varchar,
-        // so it makes sense to produce all the fields, and in fact is 
necessary in order to
+        // when allTextMode is false, there is not much benefit to producing 
all
+        // the empty
+        // fields; just produce 1 field. The reason is that the type of the
+        // fields is
+        // unknown, so if we produce multiple Integer fields by default, a
+        // subsequent batch
+        // that contains non-integer fields will error out in any case. 
Whereas,
+        // with
+        // allTextMode true, we are sure that all fields are going to be 
treated
+        // as varchar,
+        // so it makes sense to produce all the fields, and in fact is 
necessary
+        // in order to
         // avoid schema change exceptions by downstream operators.
         break;
       }
@@ -123,8 +137,10 @@ public class JsonReader extends BaseJsonProcessor {
     }
 
     // second pass: create default typed vectors corresponding to empty fields
-    // Note: this is not easily do-able in 1 pass because the same fieldWriter 
may be
-    // shared by multiple fields whereas we want to keep track of all fields 
independently,
+    // Note: this is not easily do-able in 1 pass because the same fieldWriter
+    // may be
+    // shared by multiple fields whereas we want to keep track of all fields
+    // independently,
     // so we rely on the emptyStatus.
     for (int j = 0; j < fieldPathList.size(); j++) {
       BaseWriter.MapWriter fieldWriter = writerList.get(j);
@@ -143,7 +159,6 @@ public class JsonReader extends BaseJsonProcessor {
     setSource(DrillBufInputStream.getStream(start, end, buf));
   }
 
-
   @Override
   public void setSource(InputStream is) throws IOException {
     super.setSource(is);
@@ -168,102 +183,116 @@ public class JsonReader extends BaseJsonProcessor {
 
   @Override
   public ReadState write(ComplexWriter writer) throws IOException {
-    JsonToken t = parser.nextToken();
 
-    while (!parser.hasCurrentToken() && !parser.isClosed()) {
-      t = parser.nextToken();
-    }
+    ReadState readState = null;
+    try {
+      JsonToken t = lastSeenJsonToken;
+      if (t == null || t == JsonToken.END_OBJECT) {
+        t = parser.nextToken();
+      }
+      while (!parser.hasCurrentToken() && !parser.isClosed()) {
+        t = parser.nextToken();
+      }
+      lastSeenJsonToken = null;
 
-    if (parser.isClosed()) {
-      return ReadState.END_OF_STREAM;
-    }
+      if (parser.isClosed()) {
+        return ReadState.END_OF_STREAM;
+      }
 
-    ReadState readState = writeToVector(writer, t);
+      readState = writeToVector(writer, t);
 
-    switch (readState) {
-    case END_OF_STREAM:
-      break;
-    case WRITE_SUCCEED:
-      break;
-    default:
-      throw
-        getExceptionWithContext(
-          UserException.dataReadError(), currentFieldName, null)
-          .message("Failure while reading JSON. (Got an invalid read state %s 
)", readState.toString())
-          .build(logger);
+      switch (readState) {
+      case END_OF_STREAM:
+        break;
+      case WRITE_SUCCEED:
+        break;
+      default:
+        throw getExceptionWithContext(UserException.dataReadError(),
+            currentFieldName, null).message(
+            "Failure while reading JSON. (Got an invalid read state %s )",
+            readState.toString()).build(logger);
+      }
+    } catch (com.fasterxml.jackson.core.JsonParseException ex) {
+      if (ignoreJSONParseError()) {
+        if (processJSONException() == 
JsonExceptionProcessingState.END_OF_STREAM) {
+          return ReadState.JSON_RECORD_PARSE_EOF_ERROR;
+        } else {
+          return ReadState.JSON_RECORD_PARSE_ERROR;
+        }
+      } else {
+        throw ex;
+      }
     }
-
     return readState;
   }
 
-  private void confirmLast() throws IOException{
+  private void confirmLast() throws IOException {
     parser.nextToken();
-    if(!parser.isClosed()){
-      throw
-        getExceptionWithContext(
-          UserException.dataReadError(), currentFieldName, null)
-        .message("Drill attempted to unwrap a toplevel list "
-          + "in your document.  However, it appears that there is trailing 
content after this top level list.  Drill only "
-          + "supports querying a set of distinct maps or a single json array 
with multiple inner maps.")
-        .build(logger);
+    if (!parser.isClosed()) {
+      throw getExceptionWithContext(UserException.dataReadError(),
+          currentFieldName, null)
+          .message(
+              "Drill attempted to unwrap a toplevel list "
+                  + "in your document.  However, it appears that there is 
trailing content after this top level list.  Drill only "
+                  + "supports querying a set of distinct maps or a single json 
array with multiple inner maps.")
+          .build(logger);
     }
   }
 
-  private ReadState writeToVector(ComplexWriter writer, JsonToken t) throws 
IOException {
+  private ReadState writeToVector(ComplexWriter writer, JsonToken t)
+      throws IOException {
+
     switch (t) {
     case START_OBJECT:
       writeDataSwitch(writer.rootAsMap());
       break;
     case START_ARRAY:
-      if(inOuterList){
-        throw
-          getExceptionWithContext(
-            UserException.dataReadError(), currentFieldName, null)
-          .message("The top level of your document must either be a single 
array of maps or a set "
-            + "of white space delimited maps.")
-          .build(logger);
+      if (inOuterList) {
+        throw getExceptionWithContext(UserException.dataReadError(),
+            currentFieldName, null)
+            .message(
+                "The top level of your document must either be a single array 
of maps or a set "
+                    + "of white space delimited maps.").build(logger);
       }
 
-      if(skipOuterList){
+      if (skipOuterList) {
         t = parser.nextToken();
-        if(t == JsonToken.START_OBJECT){
+        if (t == JsonToken.START_OBJECT) {
           inOuterList = true;
           writeDataSwitch(writer.rootAsMap());
-        }else{
-          throw
-            getExceptionWithContext(
-              UserException.dataReadError(), currentFieldName, null)
-            .message("The top level of your document must either be a single 
array of maps or a set "
-              + "of white space delimited maps.")
-            .build(logger);
+        } else {
+          throw getExceptionWithContext(UserException.dataReadError(),
+              currentFieldName, null)
+              .message(
+                  "The top level of your document must either be a single 
array of maps or a set "
+                      + "of white space delimited maps.").build(logger);
         }
 
-      }else{
+      } else {
         writeDataSwitch(writer.rootAsList());
       }
       break;
     case END_ARRAY:
 
-      if(inOuterList){
+      if (inOuterList) {
         confirmLast();
         return ReadState.END_OF_STREAM;
-      }else{
-        throw
-          getExceptionWithContext(
-            UserException.dataReadError(), currentFieldName, null)
-          .message("Failure while parsing JSON.  Ran across unexpected %s.", 
JsonToken.END_ARRAY)
-          .build(logger);
+      } else {
+        throw getExceptionWithContext(UserException.dataReadError(),
+            currentFieldName, null).message(
+            "Failure while parsing JSON.  Ran across unexpected %s.",
+            JsonToken.END_ARRAY).build(logger);
       }
 
     case NOT_AVAILABLE:
       return ReadState.END_OF_STREAM;
     default:
-      throw
-        getExceptionWithContext(
-          UserException.dataReadError(), currentFieldName, null)
-          .message("Failure while parsing JSON.  Found token of [%s].  Drill 
currently only supports parsing "
-              + "json strings that contain either lists or maps.  The root 
object cannot be a scalar.", t)
-          .build(logger);
+      throw getExceptionWithContext(UserException.dataReadError(),
+          currentFieldName, null)
+          .message(
+              "Failure while parsing JSON.  Found token of [%s].  Drill 
currently only supports parsing "
+                  + "json strings that contain either lists or maps.  The root 
object cannot be a scalar.",
+              t).build(logger);
     }
 
     return ReadState.WRITE_SUCCEED;
@@ -304,16 +333,17 @@ public class JsonReader extends BaseJsonProcessor {
    * @param map
    * @param selection
    * @param moveForward
-   *          Whether or not we should start with using the current token or 
the next token. If moveForward = true, we
-   *          should start with the next token and ignore the current one.
+   *          Whether or not we should start with using the current token or 
the
+   *          next token. If moveForward = true, we should start with the next
+   *          token and ignore the current one.
    * @throws IOException
    */
-  private void writeData(MapWriter map, FieldSelection selection, boolean 
moveForward) throws IOException {
+  private void writeData(MapWriter map, FieldSelection selection,
+      boolean moveForward) throws IOException {
     //
     map.start();
     try {
-      outside:
-      while (true) {
+      outside: while (true) {
 
         JsonToken t;
         if (moveForward) {
@@ -322,12 +352,12 @@ public class JsonReader extends BaseJsonProcessor {
           t = parser.getCurrentToken();
           moveForward = true;
         }
-
         if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
           return;
         }
 
-        assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME 
but got %s.", t.name());
+        assert t == JsonToken.FIELD_NAME : String.format(
+            "Expected FIELD_NAME but got %s.", t.name());
 
         final String fieldName = parser.getText();
         this.currentFieldName = fieldName;
@@ -375,11 +405,9 @@ public class JsonReader extends BaseJsonProcessor {
           break;
 
         default:
-          throw
-                  getExceptionWithContext(
-                          UserException.dataReadError(), currentFieldName, 
null)
-                          .message("Unexpected token %s", 
parser.getCurrentToken())
-                          .build(logger);
+          throw getExceptionWithContext(UserException.dataReadError(),
+              currentFieldName, null).message("Unexpected token %s",
+              parser.getCurrentToken()).build(logger);
         }
 
       }
@@ -389,26 +417,26 @@ public class JsonReader extends BaseJsonProcessor {
 
   }
 
-  private void writeDataAllText(MapWriter map, FieldSelection selection, 
boolean moveForward) throws IOException {
+  private void writeDataAllText(MapWriter map, FieldSelection selection,
+      boolean moveForward) throws IOException {
     //
     map.start();
     outside: while (true) {
 
-
       JsonToken t;
 
-      if(moveForward){
+      if (moveForward) {
         t = parser.nextToken();
-      }else{
+      } else {
         t = parser.getCurrentToken();
         moveForward = true;
       }
-
-      if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
+       if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) {
         return;
       }
 
-      assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME 
but got %s.", t.name());
+      assert t == JsonToken.FIELD_NAME : String.format(
+          "Expected FIELD_NAME but got %s.", t.name());
 
       final String fieldName = parser.getText();
       this.currentFieldName = fieldName;
@@ -443,11 +471,9 @@ public class JsonReader extends BaseJsonProcessor {
         break;
 
       default:
-        throw
-          getExceptionWithContext(
-            UserException.dataReadError(), currentFieldName, null)
-          .message("Unexpected token %s", parser.getCurrentToken())
-          .build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(),
+            currentFieldName, null).message("Unexpected token %s",
+            parser.getCurrentToken()).build(logger);
       }
     }
     map.end();
@@ -455,13 +481,16 @@ public class JsonReader extends BaseJsonProcessor {
   }
 
   /**
-   * Will attempt to take the current value and consume it as an extended 
value (if extended mode is enabled).  Whether extended is enable or disabled, 
will consume the next token in the stream.
+   * Will attempt to take the current value and consume it as an extended value
+   * (if extended mode is enabled). Whether extended is enable or disabled, 
will
+   * consume the next token in the stream.
    * @param writer
    * @param fieldName
    * @return
    * @throws IOException
    */
-  private boolean writeMapDataIfTyped(MapWriter writer, String fieldName) 
throws IOException {
+  private boolean writeMapDataIfTyped(MapWriter writer, String fieldName)
+      throws IOException {
     if (extended) {
       return mapOutput.run(writer, fieldName);
     } else {
@@ -471,7 +500,9 @@ public class JsonReader extends BaseJsonProcessor {
   }
 
   /**
-   * Will attempt to take the current value and consume it as an extended 
value (if extended mode is enabled).  Whether extended is enable or disabled, 
will consume the next token in the stream.
+   * Will attempt to take the current value and consume it as an extended value
+   * (if extended mode is enabled). Whether extended is enable or disabled, 
will
+   * consume the next token in the stream.
    * @param writer
    * @return
    * @throws IOException
@@ -485,69 +516,76 @@ public class JsonReader extends BaseJsonProcessor {
     }
   }
 
-  private void handleString(JsonParser parser, MapWriter writer, String 
fieldName) throws IOException {
-    writer.varChar(fieldName).writeVarChar(0, 
workingBuffer.prepareVarCharHolder(parser.getText()),
+  private void handleString(JsonParser parser, MapWriter writer,
+      String fieldName) throws IOException {
+    writer.varChar(fieldName).writeVarChar(0,
+        workingBuffer.prepareVarCharHolder(parser.getText()),
         workingBuffer.getBuf());
   }
 
-  private void handleString(JsonParser parser, ListWriter writer) throws 
IOException {
-    writer.varChar().writeVarChar(0, 
workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf());
+  private void handleString(JsonParser parser, ListWriter writer)
+      throws IOException {
+    writer.varChar().writeVarChar(0,
+        workingBuffer.prepareVarCharHolder(parser.getText()),
+        workingBuffer.getBuf());
   }
 
   private void writeData(ListWriter list) throws IOException {
     list.startList();
     outside: while (true) {
       try {
-      switch (parser.nextToken()) {
-      case START_ARRAY:
-        writeData(list.list());
-        break;
-      case START_OBJECT:
-        if (!writeListDataIfTyped(list)) {
-          writeData(list.map(), FieldSelection.ALL_VALID, false);
-        }
-        break;
-      case END_ARRAY:
-      case END_OBJECT:
-        break outside;
+        switch (parser.nextToken()) {
+        case START_ARRAY:
+          writeData(list.list());
+          break;
+        case START_OBJECT:
+          if (!writeListDataIfTyped(list)) {
+            writeData(list.map(), FieldSelection.ALL_VALID, false);
+          }
+          break;
+        case END_ARRAY:
+        case END_OBJECT:
+          break outside;
 
-      case VALUE_EMBEDDED_OBJECT:
-      case VALUE_FALSE: {
-        list.bit().writeBit(0);
-        break;
-      }
-      case VALUE_TRUE: {
-        list.bit().writeBit(1);
-        break;
-      }
-      case VALUE_NULL:
-        throw UserException.unsupportedError()
-          .message("Null values are not supported in lists by default. " +
-            "Please set `store.json.all_text_mode` to true to read lists 
containing nulls. " +
-            "Be advised that this will treat JSON null values as a string 
containing the word 'null'.")
-          .build(logger);
-      case VALUE_NUMBER_FLOAT:
-        list.float8().writeFloat8(parser.getDoubleValue());
-        break;
-      case VALUE_NUMBER_INT:
-        if (this.readNumbersAsDouble) {
-          list.float8().writeFloat8(parser.getDoubleValue());
+        case VALUE_EMBEDDED_OBJECT:
+        case VALUE_FALSE: {
+          list.bit().writeBit(0);
+          break;
         }
-        else {
-          list.bigInt().writeBigInt(parser.getLongValue());
+        case VALUE_TRUE: {
+          list.bit().writeBit(1);
+          break;
         }
-        break;
-      case VALUE_STRING:
-        handleString(parser, list);
-        break;
-      default:
-        throw UserException.dataReadError()
-          .message("Unexpected token %s", parser.getCurrentToken())
-          .build(logger);
-    }
-    } catch (Exception e) {
-      throw getExceptionWithContext(e, this.currentFieldName, 
null).build(logger);
-    }
+        case VALUE_NULL:
+          throw UserException
+              .unsupportedError()
+              .message(
+                  "Null values are not supported in lists by default. "
+                      + "Please set `store.json.all_text_mode` to true to read 
lists containing nulls. "
+                      + "Be advised that this will treat JSON null values as a 
string containing the word 'null'.")
+              .build(logger);
+        case VALUE_NUMBER_FLOAT:
+          list.float8().writeFloat8(parser.getDoubleValue());
+          break;
+        case VALUE_NUMBER_INT:
+          if (this.readNumbersAsDouble) {
+            list.float8().writeFloat8(parser.getDoubleValue());
+          } else {
+            list.bigInt().writeBigInt(parser.getLongValue());
+          }
+          break;
+        case VALUE_STRING:
+          handleString(parser, list);
+          break;
+        default:
+          throw UserException.dataReadError()
+              .message("Unexpected token %s", parser.getCurrentToken())
+              .build(logger);
+        }
+      } catch (Exception e) {
+        throw getExceptionWithContext(e, this.currentFieldName, null).build(
+            logger);
+      }
     }
     list.endList();
 
@@ -580,11 +618,9 @@ public class JsonReader extends BaseJsonProcessor {
         handleString(parser, list);
         break;
       default:
-        throw
-          getExceptionWithContext(
-            UserException.dataReadError(), currentFieldName, null)
-          .message("Unexpected token %s", parser.getCurrentToken())
-          .build(logger);
+        throw getExceptionWithContext(UserException.dataReadError(),
+            currentFieldName, null).message("Unexpected token %s",
+            parser.getCurrentToken()).build(logger);
       }
     }
     list.endList();

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
index 02b98fc..342bea4 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.store.json;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.ExecConstants;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -27,15 +29,16 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class TestJsonRecordReader extends BaseTestQuery {
-  //private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
+  // private static final org.slf4j.Logger logger =
+  // org.slf4j.LoggerFactory.getLogger(TestJsonRecordReader.class);
 
   @Test
   public void testComplexJsonInput() throws Exception {
-//  test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
+    // test("select z[0]['orange']  from cp.`jsoninput/input2.json` limit 10");
     test("select `integer`, x['y'] as x1, x['y'] as x2, z[0], z[0]['orange'], 
z[1]['pink']  from cp.`jsoninput/input2.json` limit 10 ");
-//    test("select x from cp.`jsoninput/input2.json`");
+    // test("select x from cp.`jsoninput/input2.json`");
 
-//    test("select z[0]  from cp.`jsoninput/input2.json` limit 10");
+    // test("select z[0]  from cp.`jsoninput/input2.json` limit 10");
   }
 
   @Test
@@ -45,8 +48,8 @@ public class TestJsonRecordReader extends BaseTestQuery {
 
   @Test
   public void testComplexMultipleTimes() throws Exception {
-    for(int i =0 ; i < 5; i++) {
-    test("select * from cp.`join/merge_join.json`");
+    for (int i = 0; i < 5; i++) {
+      test("select * from cp.`join/merge_join.json`");
     }
   }
 
@@ -55,11 +58,13 @@ public class TestJsonRecordReader extends BaseTestQuery {
     test("select * from cp.`limit/test1.json` limit 10");
   }
 
-  @Test// DRILL-1634 : retrieve an element in a nested array in a repeated 
map.  RepeatedMap (Repeated List (Repeated varchar))
+  @Test
+  // DRILL-1634 : retrieve an element in a nested array in a repeated map.
+  // RepeatedMap (Repeated List (Repeated varchar))
   public void testNestedArrayInRepeatedMap() throws Exception {
     test("select a[0].b[0] from cp.`jsoninput/nestedArray.json`");
     test("select a[0].b[1] from cp.`jsoninput/nestedArray.json`");
-    test("select a[1].b[1] from cp.`jsoninput/nestedArray.json`");  // index 
out of the range. Should return empty list.
+    test("select a[1].b[1] from cp.`jsoninput/nestedArray.json`"); // index 
out of the range. Should return empty list.
   }
 
   @Test
@@ -79,32 +84,31 @@ public class TestJsonRecordReader extends BaseTestQuery {
   public void testExceptionHandling() throws Exception {
     try {
       test("select * from cp.`jsoninput/DRILL-2350.json`");
-    } catch(UserException e) {
-      
Assert.assertEquals(UserBitShared.DrillPBError.ErrorType.UNSUPPORTED_OPERATION, 
e.getOrCreatePBError(false).getErrorType());
+    } catch (UserException e) {
+      Assert.assertEquals(
+          UserBitShared.DrillPBError.ErrorType.UNSUPPORTED_OPERATION, e
+              .getOrCreatePBError(false).getErrorType());
       String s = e.getMessage();
-      assertEquals("Expected Unsupported Operation Exception.", true, 
s.contains("Drill does not support lists of different types."));
+      assertEquals("Expected Unsupported Operation Exception.", true,
+          s.contains("Drill does not support lists of different types."));
     }
 
   }
 
-  @Test //DRILL-1832
+  @Test
+  // DRILL-1832
   public void testJsonWithNulls1() throws Exception {
-    final String query="select * from cp.`jsoninput/twitter_43.json`";
-    testBuilder()
-            .sqlQuery(query)
-            .unOrdered()
-            .jsonBaselineFile("jsoninput/drill-1832-1-result.json")
-            .go();
+    final String query = "select * from cp.`jsoninput/twitter_43.json`";
+    testBuilder().sqlQuery(query).unOrdered()
+        .jsonBaselineFile("jsoninput/drill-1832-1-result.json").go();
   }
 
-  @Test //DRILL-1832
+  @Test
+  // DRILL-1832
   public void testJsonWithNulls2() throws Exception {
-    final String query="select SUM(1) as `sum_Number_of_Records_ok` from 
cp.`/jsoninput/twitter_43.json` having (COUNT(1) > 0)";
-    testBuilder()
-            .sqlQuery(query)
-            .unOrdered()
-            .jsonBaselineFile("jsoninput/drill-1832-2-result.json")
-            .go();
+    final String query = "select SUM(1) as `sum_Number_of_Records_ok` from 
cp.`/jsoninput/twitter_43.json` having (COUNT(1) > 0)";
+    testBuilder().sqlQuery(query).unOrdered()
+        .jsonBaselineFile("jsoninput/drill-1832-2-result.json").go();
   }
 
   @Test
@@ -112,15 +116,18 @@ public class TestJsonRecordReader extends BaseTestQuery {
     try {
       testBuilder()
           .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-          .unOrdered()
-          .jsonBaselineFile("jsoninput/mixed_number_types.json")
+          .unOrdered().jsonBaselineFile("jsoninput/mixed_number_types.json")
           .build().run();
     } catch (Exception ex) {
-      assertTrue(ex.getMessage().contains("DATA_READ ERROR: Error parsing JSON 
- You tried to write a BigInt type when you are using a ValueWriter of type 
NullableFloat8WriterImpl."));
+      assertTrue(ex
+          .getMessage()
+          .contains(
+              "You tried to write a BigInt type when you are using a 
ValueWriter of type NullableFloat8WriterImpl."));
       // this indicates successful completion of the test
       return;
     }
-    throw new Exception("Mixed number types verification failed, expected 
failure on conflicting number types.");
+    throw new Exception(
+        "Mixed number types verification failed, expected failure on 
conflicting number types.");
   }
 
   @Test
@@ -128,24 +135,18 @@ public class TestJsonRecordReader extends BaseTestQuery {
     testNoResult("alter session set `store.json.all_text_mode`= true");
     testBuilder()
         .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-        .unOrdered()
-        .baselineColumns("a")
-        .baselineValues("5.2")
-        .baselineValues("6")
-        .build().run();
+        .unOrdered().baselineColumns("a").baselineValues("5.2")
+        .baselineValues("6").build().run();
   }
 
   @Test
   public void testMixedNumberTypesWhenReadingNumbersAsDouble() throws 
Exception {
     try {
-    testNoResult("alter session set `store.json.read_numbers_as_double`= 
true");
-    testBuilder()
-        .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
-        .unOrdered()
-        .baselineColumns("a")
-        .baselineValues(5.2D)
-        .baselineValues(6D)
-        .build().run();
+      testNoResult("alter session set `store.json.read_numbers_as_double`= 
true");
+      testBuilder()
+          .sqlQuery("select * from cp.`jsoninput/mixed_number_types.json`")
+          .unOrdered().baselineColumns("a").baselineValues(5.2D)
+          .baselineValues(6D).build().run();
     } finally {
       testNoResult("alter session set `store.json.read_numbers_as_double`= 
false");
     }
@@ -158,25 +159,97 @@ public class TestJsonRecordReader extends BaseTestQuery {
       test("create table dfs_test.tmp.drill_3353 as select a from 
dfs.`${WORKING_PATH}/src/test/resources/jsoninput/drill_3353` where e = true");
       String query = "select t.a.d cnt from dfs_test.tmp.drill_3353 t where 
t.a.d is not null";
       test(query);
-      testBuilder()
-          .sqlQuery(query)
-          .unOrdered()
-          .baselineColumns("cnt")
-          .baselineValues("1")
-          .go();
+      testBuilder().sqlQuery(query).unOrdered().baselineColumns("cnt")
+          .baselineValues("1").go();
     } finally {
       testNoResult("alter session set `store.json.all_text_mode` = false");
     }
   }
 
-  @Test // See DRILL-3476
+  @Test
+  // See DRILL-3476
   public void testNestedFilter() throws Exception {
     String query = "select a from cp.`jsoninput/nestedFilter.json` t where 
t.a.b = 1";
     String baselineQuery = "select * from cp.`jsoninput/nestedFilter.json` t 
where t.a.b = 1";
-    testBuilder()
-        .sqlQuery(query)
-        .unOrdered()
-        .sqlBaselineQuery(baselineQuery)
+    testBuilder().sqlQuery(query).unOrdered().sqlBaselineQuery(baselineQuery)
         .go();
   }
+
+  @Test
+  // See DRILL-4653
+  /* Test for CountingJSONReader */
+  public void testCountingQuerySkippingInvalidJSONRecords() throws Exception {
+    try {
+      String set = "alter session set `"
+          + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = true";
+      String set1 = "alter session set `"
+          + ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG
+          + "` = true";
+      String query = "select count(*) from cp.`jsoninput/drill4653/file.json`";
+
+      testNoResult(set);
+      testNoResult(set1);
+      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
+          .run();
+    } finally {
+      String set = "alter session set `"
+          + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = false";
+      testNoResult(set);
+    }
+  }
+
+  @Test
+  // See DRILL-4653
+  /* Test for CountingJSONReader */
+  public void testCountingQueryNotSkippingInvalidJSONRecords() throws 
Exception {
+    try {
+      String query = "select count(*) from cp.`jsoninput/drill4653/file.json`";
+      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
+          .run();
+    } catch (Exception ex) {
+      // do nothing just return
+       return;
+    }
+    throw new Exception("testCountingQueryNotSkippingInvalidJSONRecords");
+  }
+
+  @Test
+  // See DRILL-4653
+  /* Test for JSONReader */
+  public void testNotCountingQuerySkippingInvalidJSONRecords() throws 
Exception {
+    try {
+
+      String set = "alter session set `"
+          + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = true";
+      String set1 = "alter session set `"
+          + ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG
+          + "` = true";
+      String query = "select sum(balance) from 
cp.`jsoninput/drill4653/file.json`";
+      testNoResult(set);
+      testNoResult(set1);
+      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
+          .run();
+    }
+    finally {
+      String set = "alter session set `"
+          + ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG + "` = false";
+      testNoResult(set);
+    }
+  }
+
+  @Test
+  // See DRILL-4653
+  /* Test for JSONReader */
+  public void testNotCountingQueryNotSkippingInvalidJSONRecords()
+      throws Exception {
+    try {
+      String query = "select sum(balance) from 
cp.`jsoninput/drill4653/file.json`";
+      testBuilder().unOrdered().sqlQuery(query).sqlBaselineQuery(query).build()
+          .run();
+    } catch (Exception ex) {
+      // do nothing just return
+      return;
+    }
+    throw new Exception("testNotCountingQueryNotSkippingInvalidJSONRecords");
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/db482989/exec/java-exec/src/test/resources/jsoninput/drill4653/file.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/jsoninput/drill4653/file.json 
b/exec/java-exec/src/test/resources/jsoninput/drill4653/file.json
new file mode 100644
index 0000000..edfd3e6
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jsoninput/drill4653/file.json
@@ -0,0 +1,10 @@
+{"balance": 1000.0,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound","test":{"value :false}}}
+{"balance": 1000.1,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound","test":{"value":false}}}
+{"balance": 1000.2,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.3,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.4,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.5,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.6,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.7,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.8,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}
+{"balance": 1000.9,"num": 100,"is_vip": true,"name": 
"foo3","curr":{"denom":"pound"}}

Reply via email to