[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-06-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/789


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118590127
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+    this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+    ColumnReader firstColumnStatus = readState.getFirstColumnReader();
+    long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
+    int readCount = readRecords(firstColumnStatus, recordsToRead);
+    readState.fillNullVectors(readCount);
+    return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader firstColumnStatus, long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+    Stopwatch timer = Stopwatch.createStarted();
+    if(readState.useAsyncColReader()){
+      readAllFixedFieldsParallel(recordsToRead);
+    } else {
+      readAllFixedFieldsSerial(recordsToRead);
+    }
+
+    readState.parquetReaderStats().timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
+    for (ColumnReader crs : readState.getColumnReaders()) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
+    ArrayList futures = Lists.newArrayList();
+    for (ColumnReader crs : readState.getColumnReaders()) {
+      Future f = crs.processPagesAsync(recordsToRead);
+      futures.add(f);
+    }
+    Exception exception = null;
+    for(Future f: futures){
+      if (exception != null) {
+        f.cancel(true);
+      } else {
+        try {
+          f.get();
+        } catch (Exception e) {
+          f.cancel(true);
+          exception = e;
+        }
+      }
+    }
+    if (exception != null) {
+      throw exception;
+    }
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
--- End diff --

Fixed. Finally found out what this means. Thanks Jinfeng!
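For context, the "mock records" mentioned in that javadoc are batches that carry no data read from the file: they are produced when none of the SELECT-list columns exists in the Parquet file, so the reader only has to hand back null-filled records until the row group's record count is reached (see the ParquetSchema and ReadState comments quoted elsewhere in this thread). A hedged sketch of that counting logic, with placeholder names rather than the patch's actual methods:

    // Placeholder sketch, not the patch's actual code: a mock batch reads no
    // column data; it only sizes a null-filled batch and advances a counter.
    int readMockBatch(long groupRecordCount, long mockRecordsRead, int batchCapacity) {
      long remaining = groupRecordCount - mockRecordsRead;
      int recordsInBatch = (int) Math.min(remaining, batchCapacity);
      // The caller would then set the value count on each null-filled vector
      // (readState.fillNullVectors(recordsInBatch) in the diff above) and add
      // recordsInBatch to mockRecordsRead.
      return recordsInBatch;
    }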




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118591443
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+    FixtureBuilder builder = ClusterFixture.builder()
+      // Set options, etc.
+      ;
+    startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+    String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity\n" +
+                 "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+    Map typeMap = new HashMap<>();
+    typeMap.put(TestBuilder.parsePath("l_orderkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_partkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_suppkey"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_linenumber"), Types.required(TypeProtos.MinorType.INT));
+    typeMap.put(TestBuilder.parsePath("l_quantity"), Types.required(TypeProtos.MinorType.FLOAT8));
+    client.testBuilder()
+      .sqlQuery(sql)
+      .unOrdered()
+      .csvBaselineFile("parquet/expected/fixedWidth.csv")
+      .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", "l_linenumber", "l_quantity")
+      .baselineTypes(typeMap)
+      .build()
+      .run();
+  }
+
+
--- End diff --

Fixed.
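One note for readers of the archived diff above: the mail rendering strips generic type parameters, so declarations such as "Map typeMap" appear as raw types. Given that TestBuilder.baselineTypes() takes a map keyed by column path, the declaration was presumably along these lines (a guess at the stripped generics, using only classes the test already imports):

    // Presumed original declaration; the archive dropped the angle brackets.
    Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
    typeMap.put(TestBuilder.parsePath("l_quantity"), Types.required(TypeProtos.MinorType.FLOAT8));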




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118591078
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
   }
 
   /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
+   * Prepare the Parquet reader. First determine the set of columns to read (the schema
+   * for this read.) Then, create a state object to track the read across calls to
+   * the reader next() method. Finally, create one of three readers to
+   * read batches depending on whether this scan is for only fixed-width fields,
+   * contains at least one variable-width field, or is a "mock" scan consisting
+   * only of null fields (fields in the SELECT clause but not in the Parquet file.)
   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
-    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (column.getMaxRepetitionLevel() > 0) {
-        return -1;
-      }
-      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        return se.getType_length() * 8;
-      } else {
-        return getTypeLengthInBits(column.getType());
-      }
-    } else {
-      return -1;
-    }
-  }
 
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
-}
-columnStatuses = new ArrayList<>();
-List columns = 
footer.getFileMetaData().getSchema().getColumns();
-allFieldsFixedLength = true;
-ColumnDescriptor column;
-ColumnChunkMetaData columnChunkMetaData;
-int columnsToScan = 0;
-mockRecordsRead = 0;
-
-MaterializedField field;
+schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, isStarQuery() ? null : getColumns());
 
 logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
 hadoopPath.toUri().getPath());
-totalRecordsRead = 0;
-
-// TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
-// store a map from column name to converted types if they are non-null
-Map schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
-
-// loop to add up the length of the fixed width columns and build the 
schema
-for (int i = 0; i < columns.size(); ++i) {
-  column = columns.get(i);
-  SchemaElement se = schemaElements.get(column.getPath()[0]);
-  MajorType mt = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
-  getDataMode(column), se, fragmentContext.getOptions());
-  field = MaterializedField.create(toFieldName(column.getPath()), mt);
-  if ( ! fieldSelected(field)) {
-continue;
-  }
-  columnsToScan++;
-  int dataTypeLength = getDataTypeLength(column, se);
-  if (dataTypeLength == -1) {
-allFieldsFixedLength = false;
-  } else {
-bitWidthAllFixedFields += dataTypeLength;
-  }
-}
-
-if (columnsToScan != 0  && allFieldsFixedLength) {
-  recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
-  footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-}
-else {
-  recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-}
 
 try {
-  ValueVector vector;
-  SchemaElement schemaElement;
-  final ArrayList 
varLengthColumns = new ArrayList<>();
-  // initialize all of the column read status objects
-  boolean fieldFixedLength;
-  // the column chunk meta-data is not guaranteed to be in the same 
order as the columns in the schema
-  // a map is constructed for fast access to the correct 
columnChunkMetadata to correspond
-

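The hunk is cut off here by the archive. The javadoc above describes the resulting control flow: once the ParquetSchema and ReadState are built, setup() picks one of three batch-reading strategies. A self-contained sketch of that three-way choice (class and method names here are placeholders, not necessarily those used in the patch):

    /** Hedged sketch of the selection described in the javadoc above; placeholder names. */
    public class ReaderSelectionSketch {
      abstract static class Reader { abstract String kind(); }
      static class MockReader extends Reader { String kind() { return "mock (null-filled records)"; } }
      static class FixedWidthReader extends Reader { String kind() { return "fixed-width columns only"; } }
      static class VariableWidthReader extends Reader { String kind() { return "has variable-width columns"; } }

      static Reader choose(int columnsFoundInFile, boolean allFieldsFixedLength) {
        if (columnsFoundInFile == 0) {
          return new MockReader();           // nothing in the SELECT list exists in the file
        } else if (allFieldsFixedLength) {
          return new FixedWidthReader();     // every projected column has a fixed width
        } else {
          return new VariableWidthReader();  // at least one variable-width column
        }
      }

      public static void main(String[] args) {
        System.out.println(choose(0, true).kind());
        System.out.println(choose(5, true).kind());
        System.out.println(choose(5, false).kind());
      }
    }
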
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118590602
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Represents a single column read from the Parquet file by the record 
reader.
+ */
+
+public class ParquetColumnMetadata {
+
+  ColumnDescriptor column;
+  private SchemaElement se;
+  MaterializedField field;
+  int length;
+  private MajorType type;
+  ColumnChunkMetaData columnChunkMetaData;
+  private ValueVector vector;
+
+  public ParquetColumnMetadata(ColumnDescriptor column) {
+this.column = column;
+  }
+
+  public void resolveDrillType(Map schemaElements, 
OptionManager options) {
+se = schemaElements.get(column.getPath()[0]);
+type = ParquetToDrillTypeConverter.toMajorType(column.getType(), 
se.getType_length(),
+getDataMode(column), se, options);
+field = MaterializedField.create(toFieldName(column.getPath()), type);
+length = getDataTypeLength();
+  }
+
+  private String toFieldName(String[] paths) {
+return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+if (isRepeated()) {
+  return DataMode.REPEATED;
+} else if (column.getMaxDefinitionLevel() == 0) {
+  return TypeProtos.DataMode.REQUIRED;
+} else {
+  return TypeProtos.DataMode.OPTIONAL;
+}
+  }
+
+  /**
+   * @param type
+   * @param type a fixed length type from the parquet library enum
+   * @return the length in pageDataByteArray of the type
+   */
+  public static int getTypeLengthInBits(PrimitiveTypeName type) {
+switch (type) {
+  case INT64:   return 64;
+  case INT32:   return 32;
+  case BOOLEAN: return 1;
+  case FLOAT:   return 32;
+  case DOUBLE:  return 64;
+  case INT96:   return 96;
+  // binary and fixed length byte array
+  default:
+throw new IllegalStateException("Length cannot be determined for 
type " + type);
+}
+  }
+
+  /**
+   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
+   * {@see SchemaElement}. Neither is enough information alone as the max
+   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @return the length if fixed width, else -1
+   */
+  private int getDataTypeLength() {
+if (! isFixedLength()) {
+  return -1;
+} else if (isRepeated()) {
+  return 

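The hunk is truncated here. The getTypeLengthInBits() table above is what lets the setup() code quoted in the previous message add up a per-record bit width for an all-fixed-width schema and size its batches. A small standalone illustration of that accumulation, using a hypothetical three-column schema:

    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    public class FixedWidthBitsSketch {
      // Same mapping as getTypeLengthInBits() in the diff above.
      static int typeLengthInBits(PrimitiveTypeName type) {
        switch (type) {
          case INT64:   return 64;
          case INT32:   return 32;
          case BOOLEAN: return 1;
          case FLOAT:   return 32;
          case DOUBLE:  return 64;
          case INT96:   return 96;
          default: throw new IllegalStateException("Not a fixed-width type: " + type);
        }
      }

      public static void main(String[] args) {
        // Hypothetical schema: one INT32, one INT64, one DOUBLE column.
        PrimitiveTypeName[] columns = {
            PrimitiveTypeName.INT32, PrimitiveTypeName.INT64, PrimitiveTypeName.DOUBLE };
        int bitWidthAllFixedFields = 0;
        for (PrimitiveTypeName t : columns) {
          bitWidthAllFixedFields += typeLengthInBits(t);
        }
        System.out.println(bitWidthAllFixedFields);  // 32 + 64 + 64 = 160 bits per record
      }
    }
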
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118591297
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  /**
+   * Set of columns specified in the SELECT clause. Will be null for
+   * a SELECT * query.
+   */
+  private final Collection selectedCols;
+  /**
+   * Parallel list to the columns list above, it is used to determine the subset of the project
+   * pushdown columns that do not appear in this file.
+   */
+  private final boolean[] columnsFound;
+  private final OptionManager options;
+  private final int rowGroupIndex;
+  private ParquetMetadata footer;
+  /**
+   * List of metadata for selected columns. This list does two things.
+   * First, it identifies the Parquet columns we wish to select. Second, it
+   * provides metadata for those columns. Note that null columns (columns
+   * in the SELECT clause but not in the file) appear elsewhere.
+   */
+  private List selectedColumnMetadata = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+
+  /**
+   * Build the Parquet schema. The schema can be based on a "SELECT *",
+   * meaning we want all columns defined in the Parquet file. In this case,
+   * the list of selected columns is null. Or, the query can be based on
+   * an explicit list of selected columns. In this case, the
+   * columns need not exist in the Parquet file. If a column does not exist,
+   * the reader returns null for that column. If no selected column exists
+   * in the file, then we return "mock" records: records with only null
+   * values, but repeated for the number of rows in the Parquet file.
+   *
+   * @param options session options
+   * @param rowGroupIndex row group to read
+   * @param selectedCols columns specified in the SELECT clause, or null if
+   * this is a SELECT * query
+   */
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex, Collection selectedCols) {
+    this.options = options;
+    this.rowGroupIndex = rowGroupIndex;
+    this.selectedCols = selectedCols;
+    if (selectedCols == null) {
+      columnsFound = null;
+    } else {
+      columnsFound = new boolean[selectedCols.size()];
+    }
+  }
+
+  /**
+   

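The message is truncated here. For context, the setup() hunks quoted elsewhere in this thread show how this constructor is called: a SELECT * query passes null so every file column is read, while an explicit projection passes the scan's column list. Roughly:

    // As quoted in the ParquetRecordReader.setup() hunks in this thread
    // (line breaks restored; not new code):
    schema = new ParquetSchema(fragmentContext.getOptions(),         // session options
                               rowGroupIndex,                        // row group to read
                               isStarQuery() ? null : getColumns()); // null means SELECT *
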
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-05-25 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r118590427
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Represents a single column read from the Parquet file by the record 
reader.
+ */
+
+public class ParquetColumnMetadata {
+
+  ColumnDescriptor column;
+  private SchemaElement se;
+  MaterializedField field;
+  int length;
+  private MajorType type;
+  ColumnChunkMetaData columnChunkMetaData;
+  private ValueVector vector;
+
+  public ParquetColumnMetadata(ColumnDescriptor column) {
+this.column = column;
+  }
+
+  public void resolveDrillType(Map schemaElements, 
OptionManager options) {
+se = schemaElements.get(column.getPath()[0]);
+type = ParquetToDrillTypeConverter.toMajorType(column.getType(), 
se.getType_length(),
+getDataMode(column), se, options);
+field = MaterializedField.create(toFieldName(column.getPath()), type);
+length = getDataTypeLength();
+  }
+
+  private String toFieldName(String[] paths) {
+return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+if (isRepeated()) {
+  return DataMode.REPEATED;
+} else if (column.getMaxDefinitionLevel() == 0) {
+  return TypeProtos.DataMode.REQUIRED;
+} else {
+  return TypeProtos.DataMode.OPTIONAL;
+}
+  }
+
+  /**
+   * @param type
+   * @param type a fixed length type from the parquet library enum
+   * @return the length in pageDataByteArray of the type
+   */
+  public static int getTypeLengthInBits(PrimitiveTypeName type) {
+switch (type) {
+  case INT64:   return 64;
+  case INT32:   return 32;
+  case BOOLEAN: return 1;
+  case FLOAT:   return 32;
+  case DOUBLE:  return 64;
+  case INT96:   return 96;
+  // binary and fixed length byte array
+  default:
+throw new IllegalStateException("Length cannot be determined for 
type " + type);
+}
+  }
+
+  /**
+   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
+   * {@see SchemaElement}. Neither is enough information alone as the max
+   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @return the length if fixed width, else -1
+   */
+  private int getDataTypeLength() {
+if (! isFixedLength()) {
+  return -1;
--- End diff --

Original code, but sure, 

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110266898
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+ColumnReader firstColumnStatus = readState.getFirstColumnReader();
+long recordsToRead = Math.min(getReadCount(firstColumnStatus), 
readState.getRecordsToRead());
+int readCount = readRecords(firstColumnStatus, recordsToRead);
+readState.fillNullVectors(readCount);
+return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader firstColumnStatus, 
long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+Stopwatch timer = Stopwatch.createStarted();
+if(readState.useAsyncColReader()){
+  readAllFixedFieldsParallel(recordsToRead);
+} else {
+  readAllFixedFieldsSerial(recordsToRead);
+}
+
readState.parquetReaderStats().timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws 
IOException {
+for (ColumnReader crs : readState.getColumnReaders()) {
+  crs.processPages(recordsToRead);
+}
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws 
Exception {
+ArrayList futures = Lists.newArrayList();
+for (ColumnReader crs : readState.getColumnReaders()) {
+  Future f = crs.processPagesAsync(recordsToRead);
+  futures.add(f);
+}
+Exception exception = null;
+for(Future f: futures){
+  if (exception != null) {
+f.cancel(true);
+  } else {
+try {
+  f.get();
+} catch (Exception e) {
+  f.cancel(true);
+  exception = e;
+}
+  }
+}
+if (exception != null) {
+  throw exception;
+}
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
--- End diff --

Please add a brief explanation of what these are instead of "what are these?"
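Aside from the javadoc question, most of this hunk is readAllFixedFieldsParallel(), which uses a common fan-out/collect idiom: submit one asynchronous page read per column, wait on each future in turn, and once any of them fails, cancel the rest and rethrow. A standalone sketch of the same control flow with a plain ExecutorService (the task here is a stand-in, not Drill's ColumnReader.processPagesAsync()):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelReadSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Long>> futures = new ArrayList<>();
        for (int col = 0; col < 5; col++) {
          final long recordsToRead = 4000;
          // Stand-in for ColumnReader.processPagesAsync(recordsToRead).
          futures.add(pool.submit(new Callable<Long>() {
            @Override public Long call() { return recordsToRead; }
          }));
        }
        // Same collect-and-cancel flow as readAllFixedFieldsParallel() above:
        // remember the first failure, cancel everything still pending, rethrow at the end.
        Exception exception = null;
        for (Future<Long> f : futures) {
          if (exception != null) {
            f.cancel(true);
          } else {
            try {
              f.get();
            } catch (Exception e) {
              f.cancel(true);
              exception = e;
            }
          }
        }
        pool.shutdown();
        if (exception != null) {
          throw exception;
        }
      }
    }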




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110278523
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
--- End diff --

remove extra line




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110267977
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Represents a single column read from the Parquet file by the record 
reader.
+ */
+
+public class ParquetColumnMetadata {
+
+  ColumnDescriptor column;
+  private SchemaElement se;
+  MaterializedField field;
+  int length;
+  private MajorType type;
+  ColumnChunkMetaData columnChunkMetaData;
+  private ValueVector vector;
+
+  public ParquetColumnMetadata(ColumnDescriptor column) {
+this.column = column;
+  }
+
+  public void resolveDrillType(Map schemaElements, 
OptionManager options) {
+se = schemaElements.get(column.getPath()[0]);
+type = ParquetToDrillTypeConverter.toMajorType(column.getType(), 
se.getType_length(),
+getDataMode(column), se, options);
+field = MaterializedField.create(toFieldName(column.getPath()), type);
+length = getDataTypeLength();
+  }
+
+  private String toFieldName(String[] paths) {
+return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+if (isRepeated()) {
+  return DataMode.REPEATED;
+} else if (column.getMaxDefinitionLevel() == 0) {
+  return TypeProtos.DataMode.REQUIRED;
+} else {
+  return TypeProtos.DataMode.OPTIONAL;
+}
+  }
+
+  /**
+   * @param type
+   * @param type a fixed length type from the parquet library enum
+   * @return the length in pageDataByteArray of the type
+   */
+  public static int getTypeLengthInBits(PrimitiveTypeName type) {
+switch (type) {
+  case INT64:   return 64;
+  case INT32:   return 32;
+  case BOOLEAN: return 1;
+  case FLOAT:   return 32;
+  case DOUBLE:  return 64;
+  case INT96:   return 96;
+  // binary and fixed length byte array
+  default:
+throw new IllegalStateException("Length cannot be determined for 
type " + type);
+}
+  }
+
+  /**
+   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
+   * {@see SchemaElement}. Neither is enough information alone as the max
+   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @return the length if fixed width, else -1
+   */
+  private int getDataTypeLength() {
+if (! isFixedLength()) {
+  return -1;
--- End diff --

Use static final instead of 
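The suggestion is cut off by the archive, but it is evidently asking for a named constant in place of the bare -1 sentinel returned above. A hedged rendering of that idea (the constant name is a placeholder; the patch may have chosen a different one):

    // Placeholder name for the "not a fixed-width column" sentinel.
    private static final int UNDEFINED_LENGTH = -1;

    private int getDataTypeLength() {
      if (! isFixedLength()) {
        return UNDEFINED_LENGTH;
      } else if (isRepeated()) {
        return UNDEFINED_LENGTH;
      }
      // ... remaining fixed-width branches unchanged ...
      return getTypeLengthInBits(column.getType());
    }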

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110279210
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  /**
+   * Set of columns specified in the SELECT clause. Will be null for
+   * a SELECT * query.
+   */
+  private final Collection selectedCols;
+  /**
+   * Parallel list to the columns list above, it is used to determine the 
subset of the project
+   * pushdown columns that do not appear in this file.
+   */
+  private final boolean[] columnsFound;
+  private final OptionManager options;
+  private final int rowGroupIndex;
+  private ParquetMetadata footer;
+  /**
+   * List of metadata for selected columns. This list does two things.
+   * First, it identifies the Parquet columns we wish to select. Second, it
+   * provides metadata for those columns. Note that null columns (columns
+   * in the SELECT clause but not in the file) appear elsewhere.
+   */
+  private List selectedColumnMetadata = new 
ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+
+  /**
+   * Build the Parquet schema. The schema can be based on a "SELECT *",
+   * meaning we want all columns defined in the Parquet file. In this case,
+   * the list of selected columns is null. Or, the query can be based on
+   * an explicit list of selected columns. In this case, the
+   * columns need not exist in the Parquet file. If a column does not 
exist,
+   * the reader returns null for that column. If no selected column exists
+   * in the file, then we return "mock" records: records with only null
+   * values, but repeated for the number of rows in the Parquet file.
+   *
+   * @param options session options
+   * @param rowGroupIndex row group to read
+   * @param selectedCols columns specified in the SELECT clause, or null if
+   * this is a SELECT * query
+   */
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex, 
Collection selectedCols) {
+this.options = options;
+this.rowGroupIndex = rowGroupIndex;
+this.selectedCols = selectedCols;
+if (selectedCols == null) {
+  columnsFound = null;
+} else {
+  columnsFound = new boolean[selectedCols.size()];
+}
+  }
+
+  /**
+   * 

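The message is truncated here. The columnsFound array documented above works together with the null-filled vectors described in ReadState (quoted later in this thread): each SELECT-list column that matches a file column flips its flag to true, and whatever is still false afterwards gets a null-only vector whose value count is simply set at the end of each next() call. A hedged sketch of that final pass (helper and variable names are placeholders, not the patch's actual code):

    // Placeholder sketch of the pass implied by the comments above: columns
    // still unmarked after matching the file's columns become null-filled
    // vectors in the output batch.
    for (int i = 0; i < columnsFound.length; i++) {
      if (!columnsFound[i]) {
        // createNullColumn(...) is a hypothetical helper that adds a
        // NullableIntVector for the missing column to nullFilledVectors.
        nullFilledVectors.add(createNullColumn(selectedColumnNames.get(i)));
      }
    }
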
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110277141
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
   }
 
   /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
+   * Prepare the Parquet reader. First determine the set of columns to 
read (the schema
+   * for this read.) Then, create a state object to track the read across 
calls to
+   * the reader next() method. Finally, create one of three 
readers to
+   * read batches depending on whether this scan is for only fixed-width 
fields,
+   * contains at least one variable-width field, or is a "mock" scan 
consisting
+   * only of null fields (fields in the SELECT clause but not in the 
Parquet file.)
*/
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
 
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
-}
-columnStatuses = new ArrayList<>();
-List columns = 
footer.getFileMetaData().getSchema().getColumns();
-allFieldsFixedLength = true;
-ColumnDescriptor column;
-ColumnChunkMetaData columnChunkMetaData;
-int columnsToScan = 0;
-mockRecordsRead = 0;
-
-MaterializedField field;
+schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex, isStarQuery() ? null : getColumns());
 
 logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
 hadoopPath.toUri().getPath());
-totalRecordsRead = 0;
-
-// TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
-// store a map from column name to converted types if they are non-null
-Map schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
-
-// loop to add up the length of the fixed width columns and build the 
schema
-for (int i = 0; i < columns.size(); ++i) {
-  column = columns.get(i);
-  SchemaElement se = schemaElements.get(column.getPath()[0]);
-  MajorType mt = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
-  getDataMode(column), se, fragmentContext.getOptions());
-  field = MaterializedField.create(toFieldName(column.getPath()), mt);
-  if ( ! fieldSelected(field)) {
-continue;
-  }
-  columnsToScan++;
-  int dataTypeLength = getDataTypeLength(column, se);
-  if (dataTypeLength == -1) {
-allFieldsFixedLength = false;
-  } else {
-bitWidthAllFixedFields += dataTypeLength;
-  }
-}
-
-if (columnsToScan != 0  && allFieldsFixedLength) {
-  recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
-  footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-}
-else {
-  recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-}
 
 try {
-  ValueVector vector;
-  SchemaElement schemaElement;
-  final ArrayList 
varLengthColumns = new ArrayList<>();
-  // initialize all of the column read status objects
-  boolean fieldFixedLength;
-  // the column chunk meta-data is not guaranteed to be in the same 
order as the columns in the schema
-  // a map is constructed for fast access to the correct 
columnChunkMetadata to correspond
-  // 

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-06 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r110273380
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetColumnMetadata.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.Map;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.RepeatedValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Represents a single column read from the Parquet file by the record 
reader.
+ */
+
+public class ParquetColumnMetadata {
+
+  ColumnDescriptor column;
+  private SchemaElement se;
+  MaterializedField field;
+  int length;
+  private MajorType type;
+  ColumnChunkMetaData columnChunkMetaData;
+  private ValueVector vector;
+
+  public ParquetColumnMetadata(ColumnDescriptor column) {
+this.column = column;
+  }
+
+  public void resolveDrillType(Map schemaElements, 
OptionManager options) {
+se = schemaElements.get(column.getPath()[0]);
+type = ParquetToDrillTypeConverter.toMajorType(column.getType(), 
se.getType_length(),
+getDataMode(column), se, options);
+field = MaterializedField.create(toFieldName(column.getPath()), type);
+length = getDataTypeLength();
+  }
+
+  private String toFieldName(String[] paths) {
+return SchemaPath.getCompoundPath(paths).getAsUnescapedPath();
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column) {
+if (isRepeated()) {
+  return DataMode.REPEATED;
+} else if (column.getMaxDefinitionLevel() == 0) {
+  return TypeProtos.DataMode.REQUIRED;
+} else {
+  return TypeProtos.DataMode.OPTIONAL;
+}
+  }
+
+  /**
+   * @param type
+   * @param type a fixed length type from the parquet library enum
+   * @return the length in pageDataByteArray of the type
+   */
+  public static int getTypeLengthInBits(PrimitiveTypeName type) {
+switch (type) {
+  case INT64:   return 64;
+  case INT32:   return 32;
+  case BOOLEAN: return 1;
+  case FLOAT:   return 32;
+  case DOUBLE:  return 64;
+  case INT96:   return 96;
+  // binary and fixed length byte array
+  default:
+throw new IllegalStateException("Length cannot be determined for 
type " + type);
+}
+  }
+
+  /**
+   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
+   * {@see SchemaElement}. Neither is enough information alone as the max
+   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
+   * the length of a fixed width field is stored at the schema level.
+   *
+   * @return the length if fixed width, else -1
+   */
+  private int getDataTypeLength() {
+if (! isFixedLength()) {
+  return -1;
+} else if (isRepeated()) {
+  return -1;
 

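The hunk is truncated here. Judging from the pre-refactor getDataTypeLength(ColumnDescriptor, SchemaElement) quoted in the previous message, the remaining branches presumably look like this (a reconstruction for context, not a verbatim quote of the patch):

    private int getDataTypeLength() {
      if (! isFixedLength()) {
        return -1;
      } else if (isRepeated()) {
        return -1;
      } else if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
        return se.getType_length() * 8;   // the schema stores this length in bytes
      } else {
        return getTypeLengthInBits(column.getType());
      }
    }
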
[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109513793
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  private Collection selectedCols;
+  // This is a parallel list to the columns list above, it is used to 
determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  private ParquetMetadata footer;
+  private Map schemaElements;
+  private int columnsToScan;
+  private List columns;
+  private List columnMd = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength = true;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+  private int rowGroupIndex;
+  private final OptionManager options;
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex) {
+selectedCols = null;
+this.rowGroupIndex = rowGroupIndex;
+this.options = options;
+  }
+
+  public ParquetSchema(OptionManager options, Collection 
selectedCols) {
+this.options = options;
--- End diff --

Merged constructors as suggested and added comments. Please let me know 
where additional comments are needed to clarify what's happening.
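For readers following the thread, the merged constructor appears in the newer revision of ParquetSchema.java quoted earlier: the two constructors above collapse into a single one that takes the selected columns, with null standing in for SELECT *. Roughly (generics restored; otherwise as quoted):

    public ParquetSchema(OptionManager options, int rowGroupIndex, Collection<SchemaPath> selectedCols) {
      this.options = options;
      this.rowGroupIndex = rowGroupIndex;
      this.selectedCols = selectedCols;   // null means SELECT *
      if (selectedCols == null) {
        columnsFound = null;
      } else {
        columnsFound = new boolean[selectedCols.size()];
      }
    }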




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109511016
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
 return fragmentContext;
   }
 
-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
+if (isStarQuery()) {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex);
+} else {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
getColumns());
--- End diff --

Fixed. Added comments.




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109519725
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+/**
+ * Internal state for reading from a Parquet file.
+ */
+
+public class ReadState {
+  private final ParquetSchema schema;
+  ParquetReaderStats parquetReaderStats;
+  private VarLenBinaryReader varLengthReader;
+  // For columns not found in the file, we need to return a schema element 
with the correct number of values
+  // at that position in the schema. Currently this requires a vector be 
present. Here is a list of all of these vectors
+  // that need only have their value count set at the end of each call to 
next(), as the values default to null.
+  private List<NullableIntVector> nullFilledVectors;
+  // Keeps track of the number of records returned in the case where only 
columns outside of the file were selected.
+  // No actual data needs to be read out of the file, we only need to 
return batches until we have 'read' the number of
+  // records specified in the row group metadata
+  long mockRecordsRead;
+  private List<ColumnReader<?>> columnStatuses = new ArrayList<>();
+  private long numRecordsToRead; // number of records to read
+  private long totalRecordsRead;
+  boolean useAsyncColReader;
+
+  public ReadState(ParquetSchema schema, ParquetReaderStats 
parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+this.schema = schema;
+this.parquetReaderStats = parquetReaderStats;
+this.useAsyncColReader = useAsyncColReader;
+if (! schema.isStarQuery()) {
+  nullFilledVectors = new ArrayList<>();
+}
+mockRecordsRead = 0;
+// Callers can pass -1 if they want to read all rows.
+if (numRecordsToRead == 
ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+  this.numRecordsToRead = schema.rowCount();
+} else {
+  assert (numRecordsToRead >= 0);
+  this.numRecordsToRead = Math.min(numRecordsToRead, 
schema.rowCount());
+}
+  }
+
+  @SuppressWarnings("unchecked")
+  public void buildReader(ParquetRecordReader reader, OutputMutator 
output) throws Exception {
+final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
+// initialize all of the column read status objects
+BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata();
+Map<String, Integer> columnChunkMetadataPositionsInList = schema.buildChunkMap(rowGroupMetadata);
+for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
+  ColumnDescriptor column = colMd.column;
+  colMd.columnChunkMetaData = rowGroupMetadata.getColumns().get(
+  
columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath(;
+  colMd.buildVector(output);
+  if (! colMd.isFixedLength( )) {
+// create a reader and add it to the appropriate list
+varLengthColumns.add(colMd.makeVariableWidthReader(reader));
+  } else if (colMd.isRepeated()) {
+varLengthColumns.add(colMd.makeRepeatedFixedWidthReader(reader, 
schema.getRecordsPerBatch()));
+  }
+  else {
+columnStatuses.add(colMd.makeFixedWidthReader(reader, 
schema.getRecordsPerBatch()));
+  }
+}
  

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109496281
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+ColumnReader<?> firstColumnStatus = readState.getFirstColumnStatus();
+long recordsToRead = Math.min(getReadCount(firstColumnStatus), 
readState.getRecordsToRead());
+int readCount = readRecords(firstColumnStatus, recordsToRead);
+readState.fillNullVectors(readCount);
+return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+Stopwatch timer = Stopwatch.createStarted();
+if(readState.useAsyncColReader()){
+  readAllFixedFieldsParallel(recordsToRead);
+} else {
+  readAllFixedFieldsSerial(recordsToRead);
+}
+
readState.parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws 
IOException {
+for (ColumnReader<?> crs : readState.getReaders()) {
+  crs.processPages(recordsToRead);
+}
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws 
Exception {
+ArrayList<Future<Long>> futures = Lists.newArrayList();
+for (ColumnReader<?> crs : readState.getReaders()) {
+  Future<Long> f = crs.processPagesAsync(recordsToRead);
+  futures.add(f);
+}
+Exception exception = null;
+for (Future<Long> f : futures) {
+  if (exception != null) {
+f.cancel(true);
+  } else {
+try {
+  f.get();
+} catch (Exception e) {
+  f.cancel(true);
+  exception = e;
+}
+  }
+}
+if (exception != null) {
+  throw exception;
+}
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
+
+  public static class MockBatchReader extends BatchReader {
+
+public MockBatchReader(ReadState readState) {
+  super(readState);
+}
+
+@Override
+protected long getReadCount(ColumnReader<?> firstColumnStatus) {
+  if (readState.mockRecordsRead == 
readState.schema().getGroupRecordCount()) {
+return 0;
--- End diff --

Was being lazy. We have a mock read count and a "real" read count, but only 
one or the other is used. Got rid of the mock count and just used the same 
record count variable for all cases.
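
For reference, a rough sketch of what that collapse can look like in MockBatchReader, driven by the single shared counter; the accessor names used here (recordsRead(), getRecordsPerBatch()) are assumptions for illustration, not necessarily the names in the final patch:

  @Override
  protected long getReadCount(ColumnReader<?> firstColumnStatus) {
    // Stop once the shared counter reaches the row group's row count;
    // no separate mockRecordsRead field is required.
    long remaining = readState.schema().getGroupRecordCount() - readState.recordsRead();
    return Math.min(remaining, readState.schema().getRecordsPerBatch());
  }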




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109515637
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  private Collection<SchemaPath> selectedCols;
+  // This is a parallel list to the columns list above, it is used to 
determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  private ParquetMetadata footer;
+  private Map<String, SchemaElement> schemaElements;
+  private int columnsToScan;
+  private List<ColumnDescriptor> columns;
+  private List<ParquetColumnMetadata> columnMd = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength = true;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+  private int rowGroupIndex;
+  private final OptionManager options;
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex) {
+selectedCols = null;
+this.rowGroupIndex = rowGroupIndex;
+this.options = options;
+  }
+
+  public ParquetSchema(OptionManager options, Collection<SchemaPath> selectedCols) {
+this.options = options;
+this.selectedCols = selectedCols;
+columnsFound = new boolean[selectedCols.size()];
+  }
+
+  public void buildSchema(ParquetMetadata footer, long batchSize) throws 
Exception {
+this.footer = footer;
+columns = footer.getFileMetaData().getSchema().getColumns();
+groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+loadParquetSchema();
+computeFixedPart();
+//rowGroupOffset = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+if (columnsToScan != 0  && allFieldsFixedLength) {
+  recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
+  footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
+}
+else {
+  recordsPerBatch = 
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
+}
+  }
+
+  private void loadParquetSchema() {
+// TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
+// store a map from column name to converted types if they are non-null
+schemaElements = 

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109512919
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
 return fragmentContext;
   }
 
-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
+if (isStarQuery()) {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex);
+} else {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
getColumns());
 }
-columnStatuses = new ArrayList<>();
-List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-allFieldsFixedLength = true;
-ColumnDescriptor column;
-ColumnChunkMetaData columnChunkMetaData;
-int columnsToScan = 0;
-mockRecordsRead = 0;
 
-MaterializedField field;
+//ParquetMetadataConverter metaConverter = new 
ParquetMetadataConverter();
+//FileMetaData fileMetaData;
 
--- End diff --

Was being paranoid, but sure, removed the lines.




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109520484
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
+  @Test
+  public void testVariableWidth() throws Exception {
+String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+client.queryBuilder().sql(sql).printCsv();
--- End diff --

The fluent style is not self-explanatory? Build a query, using a SQL 
statement, that prints CSV to the console?

Actually, just commented out the line since it was primarily to help me 
debug the test; the real testing is below.
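
For anyone skimming the thread, the two fluent forms being contrasted are roughly these (lifted from the test shown above); the first is a console debugging aid, the second is the real verification against a baseline file:

  // Debug aid only: run the query and dump the result to the console as CSV.
  client.queryBuilder().sql(sql).printCsv();

  // Actual check: run the query and compare against a CSV baseline.
  client.testBuilder()
    .sqlQuery(sql)
    .unOrdered()
    .csvBaselineFile("parquet/expected/variableWidth.csv")
    .baselineColumns("s_name", "s_address", "s_phone", "s_comment")
    .baselineTypes(typeMap)
    .build()
    .run();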




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-04-03 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r109520632
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
+  @Test
+  public void testVariableWidth() throws Exception {
+String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("s_name"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_address"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_phone"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_comment"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/variableWidth.csv")
+  .baselineColumns("s_name", "s_address", "s_phone", "s_comment")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+  @Test
+  public void testMixedWidth() throws Exception {
+String sql = "SELECT s_suppkey, s_name, s_address, s_phone, 
s_acctbal\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("s_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("s_name"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_address"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108667453
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
+  @Test
+  public void testVariableWidth() throws Exception {
+String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+client.queryBuilder().sql(sql).printCsv();
--- End diff --

Do you want to comment out this line?




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108560194
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  private Collection<SchemaPath> selectedCols;
+  // This is a parallel list to the columns list above, it is used to 
determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  private ParquetMetadata footer;
+  private Map<String, SchemaElement> schemaElements;
+  private int columnsToScan;
+  private List<ColumnDescriptor> columns;
+  private List<ParquetColumnMetadata> columnMd = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength = true;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+  private int rowGroupIndex;
+  private final OptionManager options;
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex) {
+selectedCols = null;
+this.rowGroupIndex = rowGroupIndex;
+this.options = options;
+  }
+
+  public ParquetSchema(OptionManager options, Collection<SchemaPath> selectedCols) {
+this.options = options;
--- End diff --

It is not clear which constructor is supposed to be used when. Please add some comments. Why is rowGroupIndex not needed for the second case?




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108693574
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BatchReader.java
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+/**
+ * Base strategy for reading a batch of Parquet records.
+ */
+public abstract class BatchReader {
+
+  protected final ReadState readState;
+
+  public BatchReader(ReadState readState) {
+this.readState = readState;
+  }
+
+  public int readBatch() throws Exception {
+ColumnReader<?> firstColumnStatus = readState.getFirstColumnStatus();
+long recordsToRead = Math.min(getReadCount(firstColumnStatus), 
readState.getRecordsToRead());
+int readCount = readRecords(firstColumnStatus, recordsToRead);
+readState.fillNullVectors(readCount);
+return readCount;
+  }
+
+  protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);
+
+  protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;
+
+  protected void readAllFixedFields(long recordsToRead) throws Exception {
+Stopwatch timer = Stopwatch.createStarted();
+if(readState.useAsyncColReader()){
+  readAllFixedFieldsParallel(recordsToRead);
+} else {
+  readAllFixedFieldsSerial(recordsToRead);
+}
+
readState.parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+  }
+
+  protected void readAllFixedFieldsSerial(long recordsToRead) throws 
IOException {
+for (ColumnReader<?> crs : readState.getReaders()) {
+  crs.processPages(recordsToRead);
+}
+  }
+
+  protected void readAllFixedFieldsParallel(long recordsToRead) throws 
Exception {
+ArrayList<Future<Long>> futures = Lists.newArrayList();
+for (ColumnReader<?> crs : readState.getReaders()) {
+  Future<Long> f = crs.processPagesAsync(recordsToRead);
+  futures.add(f);
+}
+Exception exception = null;
+for (Future<Long> f : futures) {
+  if (exception != null) {
+f.cancel(true);
+  } else {
+try {
+  f.get();
+} catch (Exception e) {
+  f.cancel(true);
+  exception = e;
+}
+  }
+}
+if (exception != null) {
+  throw exception;
+}
+  }
+
+  /**
+   * Strategy for reading mock records. (What are these?)
+   */
+
+  public static class MockBatchReader extends BatchReader {
+
+public MockBatchReader(ReadState readState) {
+  super(readState);
+}
+
+@Override
+protected long getReadCount(ColumnReader<?> firstColumnStatus) {
+  if (readState.mockRecordsRead == 
readState.schema().getGroupRecordCount()) {
+return 0;
--- End diff --

How about moving mockRecordsRead to this class instead of keeping it in readState?




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108557760
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
 return fragmentContext;
   }
 
-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
+if (isStarQuery()) {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex);
+} else {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
getColumns());
 }
-columnStatuses = new ArrayList<>();
-List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-allFieldsFixedLength = true;
-ColumnDescriptor column;
-ColumnChunkMetaData columnChunkMetaData;
-int columnsToScan = 0;
-mockRecordsRead = 0;
 
-MaterializedField field;
+//ParquetMetadataConverter metaConverter = new 
ParquetMetadataConverter();
+//FileMetaData fileMetaData;
 
--- End diff --

Instead of commenting them out, remove these lines if they are not needed.




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108561492
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
@@ -307,164 +231,49 @@ public FragmentContext getFragmentContext() {
 return fragmentContext;
   }
 
-  /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's 
corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
-   */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
-if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-  if (column.getMaxRepetitionLevel() > 0) {
-return -1;
-  }
-  if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-return se.getType_length() * 8;
-  } else {
-return getTypeLengthInBits(column.getType());
-  }
-} else {
-  return -1;
-}
-  }
-
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
 this.operatorContext = operatorContext;
-if (!isStarQuery()) {
-  columnsFound = new boolean[getColumns().size()];
-  nullFilledVectors = new ArrayList<>();
+if (isStarQuery()) {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex);
+} else {
+  schema = new ParquetSchema(fragmentContext.getOptions(), 
getColumns());
--- End diff --

Why do we need to pass rowGroupIndex in one case and not the other? Can we add comments here? Is it possible to have a single constructor for ParquetSchema?




[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108667588
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetInternalsTest.java
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.drill.TestBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.FixtureBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParquetInternalsTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setup( ) throws Exception {
+FixtureBuilder builder = ClusterFixture.builder()
+  // Set options, etc.
+  ;
+startCluster(builder);
+  }
+
+  @Test
+  public void testFixedWidth() throws Exception {
+String sql = "SELECT l_orderkey, l_partkey, l_suppkey, l_linenumber, 
l_quantity\n" +
+ "FROM `cp`.`tpch/lineitem.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("l_orderkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_partkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_linenumber"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("l_quantity"), 
Types.required(TypeProtos.MinorType.FLOAT8));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/fixedWidth.csv")
+  .baselineColumns("l_orderkey", "l_partkey", "l_suppkey", 
"l_linenumber", "l_quantity")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+
+  @Test
+  public void testVariableWidth() throws Exception {
+String sql = "SELECT s_name, s_address, s_phone, s_comment\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("s_name"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_address"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_phone"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_comment"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+client.testBuilder()
+  .sqlQuery(sql)
+  .unOrdered()
+  .csvBaselineFile("parquet/expected/variableWidth.csv")
+  .baselineColumns("s_name", "s_address", "s_phone", "s_comment")
+  .baselineTypes(typeMap)
+  .build()
+  .run();
+  }
+
+  @Test
+  public void testMixedWidth() throws Exception {
+String sql = "SELECT s_suppkey, s_name, s_address, s_phone, 
s_acctbal\n" +
+ "FROM `cp`.`tpch/supplier.parquet` LIMIT 20";
+//client.queryBuilder().sql(sql).printCsv();
+
+Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap<>();
+typeMap.put(TestBuilder.parsePath("s_suppkey"), 
Types.required(TypeProtos.MinorType.INT));
+typeMap.put(TestBuilder.parsePath("s_name"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+typeMap.put(TestBuilder.parsePath("s_address"), 
Types.required(TypeProtos.MinorType.VARCHAR));
+

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108559596
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to that of the record reader
+ * to the schema that Drill and the Parquet reader uses.
+ */
+
+public class ParquetSchema {
+  private Collection<SchemaPath> selectedCols;
+  // This is a parallel list to the columns list above, it is used to 
determine the subset of the project
+  // pushdown columns that do not appear in this file
+  private boolean[] columnsFound;
+  private ParquetMetadata footer;
+  private Map<String, SchemaElement> schemaElements;
+  private int columnsToScan;
+  private List<ColumnDescriptor> columns;
+  private List<ParquetColumnMetadata> columnMd = new ArrayList<>();
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength = true;
+  private long groupRecordCount;
+  private int recordsPerBatch;
+  private int rowGroupIndex;
+  private final OptionManager options;
+
+  public ParquetSchema(OptionManager options, int rowGroupIndex) {
+selectedCols = null;
+this.rowGroupIndex = rowGroupIndex;
+this.options = options;
+  }
+
+  public ParquetSchema(OptionManager options, Collection<SchemaPath> selectedCols) {
+this.options = options;
+this.selectedCols = selectedCols;
+columnsFound = new boolean[selectedCols.size()];
+  }
+
+  public void buildSchema(ParquetMetadata footer, long batchSize) throws 
Exception {
+this.footer = footer;
+columns = footer.getFileMetaData().getSchema().getColumns();
+groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+loadParquetSchema();
+computeFixedPart();
+//rowGroupOffset = 
footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+if (columnsToScan != 0  && allFieldsFixedLength) {
+  recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
+  footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
+}
+else {
+  recordsPerBatch = 
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
+}
+  }
+
+  private void loadParquetSchema() {
+// TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
+// store a map from column name to converted types if they are non-null
+schemaElements = 

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-29 Thread ppadma
Github user ppadma commented on a diff in the pull request:

https://github.com/apache/drill/pull/789#discussion_r108692365
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
 ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+/**
+ * Internal state for reading from a Parquet file.
+ */
+
+public class ReadState {
+  private final ParquetSchema schema;
+  ParquetReaderStats parquetReaderStats;
+  private VarLenBinaryReader varLengthReader;
+  // For columns not found in the file, we need to return a schema element 
with the correct number of values
+  // at that position in the schema. Currently this requires a vector be 
present. Here is a list of all of these vectors
+  // that need only have their value count set at the end of each call to 
next(), as the values default to null.
+  private List<NullableIntVector> nullFilledVectors;
+  // Keeps track of the number of records returned in the case where only 
columns outside of the file were selected.
+  // No actual data needs to be read out of the file, we only need to 
return batches until we have 'read' the number of
+  // records specified in the row group metadata
+  long mockRecordsRead;
+  private List<ColumnReader<?>> columnStatuses = new ArrayList<>();
+  private long numRecordsToRead; // number of records to read
+  private long totalRecordsRead;
+  boolean useAsyncColReader;
+
+  public ReadState(ParquetSchema schema, ParquetReaderStats 
parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+this.schema = schema;
+this.parquetReaderStats = parquetReaderStats;
+this.useAsyncColReader = useAsyncColReader;
+if (! schema.isStarQuery()) {
+  nullFilledVectors = new ArrayList<>();
+}
+mockRecordsRead = 0;
+// Callers can pass -1 if they want to read all rows.
+if (numRecordsToRead == 
ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+  this.numRecordsToRead = schema.rowCount();
+} else {
+  assert (numRecordsToRead >= 0);
+  this.numRecordsToRead = Math.min(numRecordsToRead, 
schema.rowCount());
+}
+  }
+
+  @SuppressWarnings("unchecked")
+  public void buildReader(ParquetRecordReader reader, OutputMutator 
output) throws Exception {
+final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
+// initialize all of the column read status objects
+BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata();
+Map<String, Integer> columnChunkMetadataPositionsInList = schema.buildChunkMap(rowGroupMetadata);
+for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
+  ColumnDescriptor column = colMd.column;
+  colMd.columnChunkMetaData = rowGroupMetadata.getColumns().get(
+  
columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath(;
+  colMd.buildVector(output);
+  if (! colMd.isFixedLength( )) {
+// create a reader and add it to the appropriate list
+varLengthColumns.add(colMd.makeVariableWidthReader(reader));
+  } else if (colMd.isRepeated()) {
+varLengthColumns.add(colMd.makeRepeatedFixedWidthReader(reader, 
schema.getRecordsPerBatch()));
+  }
+  else {
+columnStatuses.add(colMd.makeFixedWidthReader(reader, 
schema.getRecordsPerBatch()));
+  }
+}
+  

[GitHub] drill pull request #789: DRILL-5356: Refactor Parquet Record Reader

2017-03-19 Thread paul-rogers
GitHub user paul-rogers opened a pull request:

https://github.com/apache/drill/pull/789

DRILL-5356: Refactor Parquet Record Reader

The Parquet reader is Drill's premier data source and has worked very well
for many years. As with any piece of code, it has grown in complexity over
that time and has become hard to understand and maintain.

In work in another project, we found that Parquet is accidentally creating
"low density" batches: record batches with little actual data compared to
the amount of memory allocated. We'd like to fix that.

However, the current complexity of the reader code creates a barrier to
making improvements: the code is so complex that it is often better to
leave bugs unfixed, or risk spending large amounts of time struggling to
make small changes.

This commit offers to help revitalize the Parquet reader. Functionality is
identical to the code in master; but code has been pulled apart into
various classes each of which focuses on one part of the task: building
up a schema, keeping track of read state, a strategy for reading various
combinations of records, etc. The idea is that it is easier to understand
several small, focused classes than one huge, complex class. Indeed, the
idea of small, focused classes is common in the industry; it is nothing new.
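
To make the decomposition concrete, here is a rough sketch of how the new pieces fit together, as they might appear in ParquetRecordReader.setup() and next(). The class and method names are taken from the diffs in this pull request, but the exact wiring in the final patch may differ:

    // Hedged sketch assembled from names in this PR; not the literal final code.
    ParquetSchema schema = isStarQuery()
        ? new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex)
        : new ParquetSchema(fragmentContext.getOptions(), getColumns());
    schema.buildSchema(footer, batchSize);            // map the Parquet file schema to Drill types

    ReadState readState = new ReadState(schema, parquetReaderStats,
        numRecordsToRead, useAsyncColReader);         // tracks rows read and null-filled vectors
    readState.buildReader(this, output);              // builds the per-column readers

    // A BatchReader subclass supplies the strategy for reading one record batch;
    // MockBatchReader is the variant for queries that select no columns from the file.
    BatchReader batchReader = new BatchReader.MockBatchReader(readState);
    int rowsRead = batchReader.readBatch();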

Unit tests pass with the change. Since no logic has changed and we only moved
lines of code, that is a good indication that everything still works.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/paul-rogers/drill DRILL-5356

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #789


commit f54fc657ef4bda5db2743032ab64b504183f93c8
Author: Paul Rogers 
Date:   2017-03-15T20:49:07Z

DRILL-5356: Refactor Parquet Record Reader

The Parquet reader is Drill's premier data source and has worked very well
for many years. As with any piece of code, it has grown in complexity over
that time and has become hard to understand and maintain.

In work in another project, we found that Parquet is accidentally creating
"low density" batches: record batches with little actual data compared to
the amount of memory allocated. We'd like to fix that.

However, the current complexity of the reader code creates a barrier to
making improvements: the code is so complex that it is often better to
leave bugs unfixed, or risk spending large amounts of time struggling to
make small changes.

This commit offers to help revitalize the Parquet reader. Functionality is
identical to the code in master; but code has been pulled apart into
various classes each of which focuses on one part of the task: building
up a schema, keeping track of read state, a strategy for reading various
combinations of records, etc. The idea is that it is easier to understand
several small, focused classes than one huge, complex class. Indeed, the
idea of small, focused classes is common in the industry; it is nothing new.

Unit tests pass with the change. Since no logic has changed and we only moved
lines of code, that is a good indication that everything still works.



