Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r160251456
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java
 ---
    @@ -0,0 +1,509 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc;
    +
    +import java.io.IOException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.orc.OrcConf;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.Reader;
    +import org.apache.orc.TypeDescription;
    +import org.apache.orc.mapred.OrcInputFormat;
    +import org.apache.orc.storage.common.type.HiveDecimal;
    +import org.apache.orc.storage.ql.exec.vector.*;
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
    +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
    +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    +import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
    + * After creating, `initialize` and `initBatch` should be called 
sequentially.
    + */
    +public class OrcColumnarBatchReader extends RecordReader<Void, 
ColumnarBatch> {
    +
    +  // ORC File Reader
    +  private Reader reader;
    +
    +  // Vectorized ORC Row Batch
    +  private VectorizedRowBatch batch;
    +
    +  /**
    +   * The column IDs of the physical ORC file schema which are required by 
this reader.
    +   * -1 means this required column doesn't exist in the ORC file.
    +   */
    +  private int[] requestedColIds;
    +
    +  // Record reader from ORC row batch.
    +  private org.apache.orc.RecordReader recordReader;
    +
    +  private StructField[] requiredFields;
    +
    +  // The result columnar batch for vectorized execution by whole-stage 
codegen.
    +  private ColumnarBatch columnarBatch;
    +
    +  // Writable column vectors of the result columnar batch.
    +  private WritableColumnVector[] columnVectors;
    +
    +  // The number of rows read and considered to be returned.
    +  private long rowsReturned = 0L;
    +
    +  private long totalRowCount = 0L;
    +
    +  /**
    +   * The memory mode of the columnarBatch
    +   */
    +  private final MemoryMode MEMORY_MODE;
    +
    +  public OrcColumnarBatchReader(boolean useOffHeap) {
    +    MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
    +  }
    +
    +
    +  @Override
    +  public Void getCurrentKey() throws IOException, InterruptedException {
    +    return null;
    +  }
    +
    +  @Override
    +  public ColumnarBatch getCurrentValue() throws IOException, 
InterruptedException {
    +    return columnarBatch;
    +  }
    +
    +  @Override
    +  public float getProgress() throws IOException, InterruptedException {
    +    return (float) rowsReturned / totalRowCount;
    +  }
    +
    +  @Override
    +  public boolean nextKeyValue() throws IOException, InterruptedException {
    +    return nextBatch();
    +  }
    +
    +  @Override
    +  public void close() throws IOException {
    +    if (columnarBatch != null) {
    +      columnarBatch.close();
    +      columnarBatch = null;
    +    }
    +    if (recordReader != null) {
    +      recordReader.close();
    +      recordReader = null;
    +    }
    +  }
    +
    +  /**
    +   * Initialize ORC file reader and batch record reader.
    +   * Please note that `initBatch` is needed to be called after this.
    +   */
    +  @Override
    +  public void initialize(InputSplit inputSplit, TaskAttemptContext 
taskAttemptContext)
    +      throws IOException, InterruptedException {
    +    FileSplit fileSplit = (FileSplit)inputSplit;
    +    Configuration conf = taskAttemptContext.getConfiguration();
    +    reader = OrcFile.createReader(
    +      fileSplit.getPath(),
    +      OrcFile.readerOptions(conf)
    +        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
    +        .filesystem(fileSplit.getPath().getFileSystem(conf)));
    +
    +    Reader.Options options =
    +      OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), 
fileSplit.getLength());
    +    recordReader = reader.rows(options);
    +    totalRowCount = reader.getNumberOfRows();
    +  }
    +
    +  /**
    +   * Initialize columnar batch by setting required schema and partition 
information.
    +   * With this information, this creates ColumnarBatch with the full 
schema.
    +   */
    +  public void initBatch(
    +      TypeDescription orcSchema,
    +      int[] requestedColIds,
    +      StructField[] requiredFields,
    +      StructType partitionSchema,
    +      InternalRow partitionValues) {
    +    batch = orcSchema.createRowBatch(DEFAULT_SIZE);
    +    assert(!batch.selectedInUse); // `selectedInUse` should be initialized 
with `false`.
    +
    +    this.requiredFields = requiredFields;
    +    this.requestedColIds = requestedColIds;
    +    assert(requiredFields.length == requestedColIds.length);
    +
    +    StructType resultSchema = new StructType(requiredFields);
    +    for (StructField f : partitionSchema.fields()) {
    +      resultSchema = resultSchema.add(f);
    +    }
    +
    +    int capacity = DEFAULT_SIZE;
    +    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
    +      columnVectors = OffHeapColumnVector.allocateColumns(capacity, 
resultSchema);
    +    } else {
    +      columnVectors = OnHeapColumnVector.allocateColumns(capacity, 
resultSchema);
    +    }
    +    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, 
capacity);
    +
    +    if (partitionValues.numFields() > 0) {
    +      int partitionIdx = requiredFields.length;
    +      for (int i = 0; i < partitionValues.numFields(); i++) {
    +        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], 
partitionValues, i);
    +        columnVectors[i + partitionIdx].setIsConstant();
    +      }
    +    }
    +
    +    // Initialize the missing columns once.
    +    for (int i = 0; i < requiredFields.length; i++) {
    +      if (requestedColIds[i] == -1) {
    +        columnVectors[i].putNulls(0, columnarBatch.capacity());
    +        columnVectors[i].setIsConstant();
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return true if there exists more data in the next batch. If exists, 
prepare the next batch
    +   * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch 
columns.
    +   */
    +  private boolean nextBatch() throws IOException {
    +    if (rowsReturned >= totalRowCount) {
    +      return false;
    +    }
    +
    +    recordReader.nextBatch(batch);
    +    int batchSize = batch.size;
    +    if (batchSize == 0) {
    --- End diff --
    
    Maybe, I explained in a vague manner. Sorry for that. Let me put in this 
way.
    
    ORC reader is the one who making batches. ORC reads the raw data and makes 
batches with filtered rows only. So, you can ignore the original rows. From 
now, please assume that you have only qualified rows. ORC will privide users a 
batch with fully packed in most cases. The last batch will be partially packed 
(batchSize < the max batch size) or empty.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to