[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2017-02-22 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/13775


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-11-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r89127052
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+throw 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83796484
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List columnIDs) throws IOException {
+List types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83796409
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala ---
@@ -118,6 +120,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 val broadcastedHadoopConf =
   sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+val enableVectorizedReader: Boolean =
+  sparkSession.sessionState.conf.orcVectorizedReaderEnabled &&
+  dataSchema.forall(f => f.dataType.isInstanceOf[AtomicType] &&
--- End diff --

This is similar to what ParquetFileFormat does. We are unlikely to add new 
`AtomicType`s frequently, and the current set of `AtomicType`s should be relatively 
stable. If we do add one, it should be easy to find out in the reader code that the 
data type is not supported.
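As a rough sketch of the ParquetFileFormat-style pattern referred to here (the helper name is hypothetical, and this is not the exact condition from the PR, whose trailing clause is truncated in the quote above), the vectorized path is gated on every column being an `AtomicType`:

```scala
import org.apache.spark.sql.types.{AtomicType, StructType}

// Sketch of the Parquet-style gate: enable the vectorized reader only when the
// feature flag is on and every top-level column is an AtomicType.
def supportsBatch(schema: StructType, vectorizedReaderEnabled: Boolean): Boolean =
  vectorizedReaderEnabled && schema.forall(_.dataType.isInstanceOf[AtomicType])
```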







[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83756912
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83753217
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
--- End diff --

nit: put each parameter on a separate line for readability. There are other 
places in this PR where the same comment applies.





[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83764569
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala ---
@@ -131,31 +138,43 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 val physicalSchema = maybePhysicalSchema.get
 OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
 
-val orcRecordReader = {
-  val job = Job.getInstance(conf)
-  FileInputFormat.setInputPaths(job, file.filePath)
-
-  val fileSplit = new FileSplit(
-new Path(new URI(file.filePath)), file.start, file.length, Array.empty
-  )
-  // Custom OrcRecordReader is used to get
-  // ObjectInspector during recordReader creation itself and can
-  // avoid NameNode call in unwrapOrcStructs per file.
-  // Specifically would be helpful for partitioned datasets.
-  val orcReader = OrcFile.createReader(
-new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
-  new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
+val job = Job.getInstance(conf)
+FileInputFormat.setInputPaths(job, file.filePath)
+
+val fileSplit = new FileSplit(
+  new Path(new URI(file.filePath)), file.start, file.length, Array.empty
+)
+// Custom OrcRecordReader is used to get
+// ObjectInspector during recordReader creation itself and can
+// avoid NameNode call in unwrapOrcStructs per file.
+// Specifically would be helpful for partitioned datasets.
+val orcReader = OrcFile.createReader(
+  new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+
+if (enableVectorizedReader) {
+  val conf = job.getConfiguration.asInstanceOf[JobConf]
--- End diff --

why can't you reuse the `conf` at line 129 
(https://github.com/apache/spark/pull/13775/files#diff-01999ccbf13e95a0ea2d223f69d8ae23R129)?
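For illustration, a minimal sketch of what seems to be suggested here (variable and helper names are hypothetical, not from the PR): reuse the `Configuration` that is already in scope rather than shadowing `conf` with a second value derived from `job.getConfiguration`, wrapping it as a `JobConf` only where the mapred-based reader needs one.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

// Hypothetical helper: reuse the existing conf, wrapping it once if needed.
def asJobConf(conf: Configuration): JobConf = conf match {
  case jc: JobConf => jc                  // already a JobConf, reuse directly
  case other       => new JobConf(other)  // otherwise wrap the existing conf
}
```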





[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83752360
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83753291
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
--- End diff --

`tInfo` -> `decimalTypeInfo`



[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83761287
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List columnIDs) throws IOException {
+List types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83751452
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -251,6 +251,12 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val ORC_VECTORIZED_READER_ENABLED =
+SQLConfigBuilder("spark.sql.orc.enableVectorizedReader")
+  .doc("Enables vectorized orc reader.")
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Please turn it off by default. Until there is more testing, enabling this by default 
is risky. If you have productionized this code and it has been running smoothly for 
some time, then it would be reasonable to ship with the default turned on.
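A minimal sketch of the change being requested here, assuming the same `SQLConfigBuilder` API shown in the quoted diff; only the default flips to `false`:

```scala
// Sketch only: identical to the config entry in the diff, but shipped
// disabled by default until the vectorized ORC path has seen more testing.
val ORC_VECTORIZED_READER_ENABLED =
  SQLConfigBuilder("spark.sql.orc.enableVectorizedReader")
    .doc("Enables vectorized orc reader.")
    .booleanConf
    .createWithDefault(false)
```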





[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83756710
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83752583
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83760016
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List columnIDs) throws IOException {
+List types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83763831
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala ---
@@ -118,6 +120,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 val broadcastedHadoopConf =
   sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+val enableVectorizedReader: Boolean =
+  sparkSession.sessionState.conf.orcVectorizedReaderEnabled &&
+  dataSchema.forall(f => f.dataType.isInstanceOf[AtomicType] &&
--- End diff --

This is not reliable. If a new `AtomicType` gets introduced that is not supported 
for vectorized reads, this check will not guard against that. It's easy for anyone 
to forget to add a new atomic type to the exclusion list in this check.
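For illustration, a hedged sketch of the alternative this comment hints at: an explicit whitelist of supported types, so a newly introduced `AtomicType` is rejected by default until someone deliberately adds it (the helper name is hypothetical; the type names are from `org.apache.spark.sql.types`):

```scala
import org.apache.spark.sql.types._

// Hypothetical whitelist: only types the vectorized ORC reader is known to
// handle are accepted; any new data type falls through to `false` by default.
def supportsVectorizedRead(dt: DataType): Boolean = dt match {
  case BooleanType | ByteType | ShortType | IntegerType | LongType |
       FloatType | DoubleType | DateType | StringType | BinaryType => true
  case _: DecimalType => true
  case _ => false
}
```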





[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83756504
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+    return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+    return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+    BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    column.initBuffer();
+    return column;
+  case DECIMAL:
+    DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+    return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+      tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83753422
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+    this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) 
((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+column.initBuffer();
+return column;
+  case DECIMAL:
+DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83761710
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 
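
The quoted diff is cut off above. As a minimal sketch of the adapter idea the class javadoc describes -- reading each value for a given row directly out of Hive's column vectors instead of materializing intermediate objects -- the helpers below show the access pattern; the class and method names are assumptions for illustration, not the PR's actual Row class.

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.spark.unsafe.types.UTF8String;

    // Hedged sketch (assumed helper, not the PR's Row adapter): reading the value
    // for one logical row straight out of Hive column vectors.
    final class ColumnVectorReads {
      static long readLong(LongColumnVector col, int rowId) {
        // isRepeating means the whole batch shares the value stored at index 0
        return col.vector[col.isRepeating ? 0 : rowId];
      }

      static UTF8String readString(BytesColumnVector col, int rowId) {
        int i = col.isRepeating ? 0 : rowId;
        // wraps the existing byte buffer instead of decoding to java.lang.String
        return UTF8String.fromBytes(col.vector[i], col.start[i], col.length[i]);
      }
    }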

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83757435
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
--- End diff --

nit: move the `InterruptedException` up to the previous line. This might
also apply to other places in the PR, but I am not pointing out each instance.
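
For illustration, the change the nit asks for is only a line-wrapping tweak to the throws clause:

    // Before (as in the diff):
    public NullWritable getCurrentKey() throws IOException,
        InterruptedException {
      return NullWritable.get();
    }

    // After (what the nit suggests):
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }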


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83756988
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+column.initBuffer();
+return column;
+  case DECIMAL:
+DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83753744
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+column.initBuffer();
+return column;
+  case DECIMAL:
+DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83759570
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83752300
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+column.initBuffer();
+return column;
+  case DECIMAL:
+DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+tInfo.precision(), tInfo.scale());
+  default:
+

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-10-17 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r83757105
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+case PRIMITIVE:
+  {
+PrimitiveTypeInfo primitiveTypeInfo =
+  (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+switch(primitiveTypeInfo.getPrimitiveCategory()) {
+  case BOOLEAN:
+  case BYTE:
+  case SHORT:
+  case INT:
+  case LONG:
+  case DATE:
+  case INTERVAL_YEAR_MONTH:
+return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case FLOAT:
+  case DOUBLE:
+return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  case BINARY:
+  case STRING:
+  case CHAR:
+  case VARCHAR:
+BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+column.initBuffer();
+return column;
+  case DECIMAL:
+DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+tInfo.precision(), tInfo.scale());
+  default:
+
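
For reference, the switch above collapses Hive's primitive categories onto a handful of ColumnVector classes. A minimal standalone usage sketch of that mapping (illustrative only, not part of the PR; the precision/scale values are made-up examples):

    import org.apache.hadoop.hive.ql.exec.vector.*;

    // Sketch of the category -> ColumnVector mapping performed by createColumnVector.
    public class ColumnVectorSketch {
      public static void main(String[] args) {
        int n = VectorizedRowBatch.DEFAULT_SIZE;
        LongColumnVector longs = new LongColumnVector(n);        // BOOLEAN, BYTE, SHORT, INT, LONG, DATE, INTERVAL_YEAR_MONTH
        DoubleColumnVector doubles = new DoubleColumnVector(n);  // FLOAT, DOUBLE
        BytesColumnVector strings = new BytesColumnVector(n);    // BINARY, STRING, CHAR, VARCHAR
        strings.initBuffer();                                    // allocate the shared byte buffer before writing values
        DecimalColumnVector decimals = new DecimalColumnVector(n, 38, 18);  // precision, scale from DecimalTypeInfo
      }
    }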

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74369496
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-10 Thread dafrista
Github user dafrista commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74368871
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74367827
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-10 Thread dafrista
Github user dafrista commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74356405
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = new ArrayList<>(columnIDs);
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if 

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74183317
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = columnIDs;
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (indexOfRow == numRowsOfBatch && progress < 1.0f) {
+  
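
The quote is truncated inside nextKeyValue above. As a rough, hedged sketch of the batch-advancing pattern such an adapter typically uses when the current VectorizedRowBatch is exhausted (assumed shape, not the PR's exact code):

    // Hedged sketch: pull the next VectorizedRowBatch from the wrapped mapred reader
    // once every row of the current batch has been handed out via getCurrentValue().
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      if (indexOfRow == numRowsOfBatch && progress < 1.0f) {
        if (reader.next(NullWritable.get(), internalValue)) { // fills internalValue with the next batch
          numRowsOfBatch = internalValue.size;                // rows actually populated in this batch
          indexOfRow = 0;
          progress = reader.getProgress();
        } else {
          return false;                                       // no more batches in this split
        }
      }
      return indexOfRow < numRowsOfBatch;
    }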

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-08-09 Thread dafrista
Github user dafrista commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r74180849
  
--- Diff: 
sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = columnIDs;
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (indexOfRow == numRowsOfBatch && progress < 1.0f) {
+
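The adapter described in the class comment above never materializes Writable rows; it reads each value straight out of Hive's column vectors for the current batch. Below is a minimal standalone sketch of that access pattern, not code from the patch: it assumes Hive's vector classes are on the classpath, the class name BatchCursorSketch is invented for illustration, and it follows the "if a column is repeating, always use row id 0" rule noted in the commit log.

import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BatchCursorSketch {
  public static void main(String[] args) {
    // Build a tiny two-column batch by hand; in the vectorized reader the
    // inner ORC reader fills a batch like this one for every batch it returns.
    VectorizedRowBatch batch = new VectorizedRowBatch(2, 4);
    LongColumnVector ids = new LongColumnVector(4);
    DoubleColumnVector scores = new DoubleColumnVector(4);
    for (int i = 0; i < 4; i++) {
      ids.vector[i] = i;
      scores.vector[i] = i * 0.5;
    }
    batch.cols[0] = ids;
    batch.cols[1] = scores;
    batch.size = 4;

    // The row adapter keeps a cursor into the current batch and reads values
    // directly from the column vectors; a repeating column resolves to row 0.
    for (int rowId = 0; rowId < batch.size; rowId++) {
      long id = ids.vector[ids.isRepeating ? 0 : rowId];
      double score = scores.vector[scores.isRepeating ? 0 : rowId];
      System.out.println(id + " -> " + score);
    }
  }
}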

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-06-22 Thread dafrista
Github user dafrista commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r68135229
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+extends org.apache.hadoop.mapreduce.RecordReader<NullWritable, InternalRow> {
+  private final org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List<Integer> columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+  Reader file,
+  JobConf conf,
+  FileSplit fileSplit,
+  List<Integer> columnIDs) throws IOException {
+List<OrcProto.Type> types = file.getTypes();
+numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+  new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+this.columnIDs = columnIDs;
+this.internalValue = this.reader.createValue();
+this.progress = reader.getProgress();
+this.row = new Row(this.internalValue.cols, columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+  InterruptedException {
+return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+  InterruptedException {
+if (indexOfRow >= numRowsOfBatch) {
+  return null;
+}
+row.rowId = indexOfRow;
+indexOfRow++;
+
+return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+  throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+if (indexOfRow == numRowsOfBatch && progress < 1.0f) {
+
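The quoted methods implement the standard mapreduce RecordReader contract: nextKeyValue() advances the cursor (pulling a fresh VectorizedRowBatch from the inner reader when the current one is exhausted) and getCurrentValue() hands back the adapter row positioned at the current row id. As a hypothetical illustration of how a caller drives such a reader, not code from the patch, the helper below (ReaderDriverSketch and countRows are invented names) drains any reader with the same key/value types:

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.spark.sql.catalyst.InternalRow;

public final class ReaderDriverSketch {
  // Count the rows produced by a RecordReader shaped like the one above.
  public static long countRows(RecordReader<NullWritable, InternalRow> reader)
      throws IOException, InterruptedException {
    long count = 0;
    while (reader.nextKeyValue()) {
      // The adapter row is reused across calls, so it is only valid until the
      // next nextKeyValue(); consume it (or copy it) before advancing.
      InternalRow row = reader.getCurrentValue();
      if (row != null) {
        count++;
      }
    }
    return count;
  }
}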

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-06-22 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r68016908
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+  case PRIMITIVE:
+{
+  PrimitiveTypeInfo primitiveTypeInfo =
+(PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+  switch(primitiveTypeInfo.getPrimitiveCategory()) {
+case BOOLEAN:
+case BYTE:
+case SHORT:
+case INT:
+case LONG:
+case DATE:
+case INTERVAL_YEAR_MONTH:
+  return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+case FLOAT:
+case DOUBLE:
+  return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+case BINARY:
+case STRING:
+case CHAR:
+case VARCHAR:
+  BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  column.initBuffer();
+  return column;
+case DECIMAL:
+  DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+  return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+  tInfo.precision(), tInfo.scale());
+default:
+  
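The quoted createColumnVector method is cut off right at its default branch. As a hypothetical, self-contained illustration of the same category-to-vector mapping (not the patch's code), the sketch below shows one plausible shape, with an explicit failure path for types the vectorized path does not cover. The class ColumnVectorFactorySketch and its method name are invented, the assumption that unsupported types are rejected with an exception is mine rather than the author's, and the DECIMAL case (which needs precision and scale) and INTERVAL_YEAR_MONTH are omitted for brevity.

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

final class ColumnVectorFactorySketch {
  // Map a Hive primitive category onto the ColumnVector subclass that stores
  // it, mirroring the switch in the quoted diff.
  static ColumnVector forPrimitive(PrimitiveTypeInfo typeInfo) {
    switch (typeInfo.getPrimitiveCategory()) {
      case BOOLEAN:
      case BYTE:
      case SHORT:
      case INT:
      case LONG:
      case DATE:
        return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
      case FLOAT:
      case DOUBLE:
        return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
      case BINARY:
      case STRING:
      case CHAR:
      case VARCHAR:
        BytesColumnVector bytes = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
        bytes.initBuffer();
        return bytes;
      default:
        // Assumed behavior for the truncated default branch: reject types this
        // vectorized path cannot handle (e.g. TIMESTAMP or complex types).
        throw new UnsupportedOperationException(
            "Vectorized ORC reading does not support " + typeInfo.getTypeName());
    }
  }
}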

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-06-22 Thread zjffdu
Github user zjffdu commented on a diff in the pull request:

https://github.com/apache/spark/pull/13775#discussion_r68016740
  
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+private final long offset;
+private final long length;
+private float progress = 0.0f;
+private ObjectInspector objectInspector;
+
+SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+FileSplit fileSplit) throws IOException {
+  this.offset = fileSplit.getStart();
+  this.length = fileSplit.getLength();
+  this.objectInspector = file.getObjectInspector();
+  this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+this.length);
+  this.progress = reader.getProgress();
+}
+
+/**
+ * Create a ColumnVector based on given ObjectInspector's type info.
+ *
+ * @param inspector ObjectInspector
+ */
+private ColumnVector createColumnVector(ObjectInspector inspector) {
+  switch(inspector.getCategory()) {
+  case PRIMITIVE:
+{
+  PrimitiveTypeInfo primitiveTypeInfo =
+(PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+  switch(primitiveTypeInfo.getPrimitiveCategory()) {
+case BOOLEAN:
+case BYTE:
+case SHORT:
+case INT:
+case LONG:
+case DATE:
+case INTERVAL_YEAR_MONTH:
+  return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+case FLOAT:
+case DOUBLE:
+  return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+case BINARY:
+case STRING:
+case CHAR:
+case VARCHAR:
+  BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+  column.initBuffer();
+  return column;
+case DECIMAL:
+  DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+  return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+  tInfo.precision(), tInfo.scale());
+default:
+  

[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader

2016-06-19 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/13775

[SPARK-16060][SQL] Vectorized Orc reader

## What changes were proposed in this pull request?

Currently the Orc reader in Spark SQL doesn't support vectorized reading. As Hive's
Orc reader already supports vectorization, we can add this support to improve Orc
reading performance.

### Benchmark

Benchmark code:

test("Benchmark for Orc") {
  val N = 500 << 12
withOrcTable((0 until N).map(i => (i, i.toString, i.toLong, 
i.toDouble)), "t") {
  val benchmark = new Benchmark("Orc reader", N)
  benchmark.addCase("reading Orc file", 10) { iter =>
sql("SELECT * FROM t").collect()
  }
  benchmark.run()
  }
}

Before this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Orc reader:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
reading Orc file               4750 / 5266          0.4        2319.1       1.0X

After this patch:

Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
Orc reader:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
reading Orc file               3550 / 3824          0.6        1733.2       1.0X
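
As a quick sanity check on the tables above (not part of the PR): the test generates N = 500 << 12 = 2,048,000 rows, so the Rate(M/s) and Per Row(ns) columns follow directly from the best times, e.g. 4750 ms over 2,048,000 rows is roughly 2319 ns per row and 3550 ms is roughly 1733 ns per row. A throwaway snippet reproducing that arithmetic (the class name OrcBenchmarkMath is made up):

public class OrcBenchmarkMath {
  public static void main(String[] args) {
    long rows = 500L << 12;                // 2,048,000 rows generated by the benchmark
    report("before patch", rows, 4750.0);  // ~0.43 M rows/s, ~2319 ns/row
    report("after patch", rows, 3550.0);   // ~0.58 M rows/s, ~1733 ns/row
  }

  static void report(String label, long rows, double bestMillis) {
    double rowsPerSec = rows / (bestMillis / 1000.0);   // rows per second
    double nsPerRow = bestMillis * 1e6 / rows;          // nanoseconds per row
    System.out.printf("%s: %.2f M rows/s, %.1f ns/row%n",
        label, rowsPerSec / 1e6, nsPerRow);
  }
}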



## How was this patch tested?
Existing tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 vectorized-orc-reader3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/13775.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #13775


commit 2861ac2a5136c065ec38cfc24bf9f979d5b7ae07
Author: Liang-Chi Hsieh 
Date:   2016-06-16T02:31:23Z

Add vectorized Orc reader support.

commit eee8eca70920d624becb43c8510d217ce4d9820b
Author: Liang-Chi Hsieh 
Date:   2016-06-17T09:44:11Z

import.

commit b753d09e3e369fc91a17d9632123dbe40d7d9dfb
Author: Liang-Chi Hsieh 
Date:   2016-06-18T10:00:00Z

If column is repeating, always using row id 0.

commit 7d26f5ed785269299b324df8bfc1c64c2d4a2b48
Author: Liang-Chi Hsieh 
Date:   2016-06-19T04:16:49Z

Fix bugs of getBinary and numFields.

commit 74fe936e522a827384461e445b9ba44f96ce29fe
Author: Liang-Chi Hsieh 
Date:   2016-06-20T02:44:07Z

Remove unnecessary change.

commit 7e7bb6c57860187f391f66ca82cdd715d0b2be43
Author: Liang-Chi Hsieh 
Date:   2016-06-20T02:48:11Z

Remove unnecessary change.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org