[GitHub] [incubator-pinot] jenniferdai commented on a change in pull request #3994: Adding ORC reader
jenniferdai commented on a change in pull request #3994: Adding ORC reader URL: https://github.com/apache/incubator-pinot/pull/3994#discussion_r267959911 ## File path: pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java ## @@ -0,0 +1,207 @@ +package org.apache.pinot.orc.data.readers; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcList; +import org.apache.orc.mapred.OrcMapredRecordReader; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.data.readers.RecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The ORCRecordReader uses a VectorizedRowBatch, which we convert to a Writable. Then, we convert these + * Writable objects to primitives that we can then store in a GenericRow. + * + * When new data types are added to Pinot, we will need to update them here as well. + * Note that not all ORC types are supported; we only support the ORC types that correspond to either + * primitives or multivalue columns in Pinot, which is similar to other record readers. + */ +public class ORCRecordReader implements RecordReader { + + private Schema _pinotSchema; + private TypeDescription _orcSchema; + Reader _reader; + org.apache.orc.RecordReader _recordReader; + VectorizedRowBatch _reusableVectorizedRowBatch; + + public static final String LOCAL_FS_PREFIX = "file://"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class); + + private void init(String inputPath, Schema schema) { +Configuration conf = new Configuration(); +LOGGER.info("Creating segment for {}", inputPath); +try { + Path orcReaderPath = new Path(LOCAL_FS_PREFIX + inputPath); + LOGGER.info("orc reader path is {}", orcReaderPath); + _reader = OrcFile.createReader(orcReaderPath, OrcFile.readerOptions(conf)); + _orcSchema = _reader.getSchema(); + LOGGER.info("ORC schema is {}", _orcSchema.toJson()); + + _pinotSchema = schema; + if (_pinotSchema == null) { +LOGGER.warn("Pinot schema is not set in segment generator config"); + } + _recordReader = _reader.rows(_reader.options().schema(_orcSchema)); +} catch (Exception e) { + LOGGER.error("Caught exception initializing record reader at path {}", inputPath); + throw new RuntimeException(e); +} + +// Create a row batch with max size 1 +_reusableVectorizedRowBatch = _orcSchema.createRowBatch(1); + } + + @Override + public void init(SegmentGeneratorConfig segmentGeneratorConfig) { +init(segmentGeneratorConfig.getInputFilePath(), segmentGeneratorConfig.getSchema()); + } + + @Override + public boolean hasNext() { +try { + return _recordReader.getProgress() != 1; +} catch (IOException e) { + LOGGER.error("Could not get next record"); + throw new RuntimeException(e); +} + } + + @Override + public GenericRow next() + throws IOException { +return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { +_recordReader.nextBatch(_reusableVectorizedRowBatch); +fillGenericRow(reuse, _reusableVectorizedRowBatch); +return reuse; + } + + privat
[GitHub] [incubator-pinot] jenniferdai commented on a change in pull request #3994: Adding ORC reader
jenniferdai commented on a change in pull request #3994: Adding ORC reader URL: https://github.com/apache/incubator-pinot/pull/3994#discussion_r267569198 ## File path: pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java ## @@ -0,0 +1,187 @@ +package org.apache.pinot.orc.data.readers; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcList; +import org.apache.orc.mapred.OrcMapredRecordReader; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.data.readers.RecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ORCRecordReader implements RecordReader { + + private Schema _pinotSchema; + private TypeDescription _orcSchema; + Reader _reader; + org.apache.orc.RecordReader _recordReader; + VectorizedRowBatch _reusableVectorizedRowBatch; + + private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class); + + @Override + public void init(SegmentGeneratorConfig segmentGeneratorConfig) { +Configuration conf = new Configuration(); +LOGGER.info("Creating segment for {}", segmentGeneratorConfig.getInputFilePath()); +try { + Path orcReaderPath = new Path("file://" + segmentGeneratorConfig.getInputFilePath()); + LOGGER.info("orc reader path is {}", orcReaderPath); + _reader = OrcFile.createReader(orcReaderPath, OrcFile.readerOptions(conf)); + _orcSchema = _reader.getSchema(); + LOGGER.info("ORC schema is {}", _orcSchema.toJson()); + + _pinotSchema = segmentGeneratorConfig.getSchema(); + if (_pinotSchema == null) { +throw new IllegalArgumentException("ORCRecordReader requires schema"); + } + _recordReader = _reader.rows(_reader.options().schema(_orcSchema)); +} catch (Exception e) { + LOGGER.error("Caught exception initializing record reader at path {}", segmentGeneratorConfig.getInputFilePath()); + throw new RuntimeException(e); +} + +_reusableVectorizedRowBatch = _orcSchema.createRowBatch(1); + } + + @Override + public boolean hasNext() { +try { + return _recordReader.getProgress() != 1; +} catch (IOException e) { + LOGGER.error("Could not get next record"); + throw new RuntimeException(e); +} + } + + @Override + public GenericRow next() + throws IOException { +return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) + throws IOException { +_recordReader.nextBatch(_reusableVectorizedRowBatch); +fillGenericRow(reuse, _reusableVectorizedRowBatch); +return reuse; + } + + private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch rowBatch) throws IOException { +// Read the row data +TypeDescription schema = _reader.getSchema(); +// Create a row batch with max size 1 + +if (schema.getCategory().equals(TypeDescription.Category.STRUCT)) { + for (int i = 0; i < schema.getChildren().size(); i++) { +// Get current column in schema +TypeDescription currColumn = schema.getChildren().get(i); +String currColumnName = currColumn.getFieldNames().get(0); +int currColRowIndex = currColumn.getId(); +ColumnVector vector =