From Peeyush Gupta <peeyush.gu...@couchbase.com>:

Peeyush Gupta has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20135 )
Change subject: WIP: Input splits for delta table
......................................................................

WIP: Input splits for delta table

Change-Id: Ie6daf3846064326bfe749ad15b508fe27d1721ca
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
5 files changed, 375 insertions(+), 2 deletions(-)

git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/35/20135/1
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000..1410405
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.engine.hadoopio.HadoopFileIO;
+import io.delta.kernel.engine.ParquetHandler;
+
+public class DeltaEngine extends DefaultEngine {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    protected DeltaEngine(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public static DeltaEngine create(Configuration configuration) {
+        return new DeltaEngine(new HadoopFileIO(configuration), configuration);
+    }
+
+    public ParquetHandler getParquetHandler() {
+        return new DeltaParquetHandler(this.fileIO, this.conf);
+    }
+
+}
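
For review context, a minimal usage sketch of the engine added above (illustrative only, not part of the patch; the Configuration is a placeholder, and the cast mirrors what DeltaFileRecordReader does later in this change):

    // Sketch: obtain the engine and its split-aware Parquet handler from a Hadoop Configuration.
    Configuration conf = new Configuration();
    DeltaEngine engine = DeltaEngine.create(conf);
    DeltaParquetHandler handler = (DeltaParquetHandler) engine.getParquetHandler();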
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b..3fffc86 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+import static io.delta.kernel.internal.util.Utils.toCloseableIterator;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
@@ -33,6 +34,7 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
@@ -104,6 +106,40 @@
         }
     }
 
+    public DeltaFileRecordReader(List<InputSplit> splits, List<String> serScanFiles, String serScanState,
+            ConfFactory config, String filterExpressionStr) throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.engine = DeltaEngine.create(conf);
+        this.scanFiles = new ArrayList<>();
+        for (String scanFile : serScanFiles) {
+            this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
+        }
+        this.scanState = RowSerDe.deserializeRowFromJson(serScanState);
+        this.fileStatus = null;
+        this.physicalReadSchema = null;
+        this.physicalDataIter = null;
+        this.dataIter = null;
+        this.record = new GenericRecord<>();
+        if (scanFiles.size() > 0) {
+            this.fileIndex = 0;
+            this.scanFile = scanFiles.get(0);
+            this.fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
+            this.physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
+            this.filterPredicate = PredicateSerDe.deserializeExpressionFromJson(filterExpressionStr);
+            try {
+                this.physicalDataIter = ((DeltaParquetHandler) engine.getParquetHandler()).readParquetSplits(
+                        singletonCloseableIterator(fileStatus), toCloseableIterator(splits.iterator()),
+                        physicalReadSchema, filterPredicate);
+                this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+                if (dataIter.hasNext()) {
+                    rows = dataIter.next().getRows();
+                }
+            } catch (IOException e) {
+                throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+            }
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (dataIter != null) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java
new file mode 100644
index 0000000..cccaf71
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetFileReader.java
@@ -0,0 +1,170 @@
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.defaults.internal.parquet.ParquetFilterUtils.toParquetFilter;
+import static java.util.Objects.requireNonNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.setFilterPredicate;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
+import io.delta.kernel.exceptions.KernelEngineException;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaParquetFileReader extends ParquetFileReader {
+
+    private final FileIO fileIO;
+    private final int maxBatchSize;
+    private final Configuration conf;
+
+    public DeltaParquetFileReader(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = requireNonNull(fileIO, "fileIO is null");
+        this.conf = requireNonNull(conf, "conf is null");
+        this.maxBatchSize =
+                fileIO.getConf("delta.kernel.default.parquet.reader.batch-size").map(Integer::valueOf).orElse(1024);
+    }
+
+    public CloseableIterator<ColumnarBatch> read(FileStatus status, InputSplit split, StructType schema,
+            Optional<Predicate> predicate) {
+
+        final boolean hasRowIndexCol = schema.indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME) >= 0
+                && schema.get(StructField.METADATA_ROW_INDEX_COLUMN_NAME).isMetadataColumn();
+
+        return new CloseableIterator<ColumnarBatch>() {
+            private final BatchReadSupport readSupport = new BatchReadSupport(maxBatchSize, schema);
+            private ParquetRecordReader<Object> reader;
+            private boolean hasNotConsumedNextElement;
+
+            @Override
+            public void close() throws IOException {
+                Utils.closeCloseables(reader);
+            }
+
+            @Override
+            public boolean hasNext() {
+                initParquetReaderIfRequired();
+                try {
+                    if (hasNotConsumedNextElement) {
+                        return true;
+                    }
+
+                    hasNotConsumedNextElement = reader.nextKeyValue() && reader.getCurrentValue() != null;
+                    return hasNotConsumedNextElement;
+                } catch (IOException | InterruptedException ex) {
+                    throw new KernelEngineException("Error reading Parquet file: " + status.getPath(), ex);
+                }
+            }
+
+            @Override
+            public ColumnarBatch next() {
+                if (!hasNotConsumedNextElement) {
+                    throw new NoSuchElementException();
+                }
+                int batchSize = 0;
+                do {
+                    hasNotConsumedNextElement = false;
+                    // hasNext reads to row to confirm there is a next element.
+                    // get the row index only if required by the read schema
+                    long rowIndex = 0;
+                    try {
+                        rowIndex = hasRowIndexCol ? reader.getCurrentRowIndex() : -1;
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                    readSupport.finalizeCurrentRow(rowIndex);
+                    batchSize++;
+                } while (batchSize < maxBatchSize && hasNext());
+
+                return readSupport.getDataAsColumnarBatch(batchSize);
+            }
+
+            private void initParquetReaderIfRequired() {
+                if (reader == null) {
+                    org.apache.parquet.hadoop.ParquetFileReader fileReader = null;
+                    try {
+                        Configuration confCopy = conf;
+                        Path filePath = new Path(status.getPath());
+
+                        // We need physical schema in order to construct a filter that can be
+                        // pushed into the `parquet-mr` reader. For that reason read the footer
+                        // in advance.
+                        ParquetMetadata footer =
+                                org.apache.parquet.hadoop.ParquetFileReader.readFooter(confCopy, filePath);
+
+                        MessageType parquetSchema = footer.getFileMetaData().getSchema();
+                        Optional<FilterPredicate> parquetPredicate =
+                                predicate.flatMap(predicate -> toParquetFilter(parquetSchema, predicate));
+
+                        if (parquetPredicate.isPresent()) {
+                            // clone the configuration to avoid modifying the original one
+                            confCopy = new Configuration(confCopy);
+
+                            setFilterPredicate(confCopy, parquetPredicate.get());
+                            // Disable the record level filtering as the `parquet-mr` evaluates
+                            // the filter once the entire record has been materialized. Instead,
+                            // we use the predicate to prune the row groups which is more efficient.
+                            // In the future, we can consider using the record level filtering if a
+                            // native Parquet reader is implemented in Kernel default module.
+                            confCopy.set(RECORD_FILTERING_ENABLED, "false");
+                            confCopy.set(DICTIONARY_FILTERING_ENABLED, "false");
+                            confCopy.set(COLUMN_INDEX_FILTERING_ENABLED, "false");
+                        }
+
+                        // Pass the already read footer to the reader to avoid reading it again.
+                        fileReader = new ParquetFileReaderWithFooter(filePath, confCopy, footer);
+                        reader = new ParquetRecordReader<>(readSupport, ParquetInputFormat.getFilter(confCopy));
+                        reader.initialize((FileSplit) split, confCopy, Reporter.NULL);
+                    } catch (IOException e) {
+                        Utils.closeCloseablesSilently(fileReader, reader);
+                        throw new KernelEngineException("Error reading Parquet file: " + status.getPath(), e);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        };
+    }
+
+    /**
+     * Wrapper around {@link org.apache.parquet.hadoop.ParquetFileReader} to allow using the
+     * provided footer instead of reading it again. We read the footer in advance to construct a
+     * predicate for filtering rows.
+     */
+    private static class ParquetFileReaderWithFooter extends org.apache.parquet.hadoop.ParquetFileReader {
+        private final ParquetMetadata footer;
+
+        ParquetFileReaderWithFooter(Path filePath, Configuration configuration, ParquetMetadata footer)
+                throws IOException {
+            super(configuration, filePath, footer);
+            this.footer = requireNonNull(footer, "footer is null");
+        }
+
+        @Override
+        public ParquetMetadata getFooter() {
+            return footer; // return the footer passed in the constructor
+        }
+    }
+}
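
A rough usage sketch for the split-aware reader above (illustrative only, not part of the patch; fileIO, conf, fileStatus, split, and physicalReadSchema are assumed to come from the Delta scan state and from the split computation added to DeltaReaderFactory below, and the imports are the ones already shown in this file):

    // Sketch: read a single Parquet split into Delta Kernel columnar batches.
    private static void readOneSplit(FileIO fileIO, Configuration conf, FileStatus fileStatus,
            InputSplit split, StructType physicalReadSchema) throws IOException {
        DeltaParquetFileReader splitReader = new DeltaParquetFileReader(fileIO, conf);
        try (CloseableIterator<ColumnarBatch> batches =
                splitReader.read(fileStatus, split, physicalReadSchema, Optional.empty())) {
            while (batches.hasNext()) {
                ColumnarBatch batch = batches.next();
                // hand the batch to the parser, as DeltaFileRecordReader does via Scan.transformPhysicalData
            }
        }
    }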
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java
new file mode 100644
index 0000000..1e5a3bd
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaParquetHandler.java
@@ -0,0 +1,66 @@
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.InputSplit;
+
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.engine.DefaultParquetHandler;
+import io.delta.kernel.defaults.engine.fileio.FileIO;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaParquetHandler extends DefaultParquetHandler {
+
+    private final FileIO fileIO;
+    private final Configuration conf;
+
+    public DeltaParquetHandler(FileIO fileIO, Configuration conf) {
+        super(fileIO);
+        this.fileIO = fileIO;
+        this.conf = conf;
+    }
+
+    public CloseableIterator<ColumnarBatch> readParquetSplits(final CloseableIterator<FileStatus> var1,
+            final CloseableIterator<InputSplit> splits, final StructType var2, final Optional<Predicate> var3)
+            throws IOException {
+        return new CloseableIterator<ColumnarBatch>() {
+            private final DeltaParquetFileReader batchReader;
+            private CloseableIterator<ColumnarBatch> currentFileReader;
+
+            {
+                this.batchReader =
+                        new DeltaParquetFileReader(DeltaParquetHandler.this.fileIO, DeltaParquetHandler.this.conf);
+            }
+
+            public void close() throws IOException {
+                Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader, var1 });
+            }
+
+            public boolean hasNext() {
+                if (this.currentFileReader != null && this.currentFileReader.hasNext()) {
+                    return true;
+                } else {
+                    Utils.closeCloseables(new AutoCloseable[] { this.currentFileReader });
+                    this.currentFileReader = null;
+                    if (var1.hasNext()) {
+                        this.currentFileReader =
+                                this.batchReader.read((FileStatus) var1.next(), splits.next(), var2, var3);
+                        return this.hasNext();
+                    } else {
+                        return false;
+                    }
+                }
+            }
+
+            public ColumnarBatch next() {
+                return (ColumnarBatch) this.currentFileReader.next();
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index a3693e8..081e39c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -45,16 +45,23 @@
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.hyracks.hdfs.scheduler.Scheduler;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 
 import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
@@ -80,6 +87,9 @@
     protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
     protected ConfFactory confFactory;
     private String filterExpressionStr;
+    protected static Scheduler hdfsScheduler;
+    protected String[] readSchedule;
+    private List<InputSplit> configInputSplits;
 
     public List<PartitionWorkLoadBasedOnSize> getPartitionWorkLoadsBasedOnSize() {
         return partitionWorkLoadsBasedOnSize;
@@ -99,6 +109,7 @@
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
+        hdfsScheduler = HDFSUtils.initializeHDFSScheduler((ICCServiceContext) serviceCtx);
         JobConf conf = new JobConf();
         ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
         locationConstraints = getPartitions(appCtx);
@@ -157,12 +168,40 @@
             scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
             scanFiles = getScanFiles(scan, engine);
         }
+        int numPartitions = getPartitionConstraint().getLocations().length;
+        try {
+            if (scanFiles.size() < numPartitions) {
+                configInputSplits = getInputSplits(scanFiles, conf, numPartitions);
+                readSchedule = hdfsScheduler.getLocationConstraints(configInputSplits.toArray(new InputSplit[0]));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         LOGGER.info("Number of delta table parquet data files to scan: {}", scanFiles.size());
         configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
         distributeFiles(scanFiles, getPartitionConstraint().getLocations().length);
         issueWarnings(warnings, warningCollector);
     }
 
+    private List<InputSplit> getInputSplits(List<Row> scanFiles, JobConf conf, int numPartitions) throws IOException {
+        List<InputSplit> inputSplits = new ArrayList<>();
+        for (Row file : scanFiles) {
+            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(file);
+            Path parquetPath = new Path(fileStatus.getPath());
+            try (ParquetFileReader reader = ParquetFileReader.open(conf, parquetPath)) {
+                List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+                for (BlockMetaData block : blocks) {
+                    long start = block.getStartingPos();
+                    long length = block.getCompressedSize();
+                    for (int i = 0; i < numPartitions; i++) {
+                        inputSplits.add(new FileSplit(parquetPath, start, length, conf));
+                    }
+                }
+            }
+        }
+        return inputSplits;
+    }
+
     private List<Row> getScanFiles(Scan scan, Engine engine) {
         List<Row> scanFiles = new ArrayList<>();
         CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
@@ -213,8 +252,14 @@
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
         try {
            int partition = context.getPartition();
-            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    confFactory, filterExpressionStr);
+            if (configInputSplits.isEmpty()) {
+                return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(),
+                        scanState, confFactory, filterExpressionStr);
+            } else {
+                return new DeltaFileRecordReader(configInputSplits,
+                        partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState, confFactory,
+                        filterExpressionStr);
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
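
To summarize the flow this change adds, a minimal end-to-end sketch (illustrative only, not part of the patch; serScanFiles, serScanState, and filterExpr are placeholders for the serialized scan metadata this factory already produces):

    // Sketch: with fewer Delta data files than partitions, build one FileSplit per Parquet
    // row group (getInputSplits above), let the HDFS scheduler assign split locations, and
    // construct each partition's reader with the split list instead of whole files.
    int numPartitions = getPartitionConstraint().getLocations().length;
    List<InputSplit> splits = getInputSplits(scanFiles, conf, numPartitions);
    readSchedule = hdfsScheduler.getLocationConstraints(splits.toArray(new InputSplit[0]));
    IRecordReader<?> reader =
            new DeltaFileRecordReader(splits, serScanFiles, serScanState, confFactory, filterExpr);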
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20135
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: Ie6daf3846064326bfe749ad15b508fe27d1721ca
Gerrit-Change-Number: 20135
Gerrit-PatchSet: 1
Gerrit-Owner: Peeyush Gupta <peeyush.gu...@couchbase.com>
Gerrit-MessageType: newchange