[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/13775 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r89127052
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+    implements RecordReader{
+  private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+  private final long offset;
+  private final long length;
+  private float progress = 0.0f;
+  private ObjectInspector objectInspector;
+
+  SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+      FileSplit fileSplit) throws IOException {
+    this.offset = fileSplit.getStart();
+    this.length = fileSplit.getLength();
+    this.objectInspector = file.getObjectInspector();
+    this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+      this.length);
+    this.progress = reader.getProgress();
+  }
+
+  /**
+   * Create a ColumnVector based on given ObjectInspector's type info.
+   *
+   * @param inspector ObjectInspector
+   */
+  private ColumnVector createColumnVector(ObjectInspector inspector) {
+    switch(inspector.getCategory()) {
+      case PRIMITIVE:
+        {
+          PrimitiveTypeInfo primitiveTypeInfo =
+            (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+          switch(primitiveTypeInfo.getPrimitiveCategory()) {
+            case BOOLEAN:
+            case BYTE:
+            case SHORT:
+            case INT:
+            case LONG:
+            case DATE:
+            case INTERVAL_YEAR_MONTH:
+              return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            case FLOAT:
+            case DOUBLE:
+              return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            case BINARY:
+            case STRING:
+            case CHAR:
+            case VARCHAR:
+              BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+              column.initBuffer();
+              return column;
+            case DECIMAL:
+              DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+              return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+                tInfo.precision(), tInfo.scale());
+            default:
+              throw
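The `createColumnVector` switch quoted above groups Hive primitive categories by physical storage: boolean, integral, date and year-month interval values share a `LongColumnVector`, float and double a `DoubleColumnVector`, binary and string-like values a `BytesColumnVector`, and decimals a `DecimalColumnVector` sized by precision and scale. The dependency-free sketch below mirrors only that grouping. `ColumnVectorKinds`, `Category`, and `VectorKind` are hypothetical stand-ins, not Hive classes, and `TIMESTAMP` is included only as an example of a category the quoted switch does not list, so it falls through to the `default: throw` branch.

```java
// Hypothetical, dependency-free mirror of the grouping in createColumnVector();
// these enums are NOT Hive's PrimitiveCategory or ColumnVector classes.
class ColumnVectorKinds {

  // Categories handled by the quoted switch, plus TIMESTAMP, which it does not list.
  enum Category {
    BOOLEAN, BYTE, SHORT, INT, LONG, DATE, INTERVAL_YEAR_MONTH,
    FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, DECIMAL, TIMESTAMP
  }

  // Physical vector type that backs each category.
  enum VectorKind { LONG, DOUBLE, BYTES, DECIMAL }

  static VectorKind kindFor(Category category) {
    switch (category) {
      case BOOLEAN:
      case BYTE:
      case SHORT:
      case INT:
      case LONG:
      case DATE:
      case INTERVAL_YEAR_MONTH:
        return VectorKind.LONG;      // all stored as long values
      case FLOAT:
      case DOUBLE:
        return VectorKind.DOUBLE;    // both widened to double
      case BINARY:
      case STRING:
      case CHAR:
      case VARCHAR:
        return VectorKind.BYTES;     // byte ranges
      case DECIMAL:
        return VectorKind.DECIMAL;   // precision/scale come from the type info
      default:
        throw new UnsupportedOperationException("Unsupported type: " + category);
    }
  }
}
```

Grouping by storage rather than by logical type keeps the number of vector classes small; the reader that consumes a batch still needs the logical type to interpret, for example, a long as a date.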
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83796484
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+    extends org.apache.hadoop.mapreduce.RecordReader{
+  private final org.apache.hadoop.mapred.RecordReader reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+      Reader file,
+      JobConf conf,
+      FileSplit fileSplit,
+      List columnIDs) throws IOException {
+    List types = file.getTypes();
+    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+    this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+      new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+    this.columnIDs = new ArrayList<>(columnIDs);
+    this.internalValue = this.reader.createValue();
+    this.progress = reader.getProgress();
+    this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+      InterruptedException {
+    if (indexOfRow >= numRowsOfBatch) {
+      return null;
+    }
+    row.rowId = indexOfRow;
+    indexOfRow++;
+
+    return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if
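The class comment above describes an adapter that returns rows by pointing a single reusable `Row` at an index inside the current `VectorizedRowBatch` instead of copying values out. The sketch below illustrates that pattern without Hive or Spark on the classpath. `BatchRowAdapter`, `LongBatch`, `RowView`, and `BatchReader` are hypothetical names, a batch is modelled as one `long[]` per column plus a valid-row count, and Hive's per-column null flags and `selected` selection vector are ignored. As a design choice of this sketch, the row index advances in `nextKeyValue()` rather than in `getCurrentValue()`, so repeated `getCurrentValue()` calls return the same row.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of a zero-copy row view over column batches; not Hive/Spark code.
class BatchRowAdapter {

  // One batch: cols[c][r] is column c of row r; only the first `size` rows are valid.
  static final class LongBatch {
    final long[][] cols;
    final int size;
    LongBatch(long[][] cols, int size) { this.cols = cols; this.size = size; }
  }

  // Reusable row: reads straight from the batch arrays at rowId, no copying.
  static final class RowView {
    long[][] cols;
    int rowId;
    long getLong(int ordinal) { return cols[ordinal][rowId]; }
  }

  // Walks rows batch by batch in the style of nextKeyValue()/getCurrentValue().
  static final class BatchReader {
    private final Iterator<LongBatch> batches;
    private final RowView row = new RowView();
    private LongBatch current;
    private int nextRow = 0;

    BatchReader(List<LongBatch> batches) { this.batches = batches.iterator(); }

    boolean nextKeyValue() {
      // Load batches until one has an unread row; empty batches are skipped.
      while (current == null || nextRow >= current.size) {
        if (!batches.hasNext()) {
          return false;
        }
        current = batches.next();
        nextRow = 0;
      }
      row.cols = current.cols;
      row.rowId = nextRow++;
      return true;
    }

    // Returns the same view until nextKeyValue() is called again.
    RowView getCurrentValue() { return row; }
  }

  // Sums column 0 over every valid row of every batch.
  static long sumFirstColumn(List<LongBatch> batches) {
    BatchReader reader = new BatchReader(batches);
    long sum = 0;
    while (reader.nextKeyValue()) {
      sum += reader.getCurrentValue().getLong(0);
    }
    return sum;
  }
}
```

Because the view is reused, a caller that needs to keep a row past the next `nextKeyValue()` call has to copy it first; that is the usual trade-off of this adapter style.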
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83796409
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala ---
@@ -118,6 +120,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
+    val enableVectorizedReader: Boolean =
+      sparkSession.sessionState.conf.orcVectorizedReaderEnabled &&
+      dataSchema.forall(f => f.dataType.isInstanceOf[AtomicType] &&
--- End diff --

This is similar to what ParquetFileFormat does. We are unlikely to add new `AtomicType`s frequently, and the current set of `AtomicType`s should be relatively stable. If we do add one, tests should easily reveal that the reader code does not support the new data type.
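The gating condition in this hunk enables the vectorized path only when the session flag is on and every field of the data schema is an `AtomicType`; the quoted condition is cut off after the second `&&`, so any further clause is not shown. A minimal Java sketch of that all-fields check follows. `VectorizedGate` and its `FieldType` enum are hypothetical stand-ins for Spark's `DataType` hierarchy, with `ARRAY`, `MAP`, and `STRUCT` playing the role of non-atomic types.

```java
import java.util.List;

// Hypothetical stand-in for deciding whether the vectorized ORC path applies.
class VectorizedGate {

  // Simplified type model: atomic types vs. nested ones.
  enum FieldType {
    INT(true), LONG(true), DOUBLE(true), STRING(true), DECIMAL(true),
    ARRAY(false), MAP(false), STRUCT(false);

    final boolean atomic;

    FieldType(boolean atomic) { this.atomic = atomic; }
  }

  // Mirrors `flagEnabled && dataSchema.forall(f => isAtomic(f))`; any clause
  // after the truncated `&&` in the original condition is omitted here.
  static boolean enableVectorizedReader(boolean flagEnabled, List<FieldType> schema) {
    return flagEnabled && schema.stream().allMatch(t -> t.atomic);
  }
}
```

Like Scala's `forall`, `allMatch` returns `true` for an empty schema. Because the check is phrased as "is atomic" rather than as an explicit list of supported types, a newly added atomic type would pass the gate, which is the case the comment above expects tests to catch.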
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83756912
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83753217
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
[...]
+public class SparkVectorizedOrcRecordReader
+    implements RecordReader{
+  private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+  private final long offset;
+  private final long length;
+  private float progress = 0.0f;
+  private ObjectInspector objectInspector;
+
+  SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
--- End diff --

nit: have each param on a separate line for readability. There are other places in this PR where the same comment will apply.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83764569
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala ---
@@ -131,31 +138,43 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       val physicalSchema = maybePhysicalSchema.get
       OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema)
 
-      val orcRecordReader = {
-        val job = Job.getInstance(conf)
-        FileInputFormat.setInputPaths(job, file.filePath)
-
-        val fileSplit = new FileSplit(
-          new Path(new URI(file.filePath)), file.start, file.length, Array.empty
-        )
-        // Custom OrcRecordReader is used to get
-        // ObjectInspector during recordReader creation itself and can
-        // avoid NameNode call in unwrapOrcStructs per file.
-        // Specifically would be helpful for partitioned datasets.
-        val orcReader = OrcFile.createReader(
-          new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
-        new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength)
+      val job = Job.getInstance(conf)
+      FileInputFormat.setInputPaths(job, file.filePath)
+
+      val fileSplit = new FileSplit(
+        new Path(new URI(file.filePath)), file.start, file.length, Array.empty
+      )
+      // Custom OrcRecordReader is used to get
+      // ObjectInspector during recordReader creation itself and can
+      // avoid NameNode call in unwrapOrcStructs per file.
+      // Specifically would be helpful for partitioned datasets.
+      val orcReader = OrcFile.createReader(
+        new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+
+      if (enableVectorizedReader) {
+        val conf = job.getConfiguration.asInstanceOf[JobConf]
--- End diff --

Why can't you reuse the `conf` at line 129 (https://github.com/apache/spark/pull/13775/files#diff-01999ccbf13e95a0ea2d223f69d8ae23R129)?
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83752360
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83753291
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,189 @@
[...]
+            case DECIMAL:
+              DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
--- End diff --

`tInfo` -> `decimalTypeInfo`
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83761287
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83751452 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -251,6 +251,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ORC_VECTORIZED_READER_ENABLED = +SQLConfigBuilder("spark.sql.orc.enableVectorizedReader") + .doc("Enables vectorized orc reader.") + .booleanConf + .createWithDefault(true) --- End diff -- Please turn it off by default. Until there has been more testing, enabling it by default is risky. If you have productionised this code and it has been running smoothly for some time, then it would be reasonable to launch with the default turned on.
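Here is a minimal sketch of what an opt-in default means for callers. It is written in plain Java rather than with Spark's Scala `SQLConf` builder. The `VectorizedOrcFlag` class and the `Map<String, String>` that stands in for the session configuration are hypothetical. In the PR itself, the equivalent change would be `.createWithDefault(false)` on the builder shown in the diff.

```java
import java.util.Map;

// Hypothetical helper illustrating an opt-in feature flag: the vectorized
// ORC path is used only when the key is explicitly set to "true".
final class VectorizedOrcFlag {
    static final String KEY = "spark.sql.orc.enableVectorizedReader";
    static final boolean DEFAULT = false; // conservative default, as requested in review

    static boolean isEnabled(Map<String, String> conf) {
        String raw = conf.get(KEY);
        // Boolean.parseBoolean is case-insensitive and treats anything but "true" as false.
        return raw == null ? DEFAULT : Boolean.parseBoolean(raw.trim());
    }
}
```

With the default off, users who want to try the vectorized path would opt in explicitly, for example with `spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")`.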
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83756710 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A mapred.RecordReader that returns VectorizedRowBatch. 
+ */ +public class SparkVectorizedOrcRecordReader + implements RecordReader{ +private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; +private final long offset; +private final long length; +private float progress = 0.0f; +private ObjectInspector objectInspector; + +SparkVectorizedOrcRecordReader(Reader file, Configuration conf, +FileSplit fileSplit) throws IOException { + this.offset = fileSplit.getStart(); + this.length = fileSplit.getLength(); + this.objectInspector = file.getObjectInspector(); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset, +this.length); + this.progress = reader.getProgress(); +} + +/** + * Create a ColumnVector based on given ObjectInspector's type info. + * + * @param inspector ObjectInspector + */ +private ColumnVector createColumnVector(ObjectInspector inspector) { + switch(inspector.getCategory()) { +case PRIMITIVE: + { +PrimitiveTypeInfo primitiveTypeInfo = + (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo(); +switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case INTERVAL_YEAR_MONTH: +return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: +return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: +BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); +column.initBuffer(); +return column; + case DECIMAL: +DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; +return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, +tInfo.precision(), tInfo.scale()); + default: +
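`createColumnVector` dispatches on the Hive primitive category and returns one of four column vector families. The mapping visible in the diff can be restated as a plain Java switch. `PrimitiveKind` and `VectorFamily` are hypothetical enums that stand in for Hive's `PrimitiveCategory` and the `*ColumnVector` classes. The `default:` branch is truncated in this message, so throwing `UnsupportedOperationException` there is an assumption.

```java
// Hypothetical mirror of the primitive categories handled in createColumnVector.
enum PrimitiveKind {
    BOOLEAN, BYTE, SHORT, INT, LONG, DATE, INTERVAL_YEAR_MONTH,
    FLOAT, DOUBLE,
    BINARY, STRING, CHAR, VARCHAR,
    DECIMAL,
    TIMESTAMP // not covered by the cases visible in the diff
}

// The four vector families the switch in the diff allocates.
enum VectorFamily { LONG, DOUBLE, BYTES, DECIMAL }

final class ColumnVectorMapping {
    static VectorFamily familyOf(PrimitiveKind kind) {
        switch (kind) {
            case BOOLEAN: case BYTE: case SHORT: case INT: case LONG:
            case DATE: case INTERVAL_YEAR_MONTH:
                return VectorFamily.LONG;    // LongColumnVector
            case FLOAT: case DOUBLE:
                return VectorFamily.DOUBLE;  // DoubleColumnVector
            case BINARY: case STRING: case CHAR: case VARCHAR:
                return VectorFamily.BYTES;   // BytesColumnVector, buffer initialised up front
            case DECIMAL:
                return VectorFamily.DECIMAL; // DecimalColumnVector, also needs precision and scale
            default:
                throw new UnsupportedOperationException("No column vector mapping for " + kind);
        }
    }
}
```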
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83752583 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A mapred.RecordReader that returns VectorizedRowBatch. 
+ */ +public class SparkVectorizedOrcRecordReader + implements RecordReader{ +private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; +private final long offset; +private final long length; +private float progress = 0.0f; +private ObjectInspector objectInspector; + +SparkVectorizedOrcRecordReader(Reader file, Configuration conf, +FileSplit fileSplit) throws IOException { + this.offset = fileSplit.getStart(); + this.length = fileSplit.getLength(); + this.objectInspector = file.getObjectInspector(); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset, +this.length); + this.progress = reader.getProgress(); +} + +/** + * Create a ColumnVector based on given ObjectInspector's type info. + * + * @param inspector ObjectInspector + */ +private ColumnVector createColumnVector(ObjectInspector inspector) { + switch(inspector.getCategory()) { +case PRIMITIVE: + { +PrimitiveTypeInfo primitiveTypeInfo = + (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo(); +switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case INTERVAL_YEAR_MONTH: +return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: +return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: +BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); +column.initBuffer(); +return column; + case DECIMAL: +DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; +return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, +tInfo.precision(), tInfo.scale()); + default: +
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83760016 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A RecordReader that returns InternalRow for Spark SQL execution. + * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter + * class is used to return internal row by directly accessing data in column vectors. 
+ */ +public class VectorizedSparkOrcNewRecordReader +extends org.apache.hadoop.mapreduce.RecordReader{ + private final org.apache.hadoop.mapred.RecordReader reader; + private final int numColumns; + private VectorizedRowBatch internalValue; + private float progress = 0.0f; + private List columnIDs; + + private long numRowsOfBatch = 0; + private int indexOfRow = 0; + + private final Row row; + + public VectorizedSparkOrcNewRecordReader( + Reader file, + JobConf conf, + FileSplit fileSplit, + List columnIDs) throws IOException { +List types = file.getTypes(); +numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount(); +this.reader = new SparkVectorizedOrcRecordReader(file, conf, + new org.apache.hadoop.mapred.FileSplit(fileSplit)); + +this.columnIDs = new ArrayList<>(columnIDs); +this.internalValue = this.reader.createValue(); +this.progress = reader.getProgress(); +this.row = new Row(this.internalValue.cols, this.columnIDs); + } + + @Override + public void close() throws IOException { +reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { +return NullWritable.get(); + } + + @Override + public InternalRow getCurrentValue() throws IOException, + InterruptedException { +if (indexOfRow >= numRowsOfBatch) { + return null; +} +row.rowId = indexOfRow; +indexOfRow++; + +return row; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return progress; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +if
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83763831 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala --- @@ -118,6 +120,11 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) +val enableVectorizedReader: Boolean = + sparkSession.sessionState.conf.orcVectorizedReaderEnabled && + dataSchema.forall(f => f.dataType.isInstanceOf[AtomicType] && --- End diff -- This is not reliable. If a new `AtomicType` is introduced that vectorised reads do not support, this check will not guard us against it. It is easy for anyone to forget to add a new atomic type to the exclusion list in this check.
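One way to address this concern is an allowlist. The check enumerates the types the vectorized reader is known to support and falls back to the row-based reader for anything else. A newly added `AtomicType` is then excluded until someone explicitly opts it in. The Java sketch below uses a hypothetical `SqlType` enum instead of Spark's `DataType` hierarchy. The real check in `OrcFileFormat` is Scala and could match on concrete Spark types instead.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Spark SQL data types. NEW_ATOMIC_TYPE represents
// a type added later that nobody remembered to handle in the reader.
enum SqlType { BOOLEAN, INT, LONG, DOUBLE, STRING, DECIMAL, NEW_ATOMIC_TYPE, ARRAY }

final class VectorizedSupport {
    // Allowlist: only types the vectorized reader is known to handle.
    private static final Set<SqlType> SUPPORTED = EnumSet.of(
        SqlType.BOOLEAN, SqlType.INT, SqlType.LONG,
        SqlType.DOUBLE, SqlType.STRING, SqlType.DECIMAL);

    /** True only if every column type is explicitly listed as supported. */
    static boolean canVectorize(List<SqlType> schema) {
        return SUPPORTED.containsAll(schema);
    }
}
```

An unknown type now fails closed (row-based reader) instead of failing open. An empty schema passes trivially, which matches Scala's `forall` on an empty collection.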
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83756504 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A mapred.RecordReader that returns VectorizedRowBatch. 
+ */ +public class SparkVectorizedOrcRecordReader + implements RecordReader{ +private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; +private final long offset; +private final long length; +private float progress = 0.0f; +private ObjectInspector objectInspector; + +SparkVectorizedOrcRecordReader(Reader file, Configuration conf, +FileSplit fileSplit) throws IOException { + this.offset = fileSplit.getStart(); + this.length = fileSplit.getLength(); + this.objectInspector = file.getObjectInspector(); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset, +this.length); + this.progress = reader.getProgress(); +} + +/** + * Create a ColumnVector based on given ObjectInspector's type info. + * + * @param inspector ObjectInspector + */ +private ColumnVector createColumnVector(ObjectInspector inspector) { + switch(inspector.getCategory()) { +case PRIMITIVE: + { +PrimitiveTypeInfo primitiveTypeInfo = + (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo(); +switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case INTERVAL_YEAR_MONTH: +return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: +return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: +BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); +column.initBuffer(); +return column; + case DECIMAL: +DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; +return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, +tInfo.precision(), tInfo.scale()); + default: +
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83753422 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A mapred.RecordReader that returns VectorizedRowBatch. 
+ */ +public class SparkVectorizedOrcRecordReader + implements RecordReader{ +private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; +private final long offset; +private final long length; +private float progress = 0.0f; +private ObjectInspector objectInspector; + +SparkVectorizedOrcRecordReader(Reader file, Configuration conf, +FileSplit fileSplit) throws IOException { + this.offset = fileSplit.getStart(); + this.length = fileSplit.getLength(); + this.objectInspector = file.getObjectInspector(); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset, +this.length); + this.progress = reader.getProgress(); +} + +/** + * Create a ColumnVector based on given ObjectInspector's type info. + * + * @param inspector ObjectInspector + */ +private ColumnVector createColumnVector(ObjectInspector inspector) { + switch(inspector.getCategory()) { +case PRIMITIVE: + { +PrimitiveTypeInfo primitiveTypeInfo = + (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo(); +switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case INTERVAL_YEAR_MONTH: +return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: +return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: +BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); +column.initBuffer(); +return column; + case DECIMAL: +DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; +return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, +tInfo.precision(), tInfo.scale()); + default: +
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83761710 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A RecordReader that returns InternalRow for Spark SQL execution. + * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter + * class is used to return internal row by directly accessing data in column vectors. 
+ */ +public class VectorizedSparkOrcNewRecordReader +extends org.apache.hadoop.mapreduce.RecordReader{ + private final org.apache.hadoop.mapred.RecordReader reader; + private final int numColumns; + private VectorizedRowBatch internalValue; + private float progress = 0.0f; + private List columnIDs; + + private long numRowsOfBatch = 0; + private int indexOfRow = 0; + + private final Row row; + + public VectorizedSparkOrcNewRecordReader( + Reader file, + JobConf conf, + FileSplit fileSplit, + List columnIDs) throws IOException { +List types = file.getTypes(); +numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount(); +this.reader = new SparkVectorizedOrcRecordReader(file, conf, + new org.apache.hadoop.mapred.FileSplit(fileSplit)); + +this.columnIDs = new ArrayList<>(columnIDs); +this.internalValue = this.reader.createValue(); +this.progress = reader.getProgress(); +this.row = new Row(this.internalValue.cols, this.columnIDs); + } + + @Override + public void close() throws IOException { +reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { +return NullWritable.get(); + } + + @Override + public InternalRow getCurrentValue() throws IOException, + InterruptedException { +if (indexOfRow >= numRowsOfBatch) { + return null; +} +row.rowId = indexOfRow; +indexOfRow++; + +return row; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return progress; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +if
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83757435 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A RecordReader that returns InternalRow for Spark SQL execution. + * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter + * class is used to return internal row by directly accessing data in column vectors. 
+ */ +public class VectorizedSparkOrcNewRecordReader +extends org.apache.hadoop.mapreduce.RecordReader{ + private final org.apache.hadoop.mapred.RecordReader reader; + private final int numColumns; + private VectorizedRowBatch internalValue; + private float progress = 0.0f; + private List columnIDs; + + private long numRowsOfBatch = 0; + private int indexOfRow = 0; + + private final Row row; + + public VectorizedSparkOrcNewRecordReader( + Reader file, + JobConf conf, + FileSplit fileSplit, + List columnIDs) throws IOException { +List types = file.getTypes(); +numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount(); +this.reader = new SparkVectorizedOrcRecordReader(file, conf, + new org.apache.hadoop.mapred.FileSplit(fileSplit)); + +this.columnIDs = new ArrayList<>(columnIDs); +this.internalValue = this.reader.createValue(); +this.progress = reader.getProgress(); +this.row = new Row(this.internalValue.cols, this.columnIDs); + } + + @Override + public void close() throws IOException { +reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { --- End diff -- nit: move all the `InterruptedException` declarations onto the previous line. This might also apply to other places in the PR, but I am not pointing out each instance.
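The suggested layout keeps the whole `throws` clause on the signature line. The sketch below shows this with a hypothetical `MiniRecordReader` base class in place of `org.apache.hadoop.mapreduce.RecordReader`, so that it compiles on its own.

```java
import java.io.IOException;

// Hypothetical minimal base class standing in for Hadoop's RecordReader.
abstract class MiniRecordReader<K, V> {
    public abstract K getCurrentKey() throws IOException, InterruptedException;

    public abstract V getCurrentValue() throws IOException, InterruptedException;
}

final class ConstantReader extends MiniRecordReader<String, Integer> {
    // Suggested style: the full throws clause sits on the signature line.
    @Override
    public String getCurrentKey() throws IOException, InterruptedException {
        return "key";
    }

    @Override
    public Integer getCurrentValue() throws IOException, InterruptedException {
        return 42;
    }
}
```

Wrapping is still needed wherever the combined line would exceed the project's line-length limit.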
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83756988 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.RecordReader; + +/** + * A mapred.RecordReader that returns VectorizedRowBatch. 
+ */ +public class SparkVectorizedOrcRecordReader + implements RecordReader{ +private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; +private final long offset; +private final long length; +private float progress = 0.0f; +private ObjectInspector objectInspector; + +SparkVectorizedOrcRecordReader(Reader file, Configuration conf, +FileSplit fileSplit) throws IOException { + this.offset = fileSplit.getStart(); + this.length = fileSplit.getLength(); + this.objectInspector = file.getObjectInspector(); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset, +this.length); + this.progress = reader.getProgress(); +} + +/** + * Create a ColumnVector based on given ObjectInspector's type info. + * + * @param inspector ObjectInspector + */ +private ColumnVector createColumnVector(ObjectInspector inspector) { + switch(inspector.getCategory()) { +case PRIMITIVE: + { +PrimitiveTypeInfo primitiveTypeInfo = + (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo(); +switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case INTERVAL_YEAR_MONTH: +return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: +return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: +BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); +column.initBuffer(); +return column; + case DECIMAL: +DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; +return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, +tInfo.precision(), tInfo.scale()); + default: +
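`createColumnVector` chooses a column-vector class from the primitive category: integral types, dates and year-month intervals share `LongColumnVector`, floating-point types use `DoubleColumnVector`, string and binary types use `BytesColumnVector` (followed by `initBuffer()`), and decimals use `DecimalColumnVector` sized by the type's precision and scale. The sketch below reproduces only that dispatch so it runs without Hive on the classpath; `Category` redeclares a subset of Hive's `PrimitiveCategory` names and `VectorKind` is a hypothetical enum. The diff is cut off at `default:`, so throwing for anything else (such as `TIMESTAMP`) is this sketch's assumption, not the PR's behavior.

```java
public class VectorKindSketch {

  // Subset of Hive's PrimitiveCategory names, redeclared so the sketch runs standalone.
  enum Category {
    BOOLEAN, BYTE, SHORT, INT, LONG, DATE, INTERVAL_YEAR_MONTH,
    FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, DECIMAL, TIMESTAMP
  }

  // Hypothetical: which column-vector family would be allocated.
  enum VectorKind { LONG, DOUBLE, BYTES, DECIMAL }

  static VectorKind vectorKindFor(Category category) {
    switch (category) {
      case BOOLEAN: case BYTE: case SHORT: case INT: case LONG:
      case DATE: case INTERVAL_YEAR_MONTH:
        // LongColumnVector: every value fits in one long slot
        return VectorKind.LONG;
      case FLOAT: case DOUBLE:
        // DoubleColumnVector: floats are stored widened to double
        return VectorKind.DOUBLE;
      case BINARY: case STRING: case CHAR: case VARCHAR:
        // BytesColumnVector: variable-length bytes addressed by start/length
        return VectorKind.BYTES;
      case DECIMAL:
        // DecimalColumnVector: also needs precision and scale from the type info
        return VectorKind.DECIMAL;
      default:
        // The diff is truncated here; throwing is this sketch's assumption.
        throw new UnsupportedOperationException("No column vector for " + category);
    }
  }

  public static void main(String[] args) {
    Category[] samples = {
      Category.INT, Category.DATE, Category.FLOAT,
      Category.VARCHAR, Category.DECIMAL, Category.TIMESTAMP
    };
    for (Category c : samples) {
      try {
        System.out.println(c + " -> " + vectorKindFor(c));
      } catch (UnsupportedOperationException e) {
        System.out.println(c + " -> unsupported");
      }
    }
  }
}
```

Grouping by storage family rather than by SQL type keeps the number of vector classes small; the reader then interprets each slot according to the column's declared type.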
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83753744 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83759570 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.NotImplementedException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * A RecordReader that returns InternalRow for Spark SQL execution. + * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter + * class is used to return internal row by directly accessing data in column vectors. 
+ */ +public class VectorizedSparkOrcNewRecordReader +extends org.apache.hadoop.mapreduce.RecordReader{ + private final org.apache.hadoop.mapred.RecordReader reader; + private final int numColumns; + private VectorizedRowBatch internalValue; + private float progress = 0.0f; + private List columnIDs; + + private long numRowsOfBatch = 0; + private int indexOfRow = 0; + + private final Row row; + + public VectorizedSparkOrcNewRecordReader( + Reader file, + JobConf conf, + FileSplit fileSplit, + List columnIDs) throws IOException { +List types = file.getTypes(); +numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount(); +this.reader = new SparkVectorizedOrcRecordReader(file, conf, + new org.apache.hadoop.mapred.FileSplit(fileSplit)); + +this.columnIDs = new ArrayList<>(columnIDs); +this.internalValue = this.reader.createValue(); +this.progress = reader.getProgress(); +this.row = new Row(this.internalValue.cols, this.columnIDs); + } + + @Override + public void close() throws IOException { +reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { +return NullWritable.get(); + } + + @Override + public InternalRow getCurrentValue() throws IOException, + InterruptedException { +if (indexOfRow >= numRowsOfBatch) { + return null; +} +row.rowId = indexOfRow; +indexOfRow++; + +return row; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return progress; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +if
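`getCurrentValue()` hands out rows from the current `VectorizedRowBatch` by advancing `indexOfRow` until it reaches `numRowsOfBatch`; the body of `nextKeyValue()` is cut off in this archive. A minimal sketch of a complete batch cursor follows, assuming batches arrive from a queue. `Batch` and the `Deque` source are hypothetical stand-ins for `VectorizedRowBatch` and the wrapped `mapred.RecordReader`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BatchCursorSketch {

  // Hypothetical stand-in for VectorizedRowBatch: one long column.
  static final class Batch {
    final long[] values;
    final int size; // number of valid rows, like VectorizedRowBatch.size

    Batch(long... values) {
      this.values = values;
      this.size = values.length;
    }
  }

  static final class Cursor {
    private final Deque<Batch> source; // stands in for the wrapped mapred.RecordReader
    private Batch current;
    private int numRowsOfBatch = 0;
    private int indexOfRow = -1;

    Cursor(Deque<Batch> source) {
      this.source = source;
    }

    // Moves to the next row, loading a new batch only when the current one is used up.
    boolean nextKeyValue() {
      indexOfRow++;
      // Loop so that empty batches are skipped rather than returned.
      while (indexOfRow >= numRowsOfBatch) {
        if (source.isEmpty()) {
          return false;
        }
        current = source.poll();
        numRowsOfBatch = current.size;
        indexOfRow = 0;
      }
      return true;
    }

    // Side-effect free: repeated calls return the same row's value.
    long getCurrentValue() {
      return current.values[indexOfRow];
    }
  }

  public static void main(String[] args) {
    Deque<Batch> batches = new ArrayDeque<>();
    batches.add(new Batch(1, 2, 3));
    batches.add(new Batch());      // an empty batch is skipped
    batches.add(new Batch(4, 5));
    Cursor cursor = new Cursor(batches);
    List<Long> seen = new ArrayList<>();
    while (cursor.nextKeyValue()) {
      seen.add(cursor.getCurrentValue());
    }
    System.out.println(seen); // prints [1, 2, 3, 4, 5]
  }
}
```

In the diff, `getCurrentValue()` both returns the row and increments `indexOfRow`. This sketch advances in `nextKeyValue()` instead, so calling `getCurrentValue()` twice yields the same row; that is a design choice of the sketch, not something the diff shows.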
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83752300 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r83757105 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java --- @@ -0,0 +1,189 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r74369496 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user dafrista commented on a diff in the pull request: https://github.com/apache/spark/pull/13775#discussion_r74368871 --- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java --- @@ -0,0 +1,318 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r74367827

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+    extends org.apache.hadoop.mapreduce.RecordReader{
+  private final org.apache.hadoop.mapred.RecordReader reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+      Reader file,
+      JobConf conf,
+      FileSplit fileSplit,
+      List columnIDs) throws IOException {
+    List types = file.getTypes();
+    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+    this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+        new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+    this.columnIDs = new ArrayList<>(columnIDs);
+    this.internalValue = this.reader.createValue();
+    this.progress = reader.getProgress();
+    this.row = new Row(this.internalValue.cols, this.columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+      InterruptedException {
+    if (indexOfRow >= numRowsOfBatch) {
+      return null;
+    }
+    row.rowId = indexOfRow;
+    indexOfRow++;
+
+    return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if
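The class comment describes an adapter that serves rows by reading directly from the batch's column vectors. One commit in this PR is titled "If column is repeating, always using row id 0." Below is a minimal, self-contained sketch of that idea. It is not the PR's `Row` class. `LongColumn` and `RowView` are hypothetical stand-ins whose fields (`vector`, `isNull`, `noNulls`, `isRepeating`) mirror those of Hive's `LongColumnVector`. The sketch also assumes that `columnIDs` maps a row ordinal to a column index in the batch.

```java
import java.util.List;

/**
 * Hypothetical sketch of a row view over columnar data. Not the PR's code:
 * LongColumn only mirrors the field names of Hive's LongColumnVector.
 */
final class RowAdapterSketch {

  /** Simplified long column: one slot per row of the batch. */
  static final class LongColumn {
    final long[] vector;
    final boolean[] isNull;
    boolean noNulls = true;      // true means no entry in this batch is null
    boolean isRepeating = false; // true means entry 0 holds the value for every row

    LongColumn(int size) {
      vector = new long[size];
      isNull = new boolean[size];
    }
  }

  /**
   * A mutable view that owns no data. It only holds a rowId pointing into
   * the batch's columns, so one instance can be reused for every row.
   */
  static final class RowView {
    private final LongColumn[] cols;
    private final List<Integer> columnIDs; // assumed: ordinal -> column index
    int rowId;

    RowView(LongColumn[] cols, List<Integer> columnIDs) {
      this.cols = cols;
      this.columnIDs = columnIDs;
    }

    /** For a repeating column only entry 0 is valid, whatever rowId is. */
    private int physicalIndex(LongColumn col) {
      return col.isRepeating ? 0 : rowId;
    }

    boolean isNullAt(int ordinal) {
      LongColumn col = cols[columnIDs.get(ordinal)];
      return !col.noNulls && col.isNull[physicalIndex(col)];
    }

    long getLong(int ordinal) {
      LongColumn col = cols[columnIDs.get(ordinal)];
      return col.vector[physicalIndex(col)];
    }
  }

  private RowAdapterSketch() {}
}
```

The view holds only a `rowId`, so a reader can return the same object for every row. The quoted reader does exactly this with its single `row` field. A caller that needs to keep a row after the next call would therefore have to copy it, for example with `InternalRow.copy()`.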
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user dafrista commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r74356405

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,318 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r74183317

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * A RecordReader that returns InternalRow for Spark SQL execution.
+ * This reader uses an internal reader that returns Hive's VectorizedRowBatch. An adapter
+ * class is used to return internal row by directly accessing data in column vectors.
+ */
+public class VectorizedSparkOrcNewRecordReader
+    extends org.apache.hadoop.mapreduce.RecordReader{
+  private final org.apache.hadoop.mapred.RecordReader reader;
+  private final int numColumns;
+  private VectorizedRowBatch internalValue;
+  private float progress = 0.0f;
+  private List columnIDs;
+
+  private long numRowsOfBatch = 0;
+  private int indexOfRow = 0;
+
+  private final Row row;
+
+  public VectorizedSparkOrcNewRecordReader(
+      Reader file,
+      JobConf conf,
+      FileSplit fileSplit,
+      List columnIDs) throws IOException {
+    List types = file.getTypes();
+    numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+    this.reader = new SparkVectorizedOrcRecordReader(file, conf,
+        new org.apache.hadoop.mapred.FileSplit(fileSplit));
+
+    this.columnIDs = columnIDs;
+    this.internalValue = this.reader.createValue();
+    this.progress = reader.getProgress();
+    this.row = new Row(this.internalValue.cols, columnIDs);
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException,
+      InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public InternalRow getCurrentValue() throws IOException,
+      InterruptedException {
+    if (indexOfRow >= numRowsOfBatch) {
+      return null;
+    }
+    row.rowId = indexOfRow;
+    indexOfRow++;
+
+    return row;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return progress;
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (indexOfRow == numRowsOfBatch && progress < 1.0f) {
+
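In this revision, `nextKeyValue()` begins with `if (indexOfRow == numRowsOfBatch && progress < 1.0f)`. In other words, it fetches a new batch only after every row of the current batch has been handed out. The following is a minimal sketch of that refill pattern. It assumes a hypothetical `BatchSource` that fills a reused buffer and returns 0 at the end of input. The quoted reader presumably asks its wrapped `SparkVectorizedOrcRecordReader` for the next `VectorizedRowBatch` and tracks `progress` instead; that part of the diff is truncated.

```java
/**
 * Hypothetical sketch of batch-at-a-time iteration: rows are served from the
 * current batch until it is used up, then the next batch is fetched.
 * BatchSource is an assumed interface, not a Hadoop or Hive API.
 */
final class BatchIteratorSketch {

  /** Fills buffer with the next batch and returns its row count, or 0 at end of input. */
  interface BatchSource {
    int nextBatch(long[] buffer);
  }

  private final BatchSource source;
  private final long[] buffer;   // reused for every batch, like a VectorizedRowBatch
  private int numRowsOfBatch = 0;
  private int indexOfRow = 0;    // next row of the current batch to hand out
  private int currentRow = -1;   // row selected by the last successful nextKeyValue()

  BatchIteratorSketch(BatchSource source, int batchSize) {
    this.source = source;
    this.buffer = new long[batchSize];
  }

  /** Advances to the next row, refilling the buffer once the current batch is exhausted. */
  boolean nextKeyValue() {
    if (indexOfRow >= numRowsOfBatch) {
      numRowsOfBatch = source.nextBatch(buffer);
      indexOfRow = 0;
      if (numRowsOfBatch == 0) {
        currentRow = -1;
        return false; // input exhausted
      }
    }
    currentRow = indexOfRow++;
    return true;
  }

  /** Reads the current row without moving the cursor. */
  long getCurrentValue() {
    if (currentRow < 0) {
      throw new IllegalStateException("nextKeyValue() has not returned true");
    }
    return buffer[currentRow];
  }
}
```

In this sketch, the cursor moves only in `nextKeyValue()`, and `getCurrentValue()` only reads. That follows how Hadoop's `mapreduce.RecordReader` describes the two methods. In the quoted revision, `getCurrentValue()` increments `indexOfRow` itself. Calling it twice after a single `nextKeyValue()` would therefore advance past a row.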
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user dafrista commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r74180849

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,317 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user dafrista commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r68135229

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/VectorizedSparkOrcNewRecordReader.java ---
@@ -0,0 +1,320 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user zjffdu commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r68016908

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * A mapred.RecordReader that returns VectorizedRowBatch.
+ */
+public class SparkVectorizedOrcRecordReader
+    implements RecordReader{
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    private float progress = 0.0f;
+    private ObjectInspector objectInspector;
+
+    SparkVectorizedOrcRecordReader(Reader file, Configuration conf,
+        FileSplit fileSplit) throws IOException {
+      this.offset = fileSplit.getStart();
+      this.length = fileSplit.getLength();
+      this.objectInspector = file.getObjectInspector();
+      this.reader = OrcInputFormat.createReaderFromFile(file, conf, this.offset,
+        this.length);
+      this.progress = reader.getProgress();
+    }
+
+    /**
+     * Create a ColumnVector based on given ObjectInspector's type info.
+     *
+     * @param inspector ObjectInspector
+     */
+    private ColumnVector createColumnVector(ObjectInspector inspector) {
+      switch(inspector.getCategory()) {
+      case PRIMITIVE:
+        {
+          PrimitiveTypeInfo primitiveTypeInfo =
+            (PrimitiveTypeInfo) ((PrimitiveObjectInspector)inspector).getTypeInfo();
+          switch(primitiveTypeInfo.getPrimitiveCategory()) {
+            case BOOLEAN:
+            case BYTE:
+            case SHORT:
+            case INT:
+            case LONG:
+            case DATE:
+            case INTERVAL_YEAR_MONTH:
+              return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            case FLOAT:
+            case DOUBLE:
+              return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+            case BINARY:
+            case STRING:
+            case CHAR:
+            case VARCHAR:
+              BytesColumnVector column = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+              column.initBuffer();
+              return column;
+            case DECIMAL:
+              DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
+              return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
+                tInfo.precision(), tInfo.scale());
+            default:
+
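`createColumnVector` chooses a Hive column-vector class based on the primitive category. The sketch below restates that mapping using local stand-in enums, so it can run without Hive on the classpath. `Category` and `VectorKind` are hypothetical; they are not Hive's `PrimitiveCategory` or its vector classes. The quoted diff is cut off at `default:`, so the exception thrown in that branch is an assumption. `TIMESTAMP` is included only as an example of a category that does not appear in the quoted cases.

```java
/**
 * Hypothetical restatement of the switch in createColumnVector: which
 * column-vector family each primitive category is read into. The enums are
 * local stand-ins, not Hive classes.
 */
final class VectorKindSketch {

  enum Category {
    BOOLEAN, BYTE, SHORT, INT, LONG, DATE, INTERVAL_YEAR_MONTH,
    FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, DECIMAL, TIMESTAMP
  }

  /** LONG ~ LongColumnVector, DOUBLE ~ DoubleColumnVector, and so on. */
  enum VectorKind { LONG, DOUBLE, BYTES, DECIMAL }

  static VectorKind vectorKindFor(Category category) {
    switch (category) {
      // Boolean, integral, date and year-month interval values share a long[] vector.
      case BOOLEAN: case BYTE: case SHORT: case INT: case LONG:
      case DATE: case INTERVAL_YEAR_MONTH:
        return VectorKind.LONG;
      // Both floating-point widths are held in a double[] vector.
      case FLOAT: case DOUBLE:
        return VectorKind.DOUBLE;
      // Binary and all string flavours are held as byte ranges.
      case BINARY: case STRING: case CHAR: case VARCHAR:
        return VectorKind.BYTES;
      // Decimals additionally carry precision and scale from the type info.
      case DECIMAL:
        return VectorKind.DECIMAL;
      default:
        // Assumption: the quoted diff is truncated here.
        throw new UnsupportedOperationException("Unsupported type: " + category);
    }
  }

  private VectorKindSketch() {}
}
```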
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
Github user zjffdu commented on a diff in the pull request:
https://github.com/apache/spark/pull/13775#discussion_r68016740

--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkVectorizedOrcRecordReader.java ---
@@ -0,0 +1,191 @@
[GitHub] spark pull request #13775: [SPARK-16060][SQL] Vectorized Orc reader
GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/13775

[SPARK-16060][SQL] Vectorized Orc reader

## What changes were proposed in this pull request?

Currently, the Orc reader in Spark SQL doesn't support vectorized reading. Because Hive's Orc reader already supports vectorization, we can add the same support here to improve Orc reading performance.

### Benchmark

Benchmark code:

    test("Benchmark for Orc") {
      val N = 500 << 12
      withOrcTable((0 until N).map(i => (i, i.toString, i.toLong, i.toDouble)), "t") {
        val benchmark = new Benchmark("Orc reader", N)
        benchmark.addCase("reading Orc file", 10) { iter =>
          sql("SELECT * FROM t").collect()
        }
        benchmark.run()
      }
    }

Before this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
    Orc reader:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    reading Orc file                   4750 / 5266          0.4        2319.1       1.0X

After this patch:

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
    Intel(R) Core(TM) i7-5557U CPU @ 3.10GHz
    Orc reader:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    reading Orc file                   3550 / 3824          0.6        1733.2       1.0X

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 vectorized-orc-reader3

Alternatively, you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13775.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13775

commit 2861ac2a5136c065ec38cfc24bf9f979d5b7ae07
Author: Liang-Chi Hsieh
Date:   2016-06-16T02:31:23Z

    Add vectorized Orc reader support.

commit eee8eca70920d624becb43c8510d217ce4d9820b
Author: Liang-Chi Hsieh
Date:   2016-06-17T09:44:11Z

    import.

commit b753d09e3e369fc91a17d9632123dbe40d7d9dfb
Author: Liang-Chi Hsieh
Date:   2016-06-18T10:00:00Z

    If column is repeating, always using row id 0.

commit 7d26f5ed785269299b324df8bfc1c64c2d4a2b48
Author: Liang-Chi Hsieh
Date:   2016-06-19T04:16:49Z

    Fix bugs of getBinary and numFields.

commit 74fe936e522a827384461e445b9ba44f96ce29fe
Author: Liang-Chi Hsieh
Date:   2016-06-20T02:44:07Z

    Remove unnecessary change.

commit 7e7bb6c57860187f391f66ca82cdd715d0b2be43
Author: Liang-Chi Hsieh
Date:   2016-06-20T02:48:11Z

    Remove unnecessary change.
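The rate and per-row columns in the benchmark tables follow from N = 500 << 12 = 2,048,000 rows and the best times. The small helper below recomputes them from the reported inputs. Its class and method names are illustrative only.

From the best times, the computed per-row costs are about 2319.3 ns and 1733.4 ns. These differ slightly from the printed 2319.1 and 1733.2, presumably because the table shows rounded millisecond times. The best-time speedup is roughly 1.34x, and the speedup on average times is roughly 1.38x. The "Relative" column likely reads 1.0X in both runs because each run contains a single case.

```java
/**
 * Back-of-the-envelope check of the PR's benchmark table. Only the inputs
 * (N and the millisecond timings) come from the PR description; everything
 * else is plain arithmetic.
 */
final class BenchmarkMath {

  /** Rows written by the benchmark: 500 << 12. */
  static final long N = 500L << 12;

  /** Nanoseconds per row for a run that took the given milliseconds. */
  static double nsPerRow(double millis) {
    return millis * 1_000_000.0 / N;
  }

  /** Millions of rows per second for a run that took the given milliseconds. */
  static double mRowsPerSec(double millis) {
    return N / (millis / 1000.0) / 1_000_000.0;
  }

  /** How many times faster the "after" run is than the "before" run. */
  static double speedup(double beforeMillis, double afterMillis) {
    return beforeMillis / afterMillis;
  }

  private BenchmarkMath() {}
}
```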