Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19943#discussion_r160071785
--- Diff:
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java
---
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcInputFormat;
+import org.apache.orc.storage.common.type.HiveDecimal;
+import org.apache.orc.storage.ql.exec.vector.*;
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called sequentially.
+ */
+public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
+
+ /**
+ * ORC File Reader.
+ */
+ private Reader reader;
+
+ /**
+ * Vectorized Row Batch.
+ */
+ private VectorizedRowBatch batch;
+
+ /**
+ * Requested Column IDs.
+ */
+ private int[] requestedColIds;
+
+ /**
+ * Record reader from row batch.
+ */
+ private org.apache.orc.RecordReader recordReader;
+
+ /**
+ * Required Schema.
+ */
+ private StructType requiredSchema;
+
+ /**
+ * ColumnarBatch for vectorized execution by whole-stage codegen.
+ */
+ private ColumnarBatch columnarBatch;
+
+ /**
+ * Writable column vectors of ColumnarBatch.
+ */
+ private WritableColumnVector[] columnVectors;
+
+ /**
+ * The number of rows read and considered to be returned.
+ */
+ private long rowsReturned = 0L;
+
+ /**
+ * Total number of rows.
+ */
+ private long totalRowCount = 0L;
+
+ @Override
+ public Void getCurrentKey() throws IOException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public ColumnarBatch getCurrentValue() throws IOException, InterruptedException {
+ return columnarBatch;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return (float) rowsReturned / totalRowCount;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return nextBatch();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (columnarBatch != null) {
+ columnarBatch.close();
+ columnarBatch = null;
+ }
+ if (recordReader != null) {
+ recordReader.close();
+ recordReader = null;
+ }
+ }
+
+ /**
+ * Initialize ORC file reader and batch record reader.
+ * Please note that `setRequiredSchema` needs to be called after this.
+ */
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ FileSplit fileSplit = (FileSplit)inputSplit;
+ Configuration conf = taskAttemptContext.getConfiguration();
+ reader = OrcFile.createReader(
+ fileSplit.getPath(),
+ OrcFile.readerOptions(conf)
+ .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+ .filesystem(fileSplit.getPath().getFileSystem(conf)));
+
+ Reader.Options options =
+ OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
+ recordReader = reader.rows(options);
+ totalRowCount = reader.getNumberOfRows();
+ }
+
+ /**
+ * Set required schema and partition information.
+ * With this information, this creates ColumnarBatch with the full schema.
+ */
+ public void setRequiredSchema(
+ TypeDescription orcSchema,
+ int[] requestedColIds,
+ StructType requiredSchema,
+ StructType partitionSchema,
+ InternalRow partitionValues) {
+ batch = orcSchema.createRowBatch(DEFAULT_SIZE);
+ assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
+
+ StructType resultSchema = new StructType(requiredSchema.fields());
+ for (StructField f : partitionSchema.fields())
+ resultSchema = resultSchema.add(f);
+ this.requiredSchema = requiredSchema;
+ this.requestedColIds = requestedColIds;
--- End diff --
add an assert that `requiredSchema.length == requestedColIds.length`
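
For reference, a minimal sketch of where such a check could sit in `setRequiredSchema` (only the new assert is added; the surrounding lines are taken from the diff above, and `fields().length` is used here since `requiredSchema` is a `StructType`):

```java
public void setRequiredSchema(
    TypeDescription orcSchema,
    int[] requestedColIds,
    StructType requiredSchema,
    StructType partitionSchema,
    InternalRow partitionValues) {
  batch = orcSchema.createRowBatch(DEFAULT_SIZE);
  assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.
  // Suggested check: every required field must have a corresponding requested column id.
  assert(requiredSchema.fields().length == requestedColIds.length);
  // ... rest as in the diff above
}
```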
---
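
For broader context, a hedged sketch of the call sequence the class javadoc describes; the construction and surrounding OrcFileFormat wiring are outside this diff, so the identifiers below (`split`, `context`, `orcSchema`, `colIds`, and the schema/partition values) are placeholders:

```java
// Placeholders: split, context, orcSchema, colIds, requiredSchema,
// partitionSchema and partitionValues come from the caller, not this diff.
JavaOrcColumnarBatchReader batchReader = new JavaOrcColumnarBatchReader();
batchReader.initialize(split, context);
batchReader.setRequiredSchema(orcSchema, colIds, requiredSchema, partitionSchema, partitionValues);
while (batchReader.nextKeyValue()) {
  ColumnarBatch batch = batchReader.getCurrentValue();
  // consume rows from batch ...
}
batchReader.close();
```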