Github user chrysan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19001#discussion_r178433504
--- Diff: sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/BucketizedSparkInputFormat.java ---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
+
+/**
+ * An {@link InputFormat} implementation for reading bucketed tables.
+ *
+ * We cannot directly use {@link BucketizedHiveInputFormat} from Hive as it depends on the
+ * map-reduce plan to get required information for split generation.
+ */
+public class BucketizedSparkInputFormat<K extends WritableComparable, V extends Writable>
+ extends BucketizedHiveInputFormat<K, V> {
+
+ private static final String FILE_INPUT_FORMAT = "file.inputformat";
+
+ @Override
+ public RecordReader getRecordReader(
+ InputSplit split,
+ JobConf job,
+ Reporter reporter) throws IOException {
+
+ BucketizedHiveInputSplit hsplit = (BucketizedHiveInputSplit) split;
+ String inputFormatClassName = null;
+ Class inputFormatClass = null;
+
+ try {
+ inputFormatClassName = hsplit.inputFormatClassName();
+ inputFormatClass = job.getClassByName(inputFormatClassName);
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Cannot find class " + inputFormatClassName,
e);
+ }
+
+    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+    return new BucketizedSparkRecordReader<>(inputFormat, hsplit, job, reporter);
+ }
+
+ @Override
+  public InputSplit[] getSplits(JobConf job, int numBuckets) throws IOException {
+ final String inputFormatClassName = job.get(FILE_INPUT_FORMAT);
+    final String[] inputDirs = job.get(INPUT_DIR).split(StringUtils.COMMA_STR);
+
+ if (inputDirs.length != 1) {
+ throw new IOException(this.getClass().getCanonicalName() +
+ " expects only one input directory. " + inputDirs.length +
+ " directories detected : " + Arrays.toString(inputDirs));
+ }
+
+ final String inputDir = inputDirs[0];
+ final Path inputPath = new Path(inputDir);
+ final JobConf newJob = new JobConf(job);
+ final FileStatus[] listStatus = this.listStatus(newJob, inputPath);
+ final InputSplit[] result = new InputSplit[numBuckets];
+
+ if (listStatus.length != 0 && listStatus.length != numBuckets) {
+ throw new IOException("Bucketed path was expected to have " +
numBuckets + " files but " +
+ listStatus.length + " files are present. Path = " + inputPath);
+ }
+
+ try {
+      final Class<?> inputFormatClass = Class.forName(inputFormatClassName);
+      final InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ newJob.setInputFormat(inputFormat.getClass());
+
+ for (int i = 0; i < numBuckets; i++) {
+ final FileStatus fileStatus = listStatus[i];
--- End diff ---
This logic depends on the files being listed in the right order; otherwise
the RDD partitions to be joined cannot be zipped correctly. The logic here
should be fixed to reorder the listed files.
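
A minimal sketch of one possible fix (hypothetical, not part of this PR):
sort the listed statuses by file name before assigning them to buckets,
assuming Hive-style bucket file names (000000_0, 000001_0, ...) whose
lexicographic order matches the bucket id:

    // Hypothetical reordering step (requires: import java.util.Comparator;).
    // Sorting by file name makes listStatus[i] correspond to bucket i on
    // both sides of the join, assuming both tables use the same naming.
    final FileStatus[] listStatus = this.listStatus(newJob, inputPath);
    Arrays.sort(listStatus,
        Comparator.comparing((FileStatus f) -> f.getPath().getName()));

A more robust approach would parse the bucket id out of each file name
instead of relying on lexicographic order.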
---