DRILL-4203: Fix date values written in parquet files created by Drill

Drill wrote non-standard dates into parquet files in all releases before
1.9.0. Drill itself has always read these values back correctly, but
external tools like Spark reading the files will see corrupted values for
all dates that were written by Drill.

This change corrects the behavior of the Drill parquet writer so that
dates are stored in the format given in the parquet specification.

To maintain compatibility with old files, the parquet reader code has
been updated to check for the old format and automatically shift the
corrupted values back to the correct ones.
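
For reference, the correction itself is a fixed shift; a minimal sketch of
the arithmetic, assuming JULIAN_DAY_EPOC = 2440588 (the Julian day number of
the Unix epoch, as defined in ParquetOutputRecordWriter):

    // Old writers added the epoch constant where they should have subtracted
    // it, so corrupt values are exactly 2 * JULIAN_DAY_EPOC days too large.
    public static int autoCorrectCorruptedDate(int corruptedDate) {
      return corruptedDate - 2 * 2440588; // 2 * JULIAN_DAY_EPOC
    }

This mirrors ParquetReaderUtility.autoCorrectCorruptedDate in the patch below.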

The test cases included here should ensure that all files produced by
historical versions of Drill will continue to return the same values they
did in previous releases. For compatibility with external tools, any old
files with corrupted dates can be re-written using the CREATE TABLE AS
command, as the writer now produces only specification-compliant values,
even when reading from older corrupt files.

While the old behavior was a consistent shift into a range unlikely to be
used in a modern database (over 10,000 years in the future), the shifted
values are still valid dates. In the case where such dates may have been
written into files intentionally, and we cannot be certain from the
metadata that Drill produced the files, an option is included to turn off
the auto-correction. Use of this option is expected to be extremely rare,
but it is included for completeness.
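
A sketch of how the new ParquetReaderUtility derives this detection threshold
(the 2440588 epoch constant is an assumption about JULIAN_DAY_EPOC; the joda
calls match the code added below):

    // Days-since-epoch value for 5000-01-01: anything above this is presumed
    // to be corrupt output from an old Drill release.
    Chronology utc = ISOChronology.getInstanceUTC();
    int threshold = (int) (DateTimeUtils.toJulianDayNumber(
        utc.getDateTimeMillis(5000, 1, 1, 0)) - 2440588 /* JULIAN_DAY_EPOC */);

Setting autoCorrectCorruptDates to false in the parquet format configuration
disables this value-based heuristic.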

This patch was originally written against version 1.5.0; when rebasing,
the corruption threshold was updated to 1.9.0.

Added regenerated binary files and updated the metadata cache files accordingly.

One small fix in ParquetGroupScan accommodates changes in master that
altered when metadata is read.

Tests for bugs revealed by the regression suite.

Fix Drill version number in metadata file generation


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ae34d5c3
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ae34d5c3
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ae34d5c3

Branch: refs/heads/master
Commit: ae34d5c30582de777db19360abf013bc50c8640b
Parents: 2f4b5ef
Author: Jason Altekruse <altekruseja...@gmail.com>
Authored: Thu Dec 31 10:22:04 2015 -0600
Committer: Parth Chandra <par...@apache.org>
Committed: Fri Oct 14 11:08:07 2016 -0700

----------------------------------------------------------------------
 .../hive/HiveDrillNativeScanBatchCreator.java   |   8 +-
 .../templates/ParquetOutputRecordWriter.java    |   4 +-
 .../sql/handlers/RefreshMetadataHandler.java    |   5 +-
 .../drill/exec/store/parquet/Metadata.java      |  77 ++-
 .../exec/store/parquet/ParquetFormatConfig.java |  22 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   9 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  34 +-
 .../store/parquet/ParquetReaderUtility.java     | 232 ++++++++
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../columnreaders/ColumnReaderFactory.java      |  28 +-
 .../columnreaders/FixedByteAlignedReader.java   |  65 ++-
 .../NullableFixedByteAlignedReaders.java        |  63 ++-
 .../columnreaders/ParquetRecordReader.java      |  85 ++-
 .../parquet2/DrillParquetGroupConverter.java    |  71 ++-
 .../exec/store/parquet2/DrillParquetReader.java |   8 +-
 .../DrillParquetRecordMaterializer.java         |   6 +-
 .../java/org/apache/drill/DrillTestWrapper.java |   4 +-
 .../TestCorruptParquetDateCorrection.java       | 539 +++++++++++++++++++
 .../dfs/TestFormatPluginOptionExtractor.java    |   4 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   2 +-
 .../0_0_1.parquet                               | Bin 0 -> 257 bytes
 .../0_0_2.parquet                               | Bin 0 -> 257 bytes
 .../0_0_3.parquet                               | Bin 0 -> 257 bytes
 .../0_0_4.parquet                               | Bin 0 -> 257 bytes
 .../0_0_5.parquet                               | Bin 0 -> 257 bytes
 .../0_0_6.parquet                               | Bin 0 -> 257 bytes
 ...ll.parquet.metadata_1_2.requires_replace.txt | 119 ++++
 .../fewtypes_datepartition/0_0_1.parquet        | Bin 0 -> 1226 bytes
 .../fewtypes_datepartition/0_0_10.parquet       | Bin 0 -> 1258 bytes
 .../fewtypes_datepartition/0_0_11.parquet       | Bin 0 -> 1238 bytes
 .../fewtypes_datepartition/0_0_12.parquet       | Bin 0 -> 1258 bytes
 .../fewtypes_datepartition/0_0_13.parquet       | Bin 0 -> 1226 bytes
 .../fewtypes_datepartition/0_0_14.parquet       | Bin 0 -> 1201 bytes
 .../fewtypes_datepartition/0_0_15.parquet       | Bin 0 -> 1216 bytes
 .../fewtypes_datepartition/0_0_16.parquet       | Bin 0 -> 1253 bytes
 .../fewtypes_datepartition/0_0_17.parquet       | Bin 0 -> 1231 bytes
 .../fewtypes_datepartition/0_0_18.parquet       | Bin 0 -> 1216 bytes
 .../fewtypes_datepartition/0_0_19.parquet       | Bin 0 -> 1186 bytes
 .../fewtypes_datepartition/0_0_2.parquet        | Bin 0 -> 1268 bytes
 .../fewtypes_datepartition/0_0_20.parquet       | Bin 0 -> 1228 bytes
 .../fewtypes_datepartition/0_0_21.parquet       | Bin 0 -> 1231 bytes
 .../fewtypes_datepartition/0_0_3.parquet        | Bin 0 -> 1278 bytes
 .../fewtypes_datepartition/0_0_4.parquet        | Bin 0 -> 1242 bytes
 .../fewtypes_datepartition/0_0_5.parquet        | Bin 0 -> 1335 bytes
 .../fewtypes_datepartition/0_0_6.parquet        | Bin 0 -> 1222 bytes
 .../fewtypes_datepartition/0_0_7.parquet        | Bin 0 -> 1273 bytes
 .../fewtypes_datepartition/0_0_8.parquet        | Bin 0 -> 1263 bytes
 .../fewtypes_datepartition/0_0_9.parquet        | Bin 0 -> 1268 bytes
 .../fewtypes_varcharpartition/0_0_1.parquet     | Bin 0 -> 2128 bytes
 .../fewtypes_varcharpartition/0_0_10.parquet    | Bin 0 -> 2086 bytes
 .../fewtypes_varcharpartition/0_0_11.parquet    | Bin 0 -> 2121 bytes
 .../fewtypes_varcharpartition/0_0_12.parquet    | Bin 0 -> 2114 bytes
 .../fewtypes_varcharpartition/0_0_13.parquet    | Bin 0 -> 2128 bytes
 .../fewtypes_varcharpartition/0_0_14.parquet    | Bin 0 -> 2068 bytes
 .../fewtypes_varcharpartition/0_0_15.parquet    | Bin 0 -> 2054 bytes
 .../fewtypes_varcharpartition/0_0_16.parquet    | Bin 0 -> 2114 bytes
 .../fewtypes_varcharpartition/0_0_17.parquet    | Bin 0 -> 2135 bytes
 .../fewtypes_varcharpartition/0_0_18.parquet    | Bin 0 -> 2223 bytes
 .../fewtypes_varcharpartition/0_0_19.parquet    | Bin 0 -> 2072 bytes
 .../fewtypes_varcharpartition/0_0_2.parquet     | Bin 0 -> 2107 bytes
 .../fewtypes_varcharpartition/0_0_20.parquet    | Bin 0 -> 2012 bytes
 .../fewtypes_varcharpartition/0_0_21.parquet    | Bin 0 -> 2033 bytes
 .../fewtypes_varcharpartition/0_0_3.parquet     | Bin 0 -> 2068 bytes
 .../fewtypes_varcharpartition/0_0_4.parquet     | Bin 0 -> 2075 bytes
 .../fewtypes_varcharpartition/0_0_5.parquet     | Bin 0 -> 2075 bytes
 .../fewtypes_varcharpartition/0_0_6.parquet     | Bin 0 -> 2091 bytes
 .../fewtypes_varcharpartition/0_0_7.parquet     | Bin 0 -> 2063 bytes
 .../fewtypes_varcharpartition/0_0_8.parquet     | Bin 0 -> 2142 bytes
 .../fewtypes_varcharpartition/0_0_9.parquet     | Bin 0 -> 2054 bytes
 .../4203_corrected_dates.parquet                | Bin 0 -> 278 bytes
 .../4203_corrupt_dates.parquet                  | Bin 0 -> 181 bytes
 .../4203_corrupted_dates_1.4.parquet            | Bin 0 -> 278 bytes
 .../drill_0_6_currupt_dates_no_stats.parquet    | Bin 0 -> 181 bytes
 ...on_partitioned_metadata.requires_replace.txt | 301 +++++++++++
 .../null_date_cols_with_corruption_4203.parquet | Bin 0 -> 364 bytes
 .../0_0_1.parquet                               | Bin 0 -> 257 bytes
 .../0_0_2.parquet                               | Bin 0 -> 257 bytes
 .../0_0_3.parquet                               | Bin 0 -> 257 bytes
 .../0_0_4.parquet                               | Bin 0 -> 257 bytes
 .../0_0_5.parquet                               | Bin 0 -> 257 bytes
 .../0_0_6.parquet                               | Bin 0 -> 257 bytes
 .../0_0_1.parquet                               | Bin 0 -> 160 bytes
 .../0_0_2.parquet                               | Bin 0 -> 160 bytes
 .../0_0_3.parquet                               | Bin 0 -> 160 bytes
 .../0_0_4.parquet                               | Bin 0 -> 160 bytes
 .../0_0_5.parquet                               | Bin 0 -> 160 bytes
 .../0_0_6.parquet                               | Bin 0 -> 160 bytes
 87 files changed, 1607 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index a9575ba..1ded153 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.fs.FileSystem;
@@ -118,6 +119,10 @@ public class HiveDrillNativeScanBatchCreator implements 
BatchCreator<HiveDrillNa
         final List<Integer> rowGroupNums = 
getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
 
         for(int rowGroupNum : rowGroupNums) {
+          // Drill has only ever written a single row group per file, so we only
+          // detect corruption in the first row group
+          ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
+              ParquetReaderUtility.detectCorruptDates(parquetMetadata, 
config.getColumns(), true);
           readers.add(new ParquetRecordReader(
                   context,
                   Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
@@ -125,7 +130,8 @@ public class HiveDrillNativeScanBatchCreator implements 
BatchCreator<HiveDrillNa
                   CodecFactory.createDirectCodecFactory(fs.getConf(),
                       new 
ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
                   parquetMetadata,
-                  newColumns)
+                  newColumns,
+                  containsCorruptDates)
           );
           Map<String, String> implicitValues = Maps.newLinkedHashMap();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java 
b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index 74af3ea..aac0f0c 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -156,12 +156,12 @@ public abstract class ParquetOutputRecordWriter extends 
AbstractRecordWriter imp
   <#elseif minor.class == "Date">
     <#if mode.prefix == "Repeated" >
       reader.read(i, holder);
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) 
+ JULIAN_DAY_EPOC));
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) 
- JULIAN_DAY_EPOC));
     <#else>
       consumer.startField(fieldName, fieldId);
       reader.read(holder);
       // convert from internal Drill date format to Julian Day centered around 
Unix Epoc
-      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) 
+ JULIAN_DAY_EPOC));
+      consumer.addInteger((int) (DateTimeUtils.toJulianDayNumber(holder.value) 
- JULIAN_DAY_EPOC));
       consumer.endField(fieldName, fieldId);
     </#if>
   <#elseif
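
The one-character fix above (subtracting JULIAN_DAY_EPOC instead of adding it)
is easiest to see with a concrete value. A worked example, assuming
JULIAN_DAY_EPOC = 2440588 (the Julian day number of the Unix epoch):

    // 2016-10-14 has Julian day number 2457676.
    int correct = 2457676 - 2440588; // 17088: days since the Unix epoch, per the parquet spec
    int corrupt = 2457676 + 2440588; // 4898264: reads back as a date over 13,000 years in the future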

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 7be46f0..b36356a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -110,7 +110,10 @@ public class RefreshMetadataHandler extends 
DefaultSqlHandler {
         return notSupported(tableName);
       }
 
-      Metadata.createMeta(fs, selectionRoot);
+      if (!(formatConfig instanceof ParquetFormatConfig)) {
+        formatConfig = new ParquetFormatConfig();
+      }
+      Metadata.createMeta(fs, selectionRoot, (ParquetFormatConfig) 
formatConfig);
       return direct(true, "Successfully updated metadata for table %s.", 
tableName);
 
     } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
index 86b860a..d6a739d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.DrillVersionInfo;
+import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.MetadataContext;
@@ -80,6 +82,7 @@ public class Metadata {
   public static final String METADATA_DIRECTORIES_FILENAME = 
".drill.parquet_metadata_directories";
 
   private final FileSystem fs;
+  private final ParquetFormatConfig formatConfig;
 
   private ParquetTableMetadataBase parquetTableMetadata;
   private ParquetTableMetadataDirs parquetTableMetadataDirs;
@@ -91,8 +94,8 @@ public class Metadata {
    * @param path
    * @throws IOException
    */
-  public static void createMeta(FileSystem fs, String path) throws IOException 
{
-    Metadata metadata = new Metadata(fs);
+  public static void createMeta(FileSystem fs, String path, 
ParquetFormatConfig formatConfig) throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.createMetaFilesRecursively(path);
   }
 
@@ -104,9 +107,9 @@ public class Metadata {
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, 
String path)
+  public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs, 
String path, ParquetFormatConfig formatConfig)
       throws IOException {
-    Metadata metadata = new Metadata(fs);
+    Metadata metadata = new Metadata(fs, formatConfig);
     return metadata.getParquetTableMetadata(path);
   }
 
@@ -119,8 +122,8 @@ public class Metadata {
    * @throws IOException
    */
   public static ParquetTableMetadata_v2 getParquetTableMetadata(FileSystem fs,
-      List<FileStatus> fileStatuses) throws IOException {
-    Metadata metadata = new Metadata(fs);
+      List<FileStatus> fileStatuses, ParquetFormatConfig formatConfig) throws 
IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     return metadata.getParquetTableMetadata(fileStatuses);
   }
 
@@ -132,20 +135,21 @@ public class Metadata {
    * @return
    * @throws IOException
    */
-  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String 
path, MetadataContext metaContext) throws IOException {
-    Metadata metadata = new Metadata(fs);
+  public static ParquetTableMetadataBase readBlockMeta(FileSystem fs, String 
path, MetadataContext metaContext, ParquetFormatConfig formatConfig) throws 
IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, false, metaContext);
     return metadata.parquetTableMetadata;
   }
 
-  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, 
String path, MetadataContext metaContext) throws IOException {
-    Metadata metadata = new Metadata(fs);
+  public static ParquetTableMetadataDirs readMetadataDirs(FileSystem fs, 
String path, MetadataContext metaContext, ParquetFormatConfig formatConfig) 
throws IOException {
+    Metadata metadata = new Metadata(fs, formatConfig);
     metadata.readBlockMeta(path, true, metaContext);
     return metadata.parquetTableMetadataDirs;
   }
 
-  private Metadata(FileSystem fs) {
+  private Metadata(FileSystem fs, ParquetFormatConfig formatConfig) {
     this.fs = 
ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), 
fs.getConf());
+    this.formatConfig = formatConfig;
   }
 
   /**
@@ -345,6 +349,10 @@ public class Metadata {
 
     List<RowGroupMetadata_v2> rowGroupMetadataList = Lists.newArrayList();
 
+    ArrayList<SchemaPath> ALL_COLS = new ArrayList<>();
+    ALL_COLS.add(AbstractRecordReader.STAR_COLUMN);
+    boolean autoCorrectCorruptDates = formatConfig.autoCorrectCorruptDates;
+    ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = 
ParquetReaderUtility.detectCorruptDates(metadata, ALL_COLS, 
autoCorrectCorruptDates);
     for (BlockMetaData rowGroup : metadata.getBlocks()) {
       List<ColumnMetadata_v2> columnMetadataList = Lists.newArrayList();
       long length = 0;
@@ -367,9 +375,13 @@ public class Metadata {
         if (statsAvailable) {
           // Write stats only if minVal==maxVal. Also, we then store only 
maxVal
           Object mxValue = null;
-          if (stats.genericGetMax() != null && stats.genericGetMin() != null 
&& stats.genericGetMax()
-              .equals(stats.genericGetMin())) {
+          if (stats.genericGetMax() != null && stats.genericGetMin() != null &&
+              stats.genericGetMax().equals(stats.genericGetMin())) {
             mxValue = stats.genericGetMax();
+            if (containsCorruptDates == 
ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION
+                && columnTypeMetadata.originalType == OriginalType.DATE) {
+              mxValue = 
ParquetReaderUtility.autoCorrectCorruptedDate((Integer) mxValue);
+            }
           }
           columnMetadata =
               new ColumnMetadata_v2(columnTypeMetadata.name, col.getType(), 
mxValue, stats.getNumNulls());
@@ -521,7 +533,6 @@ public class Metadata {
    * Check if the parquet metadata needs to be updated by comparing the 
modification time of the directories with
    * the modification time of the metadata file
    *
-   * @param tableMetadata
    * @param metaFilePath
    * @return
    * @throws IOException
@@ -585,6 +596,7 @@ public class Metadata {
     @JsonIgnore public abstract OriginalType getOriginalType(String[] 
columnName);
 
     @JsonIgnore public abstract ParquetTableMetadataBase clone();
+    @JsonIgnore public abstract String getDrillVersion();
   }
 
   public static abstract class ParquetFileMetadata {
@@ -618,6 +630,24 @@ public class Metadata {
 
     public abstract Object getMaxValue();
 
+    /**
+     * Set the max value recorded in the parquet metadata statistics.
+     *
+     * This object would otherwise be immutable, but due to DRILL-4203 we need to
+     * correct date values that had been corrupted by earlier versions of Drill.
+     */
+    public abstract void setMax(Object newMax);
+
+    /**
+     * Set the min value recorded in the parquet metadata statistics.
+     *
+     * This object would otherwise be immutable, but due to DRILL-4203 we need to
+     * correct date values that had been corrupted by earlier versions of Drill.
+     */
+    public abstract void setMin(Object newMin);
+
     public abstract PrimitiveTypeName getPrimitiveType();
 
     public abstract OriginalType getOriginalType();
@@ -681,6 +711,10 @@ public class Metadata {
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
       return new ParquetTableMetadata_v1(files, directories);
     }
+    @Override
+    public String getDrillVersion() {
+      return null;
+    }
   }
 
 
@@ -870,7 +904,6 @@ public class Metadata {
       return max;
     }
 
-
   }
 
   /**
@@ -885,9 +918,10 @@ public class Metadata {
     @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, 
ColumnTypeMetadata_v2> columnTypeInfo;
     @JsonProperty List<ParquetFileMetadata_v2> files;
     @JsonProperty List<String> directories;
+    @JsonProperty String drillVersion;
 
     public ParquetTableMetadata_v2() {
-      super();
+      this.drillVersion = DrillVersionInfo.getVersion();
     }
 
     public ParquetTableMetadata_v2(ParquetTableMetadataBase parquetTable,
@@ -895,6 +929,7 @@ public class Metadata {
       this.files = files;
       this.directories = directories;
       this.columnTypeInfo = ((ParquetTableMetadata_v2) 
parquetTable).columnTypeInfo;
+      this.drillVersion = DrillVersionInfo.getVersion();
     }
 
     public ParquetTableMetadata_v2(List<ParquetFileMetadata_v2> files, 
List<String> directories,
@@ -935,6 +970,11 @@ public class Metadata {
     @JsonIgnore @Override public ParquetTableMetadataBase clone() {
       return new ParquetTableMetadata_v2(files, directories, columnTypeInfo);
     }
+    @Override
+    public String getDrillVersion() {
+      return drillVersion;
+    }
+
   }
 
 
@@ -1141,6 +1181,11 @@ public class Metadata {
       return mxValue;
     }
 
+    @Override
+    public void setMin(Object newMin) {
+      // noop - min value not stored in this version of the metadata
+    }
+
     @Override public PrimitiveTypeName getPrimitiveType() {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
index 74a90c0..9ba03df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -24,14 +25,25 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("parquet")
 public class ParquetFormatConfig implements FormatPluginConfig{
 
+  public boolean autoCorrectCorruptDates = true;
+
   @Override
-  public int hashCode() {
-    return 7;
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ParquetFormatConfig that = (ParquetFormatConfig) o;
+
+    return autoCorrectCorruptDates == that.autoCorrectCorruptDates;
+
   }
 
   @Override
-  public boolean equals(Object obj) {
-    return obj instanceof ParquetFormatConfig;
+  public int hashCode() {
+    return (autoCorrectCorruptDates ? 1 : 0);
   }
-
 }
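
The rewritten equals/hashCode matter beyond style: format configurations are
compared by equality, and the old implementation treated every
ParquetFormatConfig as identical, with a constant hashCode of 7. A
hypothetical snippet showing the new behavior:

    ParquetFormatConfig a = new ParquetFormatConfig();      // autoCorrectCorruptDates defaults to true
    ParquetFormatConfig b = new ParquetFormatConfig();
    b.autoCorrectCorruptDates = false;
    assert !a.equals(b);                 // previously any two configs compared equal
    assert a.hashCode() != b.hashCode();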

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 1ab621b..f17d414 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -92,7 +92,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
       StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
     this.config = formatConfig;
-    this.formatMatcher = new ParquetFormatMatcher(this);
+    this.formatMatcher = new ParquetFormatMatcher(this, config);
     this.storageConfig = storageConfig;
     this.fsConf = fsConf;
     this.name = name == null ? DEFAULT_NAME : name;
@@ -196,8 +196,11 @@ public class ParquetFormatPlugin implements FormatPlugin{
 
   private static class ParquetFormatMatcher extends BasicFormatMatcher{
 
-    public ParquetFormatMatcher(ParquetFormatPlugin plugin) {
+    private final ParquetFormatConfig formatConfig;
+
+    public ParquetFormatMatcher(ParquetFormatPlugin plugin, 
ParquetFormatConfig formatConfig) {
       super(plugin, PATTERNS, MAGIC_STRINGS);
+      this.formatConfig = formatConfig;
     }
 
     @Override
@@ -218,7 +221,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
           // create a metadata context that will be used for the duration of 
the query for this table
           MetadataContext metaContext = new MetadataContext();
 
-          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, 
dirMetaPath.toString(), metaContext);
+          ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, 
dirMetaPath.toString(), metaContext, formatConfig);
           if (mDirs.getDirectories().size() > 0) {
             FileSelection dirSelection = 
FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
                 selection.getSelectionRoot() /* cacheFileRoot initially points 
to selectionRoot */);

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index ec34e7a..649282b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -43,7 +43,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.DrillPathFilter;
@@ -81,10 +80,10 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.joda.time.DateTimeUtils;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -394,6 +393,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
     return columnTypeMap.get(schemaPath);
   }
 
+  // Map from file names to maps of column name to partition value mappings
   private Map<String, Map<SchemaPath, Object>> partitionValueMap = 
Maps.newHashMap();
 
   public void populatePruningVector(ValueVector v, int index, SchemaPath 
column, String file) {
@@ -479,7 +479,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
       case DATE: {
         NullableDateVector dateVector = (NullableDateVector) v;
         Integer value = (Integer) partitionValueMap.get(f).get(column);
-        dateVector.getMutator().setSafe(index, 
DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 
0.5));
+        dateVector.getMutator().setSafe(index, DateTimeUtils.fromJulianDay(
+            value + 
ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
         return;
       }
       case TIME: {
@@ -582,7 +583,10 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
     // we only select the files that are part of selection (by setting fileSet 
appropriately)
 
     // get (and set internal field) the metadata for the directory by reading 
the metadata file
-    this.parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaFilePath.toString(), selection.getMetaContext());
+    this.parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaFilePath.toString(), selection.getMetaContext(), formatConfig);
+    if (formatConfig.autoCorrectCorruptDates) {
+      
ParquetReaderUtility.correctDatesInMetadataCache(this.parquetTableMetadata);
+    }
     List<FileStatus> fileStatuses = selection.getStatuses(fs);
 
     if (fileSet == null) {
@@ -616,7 +620,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan 
{
         if (status.isDirectory()) {
           //TODO [DRILL-4496] read the metadata cache files in parallel
           final Path metaPath = new Path(status.getPath(), 
Metadata.METADATA_FILENAME);
-          final Metadata.ParquetTableMetadataBase metadata = 
Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext());
+          final Metadata.ParquetTableMetadataBase metadata = 
Metadata.readBlockMeta(fs, metaPath.toString(), selection.getMetaContext(), 
formatConfig);
           for (Metadata.ParquetFileMetadata file : metadata.getFiles()) {
             fileSet.add(file.getPath());
           }
@@ -664,9 +668,11 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
       }
       if (metaPath != null && fs.exists(metaPath)) {
         usedMetadataCache = true;
-        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString(), 
metaContext);
+        if (parquetTableMetadata == null) {
+          parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaPath.toString(), metaContext, formatConfig);
+        }
       } else {
-        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, 
p.toString());
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, 
p.toString(), formatConfig);
       }
     } else {
       Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
@@ -674,17 +680,25 @@ public class ParquetGroupScan extends 
AbstractFileGroupScan {
       if (fs.isDirectory(new Path(selectionRoot)) && fs.exists(metaPath)) {
         usedMetadataCache = true;
         if (parquetTableMetadata == null) {
-          parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaPath.toString(), metaContext);
+          parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaPath.toString(), metaContext, formatConfig);
         }
         if (fileSet != null) {
-          parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata);
+          if (parquetTableMetadata == null) {
+            parquetTableMetadata = 
removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString(), 
metaContext, formatConfig));
+          } else {
+            parquetTableMetadata = 
removeUnneededRowGroups(parquetTableMetadata);
+          }
+        } else {
+          if (parquetTableMetadata == null) {
+            parquetTableMetadata = Metadata.readBlockMeta(fs, 
metaPath.toString(), metaContext, formatConfig);
+          }
         }
       } else {
         final List<FileStatus> fileStatuses = Lists.newArrayList();
         for (ReadEntryWithPath entry : entries) {
           getFiles(entry.getPath(), fileStatuses);
         }
-        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, 
fileStatuses);
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, 
fileStatuses, formatConfig);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
index 2f56aa0..9d0886f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java
@@ -18,9 +18,32 @@
 package org.apache.drill.exec.store.parquet;
 
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.work.ExecErrorConstants;
+import org.apache.parquet.SemanticVersion;
+import org.apache.parquet.VersionParser;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.ConvertedType;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.OriginalType;
+import org.joda.time.Chronology;
+import org.joda.time.DateTimeUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /*
  * Utility class where we can capture common logic between the two parquet 
readers
@@ -28,6 +51,29 @@ import org.apache.drill.exec.work.ExecErrorConstants;
 public class ParquetReaderUtility {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class);
 
+  // Note the negation symbol at the beginning
+  public static final double CORRECT_CORRUPT_DATE_SHIFT = 
-ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
+  public static final double SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY = 
ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5;
+  // The year 5000 is the threshold for auto-detecting date corruption.
+  // This balances two possible cases of bad auto-correction. External tools 
writing dates in the future will not
+  // be shifted unless they are past this threshold (and we cannot identify 
them as external files based on the metadata).
+  // On the other hand, historical dates written with Drill wouldn't risk 
being incorrectly shifted unless they were
+  // something like 10,000 years in the past.
+  private static final Chronology UTC = 
org.joda.time.chrono.ISOChronology.getInstanceUTC();
+  public static final int DATE_CORRUPTION_THRESHOLD =
+      (int) (DateTimeUtils.toJulianDayNumber(UTC.getDateTimeMillis(5000, 1, 1, 
0)) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+
+  /**
+   * For most recently created parquet files, we can determine if we have 
corrupted dates (see DRILL-4203)
+   * based on the file metadata. For older files that lack statistics we must 
actually test the values
+   * in the data pages themselves to see if they are likely corrupt.
+   */
+  public enum DateCorruptionStatus {
+    META_SHOWS_CORRUPTION, // metadata shows the values are definitely corrupt
+    META_SHOWS_NO_CORRUPTION, // metadata shows the values are definitely correct
+    META_UNCLEAR_TEST_VALUES // not enough info in metadata; the parquet reader must test individual values
+  }
+
   public static void checkDecimalTypeEnabled(OptionManager options) {
     if 
(options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val == 
false) {
       throw UserException.unsupportedError()
@@ -45,4 +91,190 @@ public class ParquetReaderUtility {
     }
     return out;
   }
+
+  public static Map<String, SchemaElement> 
getColNameToSchemaElementMapping(ParquetMetadata footer) {
+    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    FileMetaData fileMetaData = new 
ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, 
footer);
+    for (SchemaElement se : fileMetaData.getSchema()) {
+      schemaElements.put(se.getName(), se);
+    }
+    return schemaElements;
+  }
+
+  public static int autoCorrectCorruptedDate(int corruptedDate) {
+    return (int) (corruptedDate - 2 * 
ParquetOutputRecordWriter.JULIAN_DAY_EPOC);
+  }
+
+  public static void 
correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase 
parquetTableMetadata) {
+    DateCorruptionStatus cacheFileContainsCorruptDates;
+    String drillVersionStr = parquetTableMetadata.getDrillVersion();
+    if (drillVersionStr != null) {
+      try {
+        cacheFileContainsCorruptDates = 
ParquetReaderUtility.drillVersionHasCorruptedDates(drillVersionStr);
+      } catch (VersionParser.VersionParseException e) {
+        cacheFileContainsCorruptDates = 
DateCorruptionStatus.META_SHOWS_CORRUPTION;
+      }
+    } else {
+      cacheFileContainsCorruptDates = 
DateCorruptionStatus.META_SHOWS_CORRUPTION;
+    }
+    if (cacheFileContainsCorruptDates == 
DateCorruptionStatus.META_SHOWS_CORRUPTION) {
+      for (Metadata.ParquetFileMetadata file : 
parquetTableMetadata.getFiles()) {
+        // Drill has only ever written a single row group per file, so we only need
+        // to correct the statistics on the first row group
+        Metadata.RowGroupMetadata rowGroupMetadata = 
file.getRowGroups().get(0);
+        for (Metadata.ColumnMetadata columnMetadata : 
rowGroupMetadata.getColumns()) {
+          OriginalType originalType = columnMetadata.getOriginalType();
+          if (originalType != null && originalType.equals(OriginalType.DATE) &&
+              columnMetadata.hasSingleValue() &&
+              (Integer) columnMetadata.getMaxValue() > 
ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+            int newMinMax = 
ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
+            columnMetadata.setMax(newMinMax);
+            columnMetadata.setMin(newMinMax);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Check for corrupted dates in a parquet file. See DRILL-4203.
+   * @param footer
+   * @param columns
+   * @return
+   */
+  public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
+                                           List<SchemaPath> columns,
+                                           boolean autoCorrectCorruptDates) {
+    // old drill files have "parquet-mr" as created by string, and no drill 
version, need to check min/max values to see
+    // if they look corrupt
+    //  - option to disable this auto-correction based on the date values, in 
case users are storing these
+    //    dates intentionally
+
+    // migrated parquet files have 1.8.1 parquet-mr version with drill-r0 in 
the part of the name usually containing "SNAPSHOT"
+
+    // new parquet files 1.4+ have drill version number
+    //  - below 1.9.0 dates are corrupt
+    //  - this includes 1.9.0-SNAPSHOT
+
+    String drillVersion = 
footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
+    String createdBy = footer.getFileMetaData().getCreatedBy();
+    try {
+      if (drillVersion == null) {
+        // Possibly an old, un-migrated Drill file, check the column 
statistics to see if min/max values look corrupt
+        // only applies if there is a date column selected
+        if (createdBy.equals("parquet-mr")) {
+          // loop through parquet column metadata to find date columns, check for corrupt values
+          return checkForCorruptDateValuesInStatistics(footer, columns, 
autoCorrectCorruptDates);
+        } else {
+          // check the created by to see if it is a migrated Drill file
+          VersionParser.ParsedVersion parsedCreatedByVersion = 
VersionParser.parse(createdBy);
+          // check if this is a migrated Drill file, lacking a Drill version 
number, but with
+          // "drill" in the parquet created-by string
+          SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
+          if (semVer != null && semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1
+              && (semVer.pre + "").contains("drill")) {
+            return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+          } else {
+            // written by a tool that wasn't Drill, the dates are not corrupted
+            return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+          }
+        }
+      } else {
+        // this parser expects an application name before the semantic 
version, just prepending Drill
+        // we know from the property name "drill.version" that we wrote this
+        return drillVersionHasCorruptedDates(drillVersion);
+      }
+    } catch (VersionParser.VersionParseException e) {
+      // If the version string cannot be parsed it did not come from a known release of
+      // Drill, but all metadata values produced by historical Drill versions are covered
+      // above, so conservatively flag the file as corrupt
+      return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+    }
+  }
+
+  public static DateCorruptionStatus drillVersionHasCorruptedDates(String 
drillVersion) throws VersionParser.VersionParseException {
+    VersionParser.ParsedVersion parsedDrillVersion = 
parseDrillVersion(drillVersion);
+    SemanticVersion semVer = parsedDrillVersion.getSemanticVersion();
+    if (semVer == null || semVer.compareTo(new SemanticVersion(1, 9, 0)) < 0) {
+      return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+    } else {
+      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+    }
+
+  }
+
+  public static VersionParser.ParsedVersion parseDrillVersion(String 
drillVersion) throws VersionParser.VersionParseException {
+    return VersionParser.parse("drill version " + drillVersion + " (build 
1234)");
+  }
+
+  /**
+   * Detect corrupt date values by looking at the min/max values in the 
metadata.
+   *
+   * This should only be used when a file does not have enough metadata to 
determine if
+   * the data was written with an older version of Drill, or an external tool. 
Drill
+   * versions 1.3 and beyond should have enough metadata to confirm that the 
data was written
+   * by Drill.
+   *
+   * This method only checks the first Row Group, because Drill has only ever 
written
+   * a single Row Group per file.
+   *
+   * @param footer
+   * @param columns
+   * @param autoCorrectCorruptDates user setting to allow enabling/disabling 
of auto-correction
+   *                                of corrupt dates. There are some rare 
cases (storing dates thousands
+   *                                of years into the future, with tools other 
than Drill writing files)
+   *                                that would result in the date values being 
"corrected" into bad values.
+   * @return
+   */
+  public static DateCorruptionStatus 
checkForCorruptDateValuesInStatistics(ParquetMetadata footer,
+                                                              List<SchemaPath> 
columns,
+                                                              boolean 
autoCorrectCorruptDates) {
+    // Users can turn off date correction in cases where we are detecting corruption based on
+    // date values that are unlikely to appear in common datasets. In this case report that no
+    // correction needs to happen during the file read
+    if (! autoCorrectCorruptDates) {
+      return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+    }
+    // Drill-produced files have only ever had a single row group; if this changes in the future
+    // it won't matter, as the Drill version written in the files will tell us the dates are correct
+    int rowGroupIndex = 0;
+    Map<String, SchemaElement> schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+    findDateColWithStatsLoop : for (SchemaPath schemaPath : columns) {
+      List<ColumnDescriptor> parquetColumns = 
footer.getFileMetaData().getSchema().getColumns();
+      for (int i = 0; i < parquetColumns.size(); ++i) {
+        ColumnDescriptor column = parquetColumns.get(i);
+        // this reader only supports flat data, this is restricted in the 
ParquetScanBatchCreator
+        // creating a NameSegment makes sure we are using the standard code 
for comparing names,
+        // currently it is all case-insensitive
+        if (AbstractRecordReader.isStarQuery(columns) || new 
PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment()))
 {
+          int colIndex = -1;
+          ConvertedType convertedType = 
schemaElements.get(column.getPath()[0]).getConverted_type();
+          if (convertedType != null && 
convertedType.equals(ConvertedType.DATE)) {
+            List<ColumnChunkMetaData> colChunkList = 
footer.getBlocks().get(rowGroupIndex).getColumns();
+            for (int j = 0; j < colChunkList.size(); j++) {
+              if 
(colChunkList.get(j).getPath().equals(ColumnPath.get(column.getPath()))) {
+                colIndex = j;
+                break;
+              }
+            }
+          }
+          if (colIndex == -1) {
+            // column does not appear in this file, skip it
+            continue;
+          }
+          Statistics statistics = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics();
+          Integer max = (Integer) statistics.genericGetMax();
+          if (statistics.hasNonNullValue()) {
+            if (max > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+              return DateCorruptionStatus.META_SHOWS_CORRUPTION;
+            }
+          } else {
+            // no statistics, go check the first page
+            return DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
+          }
+        }
+      }
+    }
+    return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
+  }
 }
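
To illustrate the version gate above, a sketch of how
drillVersionHasCorruptedDates classifies version strings (expected results
inferred from the code and comments in this file):

    // Files written by Drill below 1.9.0 (including 1.9.0-SNAPSHOT) have corrupt dates.
    drillVersionHasCorruptedDates("1.8.0");          // META_SHOWS_CORRUPTION
    drillVersionHasCorruptedDates("1.9.0-SNAPSHOT"); // META_SHOWS_CORRUPTION
    drillVersionHasCorruptedDates("1.9.0");          // META_SHOWS_NO_CORRUPTION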

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 6c7bc41..8f7ace1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -104,6 +104,9 @@ public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan
           logger.trace("ParquetTrace,Read Footer,{},{},{},{},{},{},{}", "", 
e.getPath(), "", 0, 0, 0, timeToRead);
           footers.put(e.getPath(), footer );
         }
+        boolean autoCorrectCorruptDates = 
rowGroupScan.formatConfig.autoCorrectCorruptDates;
+        ParquetReaderUtility.DateCorruptionStatus containsCorruptDates = 
ParquetReaderUtility.detectCorruptDates(footers.get(e.getPath()), 
rowGroupScan.getColumns(),
+                autoCorrectCorruptDates);
         if 
(!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val
 && !isComplex(footers.get(e.getPath()))) {
           readers.add(
               new ParquetRecordReader(
@@ -112,12 +115,13 @@ public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan
                   fs.getConf(),
                   new 
ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
                   footers.get(e.getPath()),
-                  rowGroupScan.getColumns()
+                  rowGroupScan.getColumns(),
+                  containsCorruptDates
               )
           );
         } else {
           ParquetMetadata footer = footers.get(e.getPath());
-          readers.add(new DrillParquetReader(context, footer, e, 
columnExplorer.getTableColumns(), fs));
+          readers.add(new DrillParquetReader(context, footer, e, 
columnExplorer.getTableColumns(), fs, containsCorruptDates));
         }
 
         Map<String, String> implicitValues = 
columnExplorer.populateImplicitColumns(e, rowGroupScan.getSelectionRoot());

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
index e38c51c..ea65615 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReaderFactory.java
@@ -95,7 +95,19 @@ public class ColumnReaderFactory {
           return new FixedByteAlignedReader.FixedBinaryReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, (VariableWidthVector) v, 
schemaElement);
         }
       } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        return new FixedByteAlignedReader.DateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+        switch(recordReader.getDateCorruptionStatus()) {
+          case META_SHOWS_CORRUPTION:
+            return new FixedByteAlignedReader.CorruptDateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+          case META_SHOWS_NO_CORRUPTION:
+            return new FixedByteAlignedReader.DateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+          case META_UNCLEAR_TEST_VALUES:
+            return new 
FixedByteAlignedReader.CorruptionDetectingDateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, (DateVector) v, 
schemaElement);
+          default:
+            throw new ExecutionSetupException(
+                String.format("Issue setting up parquet reader for date type, 
" +
+                        "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
+                recordReader.getDateCorruptionStatus()));
+        }
       } else{
         if 
(columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
           switch (columnChunkMetaData.getType()) {
@@ -144,7 +156,19 @@ public class ColumnReaderFactory {
         return new NullableBitReader(recordReader, allocateSize, descriptor, 
columnChunkMetaData,
             fixedLength, (NullableBitVector) v, schemaElement);
       } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.INT32 && convertedType == ConvertedType.DATE){
-        return new 
NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, 
descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, 
schemaElement);
+        switch(recordReader.getDateCorruptionStatus()) {
+          case META_SHOWS_CORRUPTION:
+            return new 
NullableFixedByteAlignedReaders.NullableCorruptDateReader(recordReader, 
allocateSize, descriptor, columnChunkMetaData, fixedLength, 
(NullableDateVector)v, schemaElement);
+          case META_SHOWS_NO_CORRUPTION:
+            return new 
NullableFixedByteAlignedReaders.NullableDateReader(recordReader, allocateSize, 
descriptor, columnChunkMetaData, fixedLength, (NullableDateVector) v, 
schemaElement);
+          case META_UNCLEAR_TEST_VALUES:
+            return new 
NullableFixedByteAlignedReaders.CorruptionDetectingNullableDateReader(recordReader,
 allocateSize, descriptor, columnChunkMetaData, fixedLength, 
(NullableDateVector) v, schemaElement);
+          default:
+            throw new ExecutionSetupException(
+                String.format("Issue setting up parquet reader for date type, 
" +
+                        "unrecognized date corruption status %s. See 
DRILL-4203 for more info.",
+                    recordReader.getDateCorruptionStatus()));
+        }
       } else if (columnChunkMetaData.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
         if (convertedType == ConvertedType.DECIMAL) {
           int length = schemaElement.type_length;

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index d4b43d8..cccb06f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -22,7 +22,6 @@ import java.math.BigDecimal;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.DateVector;
@@ -119,9 +118,11 @@ class FixedByteAlignedReader<V extends ValueVector> 
extends ColumnReader<V> {
 
   public static class DateReader extends ConvertedReader<DateVector> {
 
+    private final DateVector.Mutator mutator;
     DateReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                     boolean fixedLength, DateVector v, SchemaElement 
schemaElement) throws ExecutionSetupException {
       super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
+      mutator = v.getMutator();
     }
 
     @Override
@@ -133,7 +134,67 @@ class FixedByteAlignedReader<V extends ValueVector> 
extends ColumnReader<V> {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - 
ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+      mutator.set(index, DateTimeUtils.fromJulianDay(intValue + 
ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+    }
+  }
+
+  /**
+   * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
+   */
+  public static class CorruptDateReader extends ConvertedReader<DateVector> {
+
+    private final DateVector.Mutator mutator;
+
+    CorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                      boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      mutator = v.getMutator();
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int intValue;
+      if (usingDictionary) {
+        intValue = pageReader.dictionaryValueReader.readInteger();
+      } else {
+        intValue = readIntLittleEndian(bytebuf, start);
+      }
+
+      mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+    }
+
+  }
+
+  /**
+   * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
+   * <p/>
+   * For files that lack enough metadata to determine if the dates are corrupt, we must
+   * correct values that look corrupt during this low-level read.
+   */
+  public static class CorruptionDetectingDateReader extends ConvertedReader<DateVector> {
+
+    private final DateVector.Mutator mutator;
+
+    CorruptionDetectingDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                                  boolean fixedLength, DateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      mutator = v.getMutator();
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int intValue;
+      if (usingDictionary) {
+        intValue = pageReader.dictionaryValueReader.readInteger();
+      } else {
+        intValue = readIntLittleEndian(bytebuf, start);
+      }
+
+      if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+        mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+      } else {
+        mutator.set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+      }
     }
 
   }

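The arithmetic behind the two shift constants used above: Joda's DateTimeUtils.fromJulianDay()
expects an absolute Julian day, the parquet spec stores days since the Unix epoch, and old Drill
files stored values larger by roughly twice the epoch's Julian day number. The constant values in
this sketch are inferred from the removed read expression (intValue -
ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5); they are assumptions, not values copied from
ParquetReaderUtility.

    import org.joda.time.DateTimeUtils;

    public class DateShiftSketch {
      // The Julian day of the Unix epoch midnight (1970-01-01T00:00Z) is 2440587.5, so a
      // spec-compliant value d (days since epoch) maps to Julian day d + 2440587.5.
      static final double ASSUMED_PARQUET_TO_JULIAN_SHIFT = 2440587.5;
      // Old Drill wrote roughly julianDayNumber + 2440588 instead of days since epoch,
      // i.e. values shifted forward by about 2 * 2440588 days (over 13,000 years).
      static final double ASSUMED_CORRUPT_SHIFT = 2440587.5 - 2 * 2440588;

      public static void main(String[] args) {
        int goodValue = 17161;                  // days since epoch for a date in late 2016
        int corruptValue = 17161 + 2 * 2440588; // the same date as an old Drill writer stored it

        long goodMillis = DateTimeUtils.fromJulianDay(goodValue + ASSUMED_PARQUET_TO_JULIAN_SHIFT);
        long fixedMillis = DateTimeUtils.fromJulianDay(corruptValue + ASSUMED_CORRUPT_SHIFT);

        // Both paths should land on the same midnight-UTC instant.
        System.out.println(goodMillis == fixedMillis); // true under these assumptions
      }
    }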
http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 800d422..10e0c72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.NullableBigIntVector;
@@ -328,12 +327,72 @@ public class NullableFixedByteAlignedReaders {
         intValue = readIntLittleEndian(bytebuf, start);
       }
 
-      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
     }
 
   }
 
+  /**
+   * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
+   */
+  public static class NullableCorruptDateReader extends NullableConvertedReader<NullableDateVector> {
+
+    NullableCorruptDateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                       boolean fixedLength, NullableDateVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int intValue;
+      if (usingDictionary) {
+        intValue = pageReader.dictionaryValueReader.readInteger();
+      } else {
+        intValue = readIntLittleEndian(bytebuf, start);
+      }
+
+      valueVec.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+    }
+
+  }
+
+  /**
+   * Old versions of Drill were writing a non-standard format for date. See DRILL-4203
+   *
+   * For files that lack enough metadata to determine if the dates are corrupt, we must
+   * correct values that look corrupt during this low-level read.
+   */
+  public static class CorruptionDetectingNullableDateReader extends NullableConvertedReader<NullableDateVector> {
+
+    NullableDateVector dateVector;
+
+    CorruptionDetectingNullableDateReader(ParquetRecordReader parentReader, int allocateSize,
+                                          ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
+                                          boolean fixedLength, NullableDateVector v, SchemaElement schemaElement)
+        throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+      dateVector = v;
+    }
+
+    @Override
+    void addNext(int start, int index) {
+      int intValue;
+      if (usingDictionary) {
+        intValue = pageReader.dictionaryValueReader.readInteger();
+      } else {
+        intValue = readIntLittleEndian(bytebuf, start);
+      }
+
+      if (intValue > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+        dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT));
+      } else {
+        dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY));
+      }
+    }
+  }
+
  public static class NullableDecimal28Reader extends NullableConvertedReader<NullableDecimal28SparseVector> {
+
    NullableDecimal28Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                            boolean fixedLength, NullableDecimal28SparseVector v, SchemaElement schemaElement) throws ExecutionSetupException {
      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);

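The nullable variants above mirror the non-nullable readers; the interesting piece is the
per-value threshold test in CorruptionDetectingNullableDateReader. It works because the
corruption is a constant forward shift of several thousand years, so any value past a
far-future cutoff can be treated as corrupt with little risk. A minimal sketch of the
heuristic follows; the cutoff and epoch offset are assumed values, since Drill's actual
DATE_CORRUPTION_THRESHOLD is defined in ParquetReaderUtility and is not shown in this excerpt.

    public class CorruptDateDetectionSketch {
      static final int ASSUMED_EPOCH_OFFSET = 2_440_588;        // Julian day number of the Unix epoch (assumed)
      static final int ASSUMED_THRESHOLD = 365 * (5000 - 1970); // ~days since epoch for the year 5000 (assumed)

      static long daysSinceEpoch(int raw) {
        // Corrupt values are shifted forward by about 2 * ASSUMED_EPOCH_OFFSET days,
        // far past any plausible modern date, so a simple threshold distinguishes them.
        return raw > ASSUMED_THRESHOLD ? raw - 2L * ASSUMED_EPOCH_OFFSET : raw;
      }

      public static void main(String[] args) {
        System.out.println(daysSinceEpoch(17_161));                 // plausible date: unchanged
        System.out.println(daysSinceEpoch(17_161 + 2 * 2_440_588)); // corrupt twin: back to 17161
      }
    }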
http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 23c0759..99cf0f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -40,13 +41,19 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.store.parquet.ParquetRecordWriter;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.SemanticVersion;
+import org.apache.parquet.VersionParser;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.ConvertedType;
 import org.apache.parquet.format.FileMetaData;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -109,18 +116,21 @@ public class ParquetRecordReader extends AbstractRecordReader {
   int rowGroupIndex;
   long totalRecordsRead;
   private final FragmentContext fragmentContext;
+  ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
 
   public ParquetRecordReader(FragmentContext fragmentContext,
-      String path,
-      int rowGroupIndex,
-      FileSystem fs,
-      CodecFactory codecFactory,
-      ParquetMetadata footer,
-      List<SchemaPath> columns) throws ExecutionSetupException {
+                             String path,
+                             int rowGroupIndex,
+                             FileSystem fs,
+                             CodecFactory codecFactory,
+                             ParquetMetadata footer,
+                             List<SchemaPath> columns,
+                             ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
+                             throws ExecutionSetupException {
     this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
-        columns);
+        columns, dateCorruptionStatus);
   }
 
   public ParquetRecordReader(
@@ -131,17 +141,29 @@ public class ParquetRecordReader extends AbstractRecordReader {
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
-      List<SchemaPath> columns) throws ExecutionSetupException {
+      List<SchemaPath> columns,
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
     this.hadoopPath = new Path(path);
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
     this.batchSize = batchSize;
     this.footer = footer;
+    this.dateCorruptionStatus = dateCorruptionStatus;
     this.fragmentContext = fragmentContext;
     setColumns(columns);
   }
 
+  /**
+   * Indicates whether the old non-standard date format appears
+   * in this file, see DRILL-4203.
+   *
+   * @return the date corruption status detected for this file
+   */
+  public ParquetReaderUtility.DateCorruptionStatus getDateCorruptionStatus() {
+    return dateCorruptionStatus;
+  }
+
   public CodecFactory getCodecFactory() {
     return codecFactory;
   }
@@ -207,6 +229,31 @@ public class ParquetRecordReader extends AbstractRecordReader {
     return operatorContext;
   }
 
+  /**
+   * Returns the data type length for a given {@link ColumnDescriptor} and its corresponding
+   * {@link SchemaElement}. Neither is enough information alone, as the max
+   * repetition level (indicating whether it is an array type) is in the ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @param column a column descriptor from the parquet footer
+   * @param se the schema element holding the fixed width length, if any
+   * @return the length in bits if fixed width, else -1
+   */
+  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
+    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+      if (column.getMaxRepetitionLevel() > 0) {
+        return -1;
+      }
+      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+        return se.getType_length() * 8;
+      } else {
+        return getTypeLengthInBits(column.getType());
+      }
+    } else {
+      return -1;
+    }
+  }
+
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
@@ -233,16 +280,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
     // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
     // store a map from column name to converted types if they are non-null
-    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
-    fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
-    for (SchemaElement se : fileMetaData.getSchema()) {
-      schemaElements.put(se.getName(), se);
-    }
+    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
 
     // loop to add up the length of the fixed width columns and build the schema
     for (int i = 0; i < columns.size(); ++i) {
       column = columns.get(i);
-      logger.debug("name: " + fileMetaData.getSchema().get(i).name);
       SchemaElement se = schemaElements.get(column.getPath()[0]);
       MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
           getDataMode(column), se, fragmentContext.getOptions());
@@ -251,18 +293,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
         continue;
       }
       columnsToScan++;
-      // sum the lengths of all of the fixed length fields
-      if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-        if (column.getMaxRepetitionLevel() > 0) {
-          allFieldsFixedLength = false;
-        }
-        if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-            bitWidthAllFixedFields += se.getType_length() * 8;
-        } else {
-          bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
-        }
-      } else {
+      int dataTypeLength = getDataTypeLength(column, se);
+      if (dataTypeLength == -1) {
         allFieldsFixedLength = false;
+      } else {
+        bitWidthAllFixedFields += dataTypeLength;
       }
     }
 //    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();

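The setup() rewrite above folds the nested fixed-width branches into the new getDataTypeLength()
helper, with -1 as the sentinel for variable-width or repeated columns. A simplified sketch of the
same accumulate-with-sentinel pattern; the Column class here is a stand-in for the information
Drill pulls from ColumnDescriptor and SchemaElement, not Drill's API.

    import java.util.Arrays;
    import java.util.List;

    public class FixedWidthSumSketch {
      // Stand-in for the ColumnDescriptor + SchemaElement pair.
      static class Column {
        final boolean binary;
        final boolean repeated;
        final int widthBits;
        Column(boolean binary, boolean repeated, int widthBits) {
          this.binary = binary;
          this.repeated = repeated;
          this.widthBits = widthBits;
        }
      }

      // Mirrors getDataTypeLength(): width in bits if fixed, else -1 for
      // variable-width (BINARY) or repeated columns.
      static int dataTypeLength(Column c) {
        return (c.binary || c.repeated) ? -1 : c.widthBits;
      }

      public static void main(String[] args) {
        List<Column> columns = Arrays.asList(
            new Column(false, false, 32),  // e.g. an INT32 date column
            new Column(true, false, 0));   // a BINARY column: variable width

        boolean allFieldsFixedLength = true;
        int bitWidthAllFixedFields = 0;
        for (Column c : columns) {
          int len = dataTypeLength(c);
          if (len == -1) {
            allFieldsFixedLength = false;
          } else {
            bitWidthAllFixedFields += len;
          }
        }
        System.out.println(allFieldsFixedLength + " " + bitWidthAllFixedFields); // prints: false 32
      }
    }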
http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index 5bc8ad2..32295b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
@@ -44,7 +45,6 @@ import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.util.DecimalUtility;
@@ -87,16 +87,23 @@ public class DrillParquetGroupConverter extends GroupConverter {
   private MapWriter mapWriter;
   private final OutputMutator mutator;
   private final OptionManager options;
+  // See DRILL-4203
+  private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
 
-  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema, Collection<SchemaPath> columns, OptionManager options) {
-    this(mutator, complexWriter.rootAsMap(), schema, columns, options);
+  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema,
+                                    Collection<SchemaPath> columns, OptionManager options,
+                                    ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+    this(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates);
   }
 
   // This function assumes that the fields in the schema parameter are in the same order as the fields in the columns parameter. The
   // columns parameter may have fields that are not present in the schema, though.
-  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema, Collection<SchemaPath> columns, OptionManager options) {
+  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema,
+                                    Collection<SchemaPath> columns, OptionManager options,
+                                    ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
     this.mapWriter = mapWriter;
     this.mutator = mutator;
+    this.containsCorruptedDates = containsCorruptedDates;
     converters = Lists.newArrayList();
     this.options = options;
 
@@ -144,10 +151,12 @@ public class DrillParquetGroupConverter extends GroupConverter {
           c.add(s);
         }
         if (rep != Repetition.REPEATED) {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(name), type.asGroupType(), c, options);
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(
+              mutator, mapWriter.map(name), type.asGroupType(), c, options, containsCorruptedDates);
           converters.add(converter);
         } else {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(name).map(), type.asGroupType(), c, options);
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(
+              mutator, mapWriter.list(name).map(), type.asGroupType(), c, options, containsCorruptedDates);
           converters.add(converter);
         }
       } else {
       } else {
@@ -173,7 +182,19 @@ public class DrillParquetGroupConverter extends GroupConverter {
           }
           case DATE: {
             DateWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).date() : mapWriter.date(name);
-            return new DrillDateConverter(writer);
+            switch(containsCorruptedDates) {
+              case META_SHOWS_CORRUPTION:
+                return new DrillCorruptedDateConverter(writer);
+              case META_SHOWS_NO_CORRUPTION:
+                return new DrillDateConverter(writer);
+              case META_UNCLEAR_TEST_VALUES:
+                return new CorruptionDetectingDateConverter(writer);
+              default:
+                throw new DrillRuntimeException(
+                    String.format("Issue setting up parquet reader for date type, " +
+                            "unrecognized date corruption status %s. See DRILL-4203 for more info.",
+                        containsCorruptedDates));
+            }
           }
           case TIME_MILLIS: {
             TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name);
@@ -325,6 +346,40 @@ public class DrillParquetGroupConverter extends GroupConverter {
     }
   }
 
+  public static class CorruptionDetectingDateConverter extends PrimitiveConverter {
+    private DateWriter writer;
+    private DateHolder holder = new DateHolder();
+
+    public CorruptionDetectingDateConverter(DateWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addInt(int value) {
+      if (value > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
+        holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+      } else {
+        holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
+      }
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillCorruptedDateConverter extends PrimitiveConverter {
+    private DateWriter writer;
+    private DateHolder holder = new DateHolder();
+
+    public DrillCorruptedDateConverter(DateWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addInt(int value) {
+      holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.CORRECT_CORRUPT_DATE_SHIFT);
+      writer.write(holder);
+    }
+  }
+
   public static class DrillDateConverter extends PrimitiveConverter {
     private DateWriter writer;
     private DateHolder holder = new DateHolder();
@@ -335,7 +390,7 @@ public class DrillParquetGroupConverter extends GroupConverter {
 
     @Override
     public void addInt(int value) {
-      holder.value = DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5);
+      holder.value = DateTimeUtils.fromJulianDay(value + ParquetReaderUtility.SHIFT_PARQUET_DAY_COUNT_TO_JULIAN_DAY);
       writer.write(holder);
     }
   }

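In this parquet2 path the corruption decision is made once per column, when the converter is
built, so the per-value addInt() hot path stays branch-free except in the detecting variant. A
stand-in sketch of that converter-per-column pattern; DateSink and the shift constant are
assumptions for illustration, not Drill's types or values.

    public class ConverterSketch {
      interface DateSink { void write(long daysSinceEpoch); }

      abstract static class PrimitiveConverter {
        abstract void addInt(int value);
      }

      static final int ASSUMED_CORRUPT_SHIFT = 2 * 2_440_588; // assumed, see the notes above

      static class CorruptedDateConverter extends PrimitiveConverter {
        private final DateSink sink;
        CorruptedDateConverter(DateSink sink) { this.sink = sink; }
        @Override void addInt(int value) {
          sink.write(value - ASSUMED_CORRUPT_SHIFT); // every value in the file is shifted
        }
      }

      static class PlainDateConverter extends PrimitiveConverter {
        private final DateSink sink;
        PlainDateConverter(DateSink sink) { this.sink = sink; }
        @Override void addInt(int value) {
          sink.write(value); // spec-compliant file: already days since epoch
        }
      }

      public static void main(String[] args) {
        DateSink print = d -> System.out.println(d);
        new CorruptedDateConverter(print).addInt(17_161 + ASSUMED_CORRUPT_SHIFT); // prints 17161
        new PlainDateConverter(print).addInt(17_161);                             // prints 17161
      }
    }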
http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 224d6eb..68d3bbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -42,6 +42,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableIntVector;
@@ -104,9 +105,12 @@ public class DrillParquetReader extends AbstractRecordReader {
   private List<SchemaPath> columnsNotFound=null;
   boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema
 
+  // See DRILL-4203
+  private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates;
 
   public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry,
-      List<SchemaPath> columns, DrillFileSystem fileSystem) {
+      List<SchemaPath> columns, DrillFileSystem fileSystem, ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
+    this.containsCorruptedDates = containsCorruptedDates;
     this.footer = footer;
     this.fileSystem = fileSystem;
     this.entry = entry;
@@ -263,7 +267,7 @@ public class DrillParquetReader extends AbstractRecordReader {
         // Discard the columns not found in the schema when creating the DrillParquetRecordMaterializer, since they have been added to output already.
         final Collection<SchemaPath> columns = columnsNotFound == null || columnsNotFound.size() == 0 ? getColumns() : CollectionUtils.subtract(getColumns(), columnsNotFound);
         recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, columns,
-            fragmentContext.getOptions());
+            fragmentContext.getOptions(), containsCorruptedDates);
         primitiveVectors = writer.getMapVector().getPrimitiveVectors();
         recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index 6b7edc4..2d778bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet2;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 import org.apache.parquet.io.api.GroupConverter;
@@ -35,9 +36,10 @@ public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
   private ComplexWriter complexWriter;
 
   public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema,
-                                        Collection<SchemaPath> columns, OptionManager options) {
+                                        Collection<SchemaPath> columns, OptionManager options,
+                                        ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates) {
     this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options);
+    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema, columns, options, containsCorruptedDates);
   }
 
   public void setPosition(int position) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ae34d5c3/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 9df9139..7033be6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -732,8 +732,8 @@ public class DrillTestWrapper {
           sb.append(printRecord(actualRecords.get(actualRecordDisplayCount)));
         }
         String actualRecordExamples = sb.toString();
-        throw new Exception(String.format("After matching %d records, did not find expected record in result set: %s\n\n" +
-            "Some examples of expected records:%s\n\n Some examples of records returned by the test query:%s",
+        throw new Exception(String.format("After matching %d records, did not find expected record in result set:\n %s\n\n" +
+            "Some examples of expected records:\n%s\n\n Some examples of records returned by the test query:\n%s",
             counter, printRecord(expectedRecord), expectedRecordExamples, actualRecordExamples));
       } else {
         actualRecords.remove(i);
