http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a53587c..564b3bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -20,196 +20,106 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.ErrorCollector; -import org.apache.drill.common.expression.ErrorCollectorImpl; -import org.apache.drill.common.expression.ExpressionStringBuilder; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier; -import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.stat.ParquetFilterPredicate; -import org.apache.drill.exec.ops.UdfUtilities; -import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.PhysicalOperatorSetupException; -import org.apache.drill.exec.physical.base.AbstractFileGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.ScanStats; -import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; -import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.parquet.metadata.Metadata; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus; -import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.ReadEntryWithPath; -import org.apache.drill.exec.store.dfs.easy.FileWork; -import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata; -import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata; 
-import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadataBase; -import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata; -import org.apache.drill.exec.store.parquet.stat.ColumnStatistics; -import org.apache.drill.exec.store.parquet.stat.ParquetMetaStatCollector; -import org.apache.drill.exec.store.schedule.AffinityCreator; -import org.apache.drill.exec.store.schedule.AssignmentCreator; -import org.apache.drill.exec.store.schedule.CompleteWork; -import org.apache.drill.exec.store.schedule.EndpointByteMap; -import org.apache.drill.exec.store.schedule.EndpointByteMapImpl; -import org.apache.drill.exec.util.DecimalUtility; +import org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; +import org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.vector.NullableBitVector; -import org.apache.drill.exec.vector.NullableBigIntVector; -import org.apache.drill.exec.vector.NullableDateVector; -import org.apache.drill.exec.vector.NullableDecimal18Vector; -import org.apache.drill.exec.vector.NullableFloat4Vector; -import org.apache.drill.exec.vector.NullableFloat8Vector; -import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.NullableIntervalVector; -import org.apache.drill.exec.vector.NullableSmallIntVector; -import org.apache.drill.exec.vector.NullableTimeStampVector; -import org.apache.drill.exec.vector.NullableTimeVector; -import org.apache.drill.exec.vector.NullableTinyIntVector; -import org.apache.drill.exec.vector.NullableUInt1Vector; -import org.apache.drill.exec.vector.NullableUInt2Vector; -import org.apache.drill.exec.vector.NullableUInt4Vector; -import org.apache.drill.exec.vector.NullableVarBinaryVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.joda.time.DateTimeConstants; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.OriginalType; -import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; @JsonTypeName("parquet-scan") -public class ParquetGroupScan extends AbstractFileGroupScan { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); +public class ParquetGroupScan extends AbstractParquetGroupScan { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); - private final List<ReadEntryWithPath> entries; private final ParquetFormatPlugin formatPlugin; private final ParquetFormatConfig formatConfig; private final DrillFileSystem fs; private final MetadataContext metaContext; + private boolean usedMetadataCache; // false by default + // may change when filter push down / partition pruning is applied private String selectionRoot; - - private 
boolean usedMetadataCache = false; - private List<EndpointAffinity> endpointAffinities; - private List<SchemaPath> columns; - private ListMultimap<Integer, RowGroupInfo> mappings; - private List<RowGroupInfo> rowGroupInfos; - private LogicalExpression filter; - - /** - * The parquet table metadata may have already been read - * from a metadata cache file earlier; we can re-use during - * the ParquetGroupScan and avoid extra loading time. - */ - private Metadata.ParquetTableMetadataBase parquetTableMetadata = null; - private String cacheFileRoot = null; - - /* - * total number of rows (obtained from parquet footer) - */ - private long rowCount; - - /* - * total number of non-null value for each column in parquet files. - */ - private Map<SchemaPath, Long> columnValueCounts; - - @JsonCreator public ParquetGroupScan( // - @JsonProperty("userName") String userName, - @JsonProperty("entries") List<ReadEntryWithPath> entries,// - @JsonProperty("storage") StoragePluginConfig storageConfig, // - @JsonProperty("format") FormatPluginConfig formatConfig, // - @JacksonInject StoragePluginRegistry engineRegistry, // - @JsonProperty("columns") List<SchemaPath> columns, // - @JsonProperty("selectionRoot") String selectionRoot, // - @JsonProperty("cacheFileRoot") String cacheFileRoot, // - @JsonProperty("filter") LogicalExpression filter - ) throws IOException, ExecutionSetupException { - super(ImpersonationUtil.resolveUserName(userName)); - this.columns = columns; + private String cacheFileRoot; + + @JsonCreator + public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry, + @JsonProperty("userName") String userName, + @JsonProperty("entries") List<ReadEntryWithPath> entries, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JsonProperty("format") FormatPluginConfig formatConfig, + @JsonProperty("columns") List<SchemaPath> columns, + @JsonProperty("selectionRoot") String selectionRoot, + @JsonProperty("cacheFileRoot") String cacheFileRoot, + @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException { + super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter); Preconditions.checkNotNull(storageConfig); Preconditions.checkNotNull(formatConfig); this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig); Preconditions.checkNotNull(formatPlugin); this.fs = ImpersonationUtil.createFileSystem(getUserName(), formatPlugin.getFsConf()); this.formatConfig = formatPlugin.getConfig(); - this.entries = entries; this.selectionRoot = selectionRoot; this.cacheFileRoot = cacheFileRoot; - this.filter = filter; this.metaContext = new MetadataContext(); init(); } - public ParquetGroupScan( // - String userName, - FileSelection selection, // - ParquetFormatPlugin formatPlugin, // - List<SchemaPath> columns) throws IOException { + public ParquetGroupScan(String userName, + FileSelection selection, + ParquetFormatPlugin formatPlugin, + List<SchemaPath> columns) throws IOException { this(userName, selection, formatPlugin, columns, ValueExpressions.BooleanExpression.TRUE); } - public ParquetGroupScan( // - String userName, - FileSelection selection, // - ParquetFormatPlugin formatPlugin, // - List<SchemaPath> columns, - LogicalExpression filter) // - throws IOException { - super(userName); + public ParquetGroupScan(String userName, + FileSelection selection, + ParquetFormatPlugin formatPlugin, + List<SchemaPath> columns, + LogicalExpression filter) throws IOException { + super(userName, columns, new 
ArrayList<>(), filter); + this.formatPlugin = formatPlugin; - this.columns = columns; this.formatConfig = formatPlugin.getConfig(); this.fs = ImpersonationUtil.createFileSystem(userName, formatPlugin.getFsConf()); - this.selectionRoot = selection.getSelectionRoot(); this.cacheFileRoot = selection.getCacheFileRoot(); MetadataContext metadataContext = selection.getMetaContext(); this.metaContext = metadataContext != null ? metadataContext : new MetadataContext(); - final FileSelection fileSelection = expandIfNecessary(selection); - - this.entries = Lists.newArrayList(); - + FileSelection fileSelection = expandIfNecessary(selection); if (fileSelection != null) { if (checkForInitializingEntriesWithSelectionRoot()) { // The fully expanded list is already stored as part of the fileSet @@ -220,41 +130,20 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } } - this.filter = filter; - init(); } } - /* - * This is used to clone another copy of the group scan. - */ private ParquetGroupScan(ParquetGroupScan that) { this(that, null); } - /* - * This is used to clone another copy of the group scan, but keep the metadata caching info from the file selection - */ private ParquetGroupScan(ParquetGroupScan that, FileSelection selection) { super(that); - this.columns = that.columns == null ? null : Lists.newArrayList(that.columns); - this.endpointAffinities = that.endpointAffinities == null ? null : Lists.newArrayList(that.endpointAffinities); - this.entries = that.entries == null ? null : Lists.newArrayList(that.entries); this.formatConfig = that.formatConfig; this.formatPlugin = that.formatPlugin; this.fs = that.fs; - this.mappings = that.mappings == null ? null : ArrayListMultimap.create(that.mappings); - this.rowCount = that.rowCount; - this.rowGroupInfos = that.rowGroupInfos == null ? null : Lists.newArrayList(that.rowGroupInfos); this.selectionRoot = that.selectionRoot; - this.columnValueCounts = that.columnValueCounts == null ? null : new HashMap<>(that.columnValueCounts); - this.partitionColTypeMap = that.partitionColTypeMap == null ? null : new HashMap<>(that.partitionColTypeMap); - this.partitionValueMap = that.partitionValueMap == null ? null : new HashMap<>(that.partitionValueMap); - this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet); - this.usedMetadataCache = that.usedMetadataCache; - this.parquetTableMetadata = that.parquetTableMetadata; - this.filter = that.filter; if (selection != null) { this.cacheFileRoot = selection.getCacheFileRoot(); MetadataContext metaContext = selection.getMetaContext(); @@ -263,542 +152,206 @@ public class ParquetGroupScan extends AbstractFileGroupScan { this.cacheFileRoot = that.cacheFileRoot; this.metaContext = that.metaContext; } + this.usedMetadataCache = that.usedMetadataCache; } - /** - * expands the selection's folders if metadata cache is found for the selection root.<br> - * If the selection has already been expanded or no metadata cache was found, does nothing - * - * @param selection actual selection before expansion - * @return new selection after expansion, if no expansion was done returns the input selection - * - * @throws IOException - */ - private FileSelection expandIfNecessary(FileSelection selection) throws IOException { - if (selection.isExpandedFully()) { - return selection; - } - - // use the cacheFileRoot if provided (e.g after partition pruning) - Path metaFilePath = new Path(cacheFileRoot != null ? 
cacheFileRoot : selectionRoot, Metadata.METADATA_FILENAME); - if (!fs.exists(metaFilePath)) { // no metadata cache - if (selection.isExpandedPartial()) { - logger.error("'{}' metadata file does not exist, but metadata directories cache file is present", metaFilePath); - metaContext.setMetadataCacheCorrupted(true); - } - - return selection; - } - - return expandSelectionFromMetadataCache(selection, metaFilePath); - } - - /** - * For two cases the entries should be initialized with just the selection root instead of the fully expanded list: - * <ul> - * <li> When metadata caching is corrupted (to use correct file selection) - * <li> Metadata caching is correct and used, but pruning was not applicable or was attempted and nothing was pruned - * (to reduce overhead in parquet group scan). - * </ul> - * - * @return true if entries should be initialized with selection root, false otherwise - */ - private boolean checkForInitializingEntriesWithSelectionRoot() { - // TODO: at some point we should examine whether the list of entries is absolutely needed. - return metaContext.isMetadataCacheCorrupted() || (parquetTableMetadata != null && - (metaContext.getPruneStatus() == PruneStatus.NOT_STARTED || metaContext.getPruneStatus() == PruneStatus.NOT_PRUNED)); - } - - public List<ReadEntryWithPath> getEntries() { - return entries; - } - + // getters for serialization / deserialization start @JsonProperty("format") public ParquetFormatConfig getFormatConfig() { - return this.formatConfig; + return formatConfig; } @JsonProperty("storage") public StoragePluginConfig getEngineConfig() { - return this.formatPlugin.getStorageConfig(); + return formatPlugin.getStorageConfig(); } + @JsonProperty public String getSelectionRoot() { return selectionRoot; } - public Set<String> getFileSet() { - return fileSet; + @JsonProperty + public String getCacheFileRoot() { + return cacheFileRoot; } + // getters for serialization / deserialization end - public LogicalExpression getFilter() { - return this.filter; - } - - public void setFilter(LogicalExpression filter) { - this.filter = filter; + @Override + public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { + return new ParquetRowGroupScan(getUserName(), formatPlugin, getReadEntries(minorFragmentId), columns, selectionRoot, filter); } @Override - public boolean hasFiles() { - return true; + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + Preconditions.checkArgument(children.isEmpty()); + return new ParquetGroupScan(this); } - @JsonIgnore @Override - public Collection<String> getFiles() { - return fileSet; + public GroupScan clone(List<SchemaPath> columns) { + ParquetGroupScan newScan = new ParquetGroupScan(this); + newScan.columns = columns; + return newScan; } - private Set<String> fileSet; - - @JsonIgnore - // only for partition columns : value is unique for each partition - private Map<SchemaPath, MajorType> partitionColTypeMap = Maps.newHashMap(); - - /** - * When reading the very first footer, any column is a potential partition column. So for the first footer, we check - * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the - * remaining footers, we will not find any new partition columns, but we may discover that what was previously a - * potential partition column now no longer qualifies, so it needs to be removed from the list. 
- * @return whether column is a potential partition column - */ - private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first, long rowCount) { - SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName()); - final PrimitiveTypeName primitiveType; - final OriginalType originalType; - int precision = 0; - int scale = 0; - if (this.parquetTableMetadata.hasColumnMetadata()) { - // only ColumnTypeMetadata_v3 stores information about scale and precision - if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) { - Metadata.ColumnTypeMetadata_v3 columnTypeInfo = ((Metadata.ParquetTableMetadata_v3) parquetTableMetadata) - .getColumnTypeInfo(columnMetadata.getName()); - scale = columnTypeInfo.scale; - precision = columnTypeInfo.precision; - } - primitiveType = this.parquetTableMetadata.getPrimitiveType(columnMetadata.getName()); - originalType = this.parquetTableMetadata.getOriginalType(columnMetadata.getName()); - } else { - primitiveType = columnMetadata.getPrimitiveType(); - originalType = columnMetadata.getOriginalType(); - } - if (first) { - if (hasSingleValue(columnMetadata, rowCount)) { - partitionColTypeMap.put(schemaPath, getType(primitiveType, originalType, scale, precision)); - return true; - } else { - return false; - } - } else { - if (!partitionColTypeMap.keySet().contains(schemaPath)) { - return false; - } else { - if (!hasSingleValue(columnMetadata, rowCount)) { - partitionColTypeMap.remove(schemaPath); - return false; - } - if (!getType(primitiveType, originalType, scale, precision).equals(partitionColTypeMap.get(schemaPath))) { - partitionColTypeMap.remove(schemaPath); - return false; - } - } - } - return true; + @Override + public ParquetGroupScan clone(FileSelection selection) throws IOException { + ParquetGroupScan newScan = new ParquetGroupScan(this, selection); + newScan.modifyFileSelection(selection); + newScan.init(); + return newScan; } - /** - * Builds major type using given {@code OriginalType originalType} or {@code PrimitiveTypeName type}. - * For DECIMAL will be returned major type with scale and precision. 
- * - * @param type parquet primitive type - * @param originalType parquet original type - * @param scale type scale (used for DECIMAL type) - * @param precision type precision (used for DECIMAL type) - * @return major type - */ - public static MajorType getType(PrimitiveTypeName type, OriginalType originalType, int scale, int precision) { - if (originalType != null) { - switch (originalType) { - case DECIMAL: - return Types.withScaleAndPrecision(MinorType.DECIMAL18, TypeProtos.DataMode.OPTIONAL, scale, precision); - case DATE: - return Types.optional(MinorType.DATE); - case TIME_MILLIS: - return Types.optional(MinorType.TIME); - case TIMESTAMP_MILLIS: - return Types.optional(MinorType.TIMESTAMP); - case UTF8: - return Types.optional(MinorType.VARCHAR); - case UINT_8: - return Types.optional(MinorType.UINT1); - case UINT_16: - return Types.optional(MinorType.UINT2); - case UINT_32: - return Types.optional(MinorType.UINT4); - case UINT_64: - return Types.optional(MinorType.UINT8); - case INT_8: - return Types.optional(MinorType.TINYINT); - case INT_16: - return Types.optional(MinorType.SMALLINT); - case INTERVAL: - return Types.optional(MinorType.INTERVAL); - } - } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("ParquetGroupScan ["); + builder.append("entries=").append(entries); + builder.append(", selectionRoot=").append(selectionRoot); + builder.append(", numFiles=").append(getEntries().size()); + builder.append(", numRowGroups=").append(rowGroupInfos.size()); + builder.append(", usedMetadataFile=").append(usedMetadataCache); - switch (type) { - case BOOLEAN: - return Types.optional(MinorType.BIT); - case INT32: - return Types.optional(MinorType.INT); - case INT64: - return Types.optional(MinorType.BIGINT); - case FLOAT: - return Types.optional(MinorType.FLOAT4); - case DOUBLE: - return Types.optional(MinorType.FLOAT8); - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - case INT96: - return Types.optional(MinorType.VARBINARY); - default: - // Should never hit this - throw new UnsupportedOperationException("Unsupported type:" + type); + String filterString = getFilterString(); + if (!filterString.isEmpty()) { + builder.append(", filter=").append(filterString); } - } - /** - * Checks that the column chunk has a single value. - * - * @param columnChunkMetaData metadata to check - * @param rowCount rows count in column chunk - * @return true if column has single value - */ - private boolean hasSingleValue(ColumnMetadata columnChunkMetaData, long rowCount) { - // ColumnMetadata will have a non-null value iff the minValue and the maxValue for the - // rowgroup are the same - return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue(rowCount)); - } - - @Override public void modifyFileSelection(FileSelection selection) { - entries.clear(); - fileSet = Sets.newHashSet(); - for (String fileName : selection.getFiles()) { - entries.add(new ReadEntryWithPath(fileName)); - fileSet.add(fileName); + if (usedMetadataCache) { + // For EXPLAIN, remove the URI prefix from cacheFileRoot. If cacheFileRoot is null, we + // would have read the cache file from selectionRoot + String cacheFileRootString = (cacheFileRoot == null) ? 
+ Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() : + Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString(); + builder.append(", cacheFileRoot=").append(cacheFileRootString); } - List<RowGroupInfo> newRowGroupList = Lists.newArrayList(); - for (RowGroupInfo rowGroupInfo : rowGroupInfos) { - if (fileSet.contains(rowGroupInfo.getPath())) { - newRowGroupList.add(rowGroupInfo); - } - } - this.rowGroupInfos = newRowGroupList; - } + builder.append(", columns=").append(columns); + builder.append("]"); - public MajorType getTypeForColumn(SchemaPath schemaPath) { - return partitionColTypeMap.get(schemaPath); + return builder.toString(); } - // Map from file names to maps of column name to partition value mappings - private Map<String, Map<SchemaPath, Object>> partitionValueMap = Maps.newHashMap(); - - public void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) { - String f = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString(); - MajorType majorType = getTypeForColumn(column); - MinorType type = majorType.getMinorType(); - switch (type) { - case BIT: { - NullableBitVector bitVector = (NullableBitVector) v; - Boolean value = (Boolean) partitionValueMap.get(f).get(column); - if (value == null) { - bitVector.getMutator().setNull(index); - } else { - bitVector.getMutator().setSafe(index, value ? 1 : 0); - } - return; - } - case INT: { - NullableIntVector intVector = (NullableIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - intVector.getMutator().setNull(index); - } else { - intVector.getMutator().setSafe(index, value); - } - return; - } - case SMALLINT: { - NullableSmallIntVector smallIntVector = (NullableSmallIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - smallIntVector.getMutator().setNull(index); - } else { - smallIntVector.getMutator().setSafe(index, value.shortValue()); - } - return; - } - case TINYINT: { - NullableTinyIntVector tinyIntVector = (NullableTinyIntVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - tinyIntVector.getMutator().setNull(index); - } else { - tinyIntVector.getMutator().setSafe(index, value.byteValue()); - } - return; - } - case UINT1: { - NullableUInt1Vector intVector = (NullableUInt1Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - intVector.getMutator().setNull(index); - } else { - intVector.getMutator().setSafe(index, value.byteValue()); - } - return; - } - case UINT2: { - NullableUInt2Vector intVector = (NullableUInt2Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - intVector.getMutator().setNull(index); - } else { - intVector.getMutator().setSafe(index, (char) value.shortValue()); - } - return; - } - case UINT4: { - NullableUInt4Vector intVector = (NullableUInt4Vector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - intVector.getMutator().setNull(index); - } else { - intVector.getMutator().setSafe(index, value); - } - return; - } - case BIGINT: { - NullableBigIntVector bigIntVector = (NullableBigIntVector) v; - Long value = (Long) partitionValueMap.get(f).get(column); - if (value == null) { - bigIntVector.getMutator().setNull(index); - } else { - bigIntVector.getMutator().setSafe(index, value); - } - return; - } - case FLOAT4: { - NullableFloat4Vector float4Vector = 
(NullableFloat4Vector) v; - Float value = (Float) partitionValueMap.get(f).get(column); - if (value == null) { - float4Vector.getMutator().setNull(index); - } else { - float4Vector.getMutator().setSafe(index, value); - } - return; - } - case FLOAT8: { - NullableFloat8Vector float8Vector = (NullableFloat8Vector) v; - Double value = (Double) partitionValueMap.get(f).get(column); - if (value == null) { - float8Vector.getMutator().setNull(index); - } else { - float8Vector.getMutator().setSafe(index, value); - } - return; - } - case VARBINARY: { - NullableVarBinaryVector varBinaryVector = (NullableVarBinaryVector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s == null) { - varBinaryVector.getMutator().setNull(index); - return; - } else { - bytes = getBytes(type, s); - } - varBinaryVector.getMutator().setSafe(index, bytes, 0, bytes.length); - return; - } - case DECIMAL18: { - NullableDecimal18Vector decimalVector = (NullableDecimal18Vector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s == null) { - decimalVector.getMutator().setNull(index); - return; - } else if (s instanceof Integer) { - long value = DecimalUtility.getBigDecimalFromPrimitiveTypes( - (Integer) s, - majorType.getScale(), - majorType.getPrecision()).longValue(); - decimalVector.getMutator().setSafe(index, value); - return; - } else if (s instanceof Long) { - long value = DecimalUtility.getBigDecimalFromPrimitiveTypes( - (Long) s, - majorType.getScale(), - majorType.getPrecision()).longValue(); - decimalVector.getMutator().setSafe(index, value); - return; - } else { - bytes = getBytes(type, s); - } - long value = DecimalUtility.getBigDecimalFromByteArray(bytes, 0, bytes.length, majorType.getScale()).longValue(); - decimalVector.getMutator().setSafe(index, value); - return; + // overridden protected methods block start + @Override + protected void initInternal() throws IOException { + FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf()); + Path metaPath = null; + if (entries.size() == 1 && parquetTableMetadata == null) { + Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath())); + if (fs.isDirectory(p)) { + // Using the metadata file makes sense when querying a directory; otherwise + // if querying a single file we can look up the metadata directly from the file + metaPath = new Path(p, Metadata.METADATA_FILENAME); } - case DATE: { - NullableDateVector dateVector = (NullableDateVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - dateVector.getMutator().setNull(index); - } else { - dateVector.getMutator().setSafe(index, value * (long) DateTimeConstants.MILLIS_PER_DAY); + if (!metaContext.isMetadataCacheCorrupted() && metaPath != null && fs.exists(metaPath)) { + parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig); + if (parquetTableMetadata != null) { + usedMetadataCache = true; } - return; } - case TIME: { - NullableTimeVector timeVector = (NullableTimeVector) v; - Integer value = (Integer) partitionValueMap.get(f).get(column); - if (value == null) { - timeVector.getMutator().setNull(index); - } else { - timeVector.getMutator().setSafe(index, value); - } - return; + if (!usedMetadataCache) { + parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), formatConfig); } - case TIMESTAMP: { - NullableTimeStampVector timeStampVector = 
(NullableTimeStampVector) v; - Long value = (Long) partitionValueMap.get(f).get(column); - if (value == null) { - timeStampVector.getMutator().setNull(index); - } else { - timeStampVector.getMutator().setSafe(index, value); + } else { + Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); + metaPath = new Path(p, Metadata.METADATA_FILENAME); + if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot)) + && fs.exists(metaPath)) { + if (parquetTableMetadata == null) { + parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig); } - return; - } - case VARCHAR: { - NullableVarCharVector varCharVector = (NullableVarCharVector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s == null) { - varCharVector.getMutator().setNull(index); - return; - } else { - bytes = getBytes(type, s); + if (parquetTableMetadata != null) { + usedMetadataCache = true; + if (fileSet != null) { + parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata); + } } - varCharVector.getMutator().setSafe(index, bytes, 0, bytes.length); - return; } - case INTERVAL: { - NullableIntervalVector intervalVector = (NullableIntervalVector) v; - Object s = partitionValueMap.get(f).get(column); - byte[] bytes; - if (s == null) { - intervalVector.getMutator().setNull(index); - return; - } else { - bytes = getBytes(type, s); + if (!usedMetadataCache) { + final List<FileStatus> fileStatuses = new ArrayList<>(); + for (ReadEntryWithPath entry : entries) { + fileStatuses.addAll( + DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true)); } - intervalVector.getMutator().setSafe(index, 1, - ParquetReaderUtility.getIntFromLEBytes(bytes, 0), - ParquetReaderUtility.getIntFromLEBytes(bytes, 4), - ParquetReaderUtility.getIntFromLEBytes(bytes, 8)); - return; + + Map<FileStatus, FileSystem> statusMap = fileStatuses.stream() + .collect( + Collectors.toMap( + Function.identity(), + s -> processUserFileSystem, + (oldFs, newFs) -> newFs, + LinkedHashMap::new)); + + parquetTableMetadata = Metadata.getParquetTableMetadata(statusMap, formatConfig); } - default: - throw new UnsupportedOperationException("Unsupported type: " + type); } } - /** - * Returns the sequence of bytes received from {@code Object source}. - * - * @param type the column type - * @param source the source of the bytes sequence - * @return bytes sequence obtained from {@code Object source} - */ - private byte[] getBytes(MinorType type, Object source) { - byte[] bytes; - if (source instanceof Binary) { - bytes = ((Binary) source).getBytes(); - } else if (source instanceof byte[]) { - bytes = (byte[]) source; - } else { - throw new UnsupportedOperationException("Unable to create column data for type: " + type); - } - return bytes; + @Override + protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException { + FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), getSelectionRoot(), cacheFileRoot, false); + return clone(newSelection); } - public static class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, FileWork { - - private EndpointByteMap byteMap; - private int rowGroupIndex; - private List<? extends ColumnMetadata> columns; - private long rowCount; // rowCount = -1 indicates to include all rows. 
- private long numRecordsToRead; - - @JsonCreator - public RowGroupInfo(@JsonProperty("path") String path, @JsonProperty("start") long start, - @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, long rowCount) { - super(path, start, length); - this.rowGroupIndex = rowGroupIndex; - this.rowCount = rowCount; - this.numRecordsToRead = rowCount; - } - - public RowGroupReadEntry getRowGroupReadEntry() { - return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), - this.rowGroupIndex, this.getNumRecordsToRead()); - } - - public int getRowGroupIndex() { - return this.rowGroupIndex; - } - - @Override - public int compareTo(CompleteWork o) { - return Long.compare(getTotalBytes(), o.getTotalBytes()); - } - - @Override - public long getTotalBytes() { - return this.getLength(); - } - - @Override - public EndpointByteMap getByteMap() { - return byteMap; - } + @Override + protected Collection<DrillbitEndpoint> getDrillbits() { + return formatPlugin.getContext().getBits(); + } - public long getNumRecordsToRead() { - return numRecordsToRead; - } + @Override + protected boolean supportsFileImplicitColumns() { + return selectionRoot != null; + } - public void setNumRecordsToRead(long numRecords) { - numRecordsToRead = numRecords; - } + @Override + protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) { + return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot); + } + // overridden protected methods block end - public void setEndpointByteMap(EndpointByteMap byteMap) { - this.byteMap = byteMap; + // private methods block start + /** + * Expands the selection's folders if metadata cache is found for the selection root.<br> + * If the selection has already been expanded or no metadata cache was found, does nothing + * + * @param selection actual selection before expansion + * @return new selection after expansion, if no expansion was done returns the input selection + */ + private FileSelection expandIfNecessary(FileSelection selection) throws IOException { + if (selection.isExpandedFully()) { + return selection; } - public long getRowCount() { - return rowCount; - } + // use the cacheFileRoot if provided (e.g after partition pruning) + Path metaFilePath = new Path(cacheFileRoot != null ? cacheFileRoot : selectionRoot, Metadata.METADATA_FILENAME); + if (!fs.exists(metaFilePath)) { // no metadata cache + if (selection.isExpandedPartial()) { + logger.error("'{}' metadata file does not exist, but metadata directories cache file is present", metaFilePath); + metaContext.setMetadataCacheCorrupted(true); + } - public List<? extends ColumnMetadata> getColumns() { - return columns; + return selection; } - public void setColumns(List<? extends ColumnMetadata> columns) { - this.columns = columns; - } + return expandSelectionFromMetadataCache(selection, metaFilePath); + } + /** + * For two cases the entries should be initialized with just the selection root instead of the fully expanded list: + * <ul> + * <li> When metadata caching is corrupted (to use correct file selection) + * <li> Metadata caching is correct and used, but pruning was not applicable or was attempted and nothing was pruned + * (to reduce overhead in parquet group scan). + * </ul> + * + * @return true if entries should be initialized with selection root, false otherwise + */ + private boolean checkForInitializingEntriesWithSelectionRoot() { + // TODO: at some point we should examine whether the list of entries is absolutely needed. 
+ return metaContext.isMetadataCacheCorrupted() || (parquetTableMetadata != null && + (metaContext.getPruneStatus() == PruneStatus.NOT_STARTED || metaContext.getPruneStatus() == PruneStatus.NOT_PRUNED)); } /** @@ -810,17 +363,16 @@ public class ParquetGroupScan extends AbstractFileGroupScan { * @param metaFilePath metadata cache file path * @return file selection read from cache * - * @throws IOException * @throws UserException when the updated selection is empty, this happens if the user selects an empty folder. */ - private FileSelection - expandSelectionFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException { + private FileSelection expandSelectionFromMetadataCache(FileSelection selection, Path metaFilePath) throws IOException { // get the metadata for the root directory by reading the metadata file // parquetTableMetadata contains the metadata for all files in the selection root folder, but we need to make sure // we only select the files that are part of selection (by setting fileSet appropriately) // get (and set internal field) the metadata for the directory by reading the metadata file - parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath, metaContext, formatConfig); + FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf()); + parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaFilePath, metaContext, formatConfig); if (ignoreExpandingSelection(parquetTableMetadata)) { return selection; } @@ -837,7 +389,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { final Path first = fileStatuses.get(0).getPath(); if (fileStatuses.size() == 1 && selection.getSelectionRoot().equals(first.toString())) { // we are selecting all files from selection root. Expand the file list from the cache - for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { fileSet.add(file.getPath()); } @@ -850,7 +402,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { // list of files from the metadata cache file that is present in the cacheFileRoot directory and populate // the fileSet. However, this is *not* the final list of files that will be scanned in execution since the // second phase of partition pruning will apply on the files and modify the file selection appropriately. 
- for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) { + for (ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) { fileSet.add(file.getPath()); } } @@ -861,11 +413,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan { if (status.isDirectory()) { //TODO [DRILL-4496] read the metadata cache files in parallel final Path metaPath = new Path(cacheFileRoot, Metadata.METADATA_FILENAME); - final Metadata.ParquetTableMetadataBase metadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig); + final ParquetTableMetadataBase metadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, formatConfig); if (ignoreExpandingSelection(metadata)) { return selection; } - for (Metadata.ParquetFileMetadata file : metadata.getFiles()) { + for (ParquetFileMetadata file : metadata.getFiles()) { fileSet.add(file.getPath()); } } else { @@ -881,7 +433,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return null; } - List<String> fileNames = Lists.newArrayList(fileSet); + List<String> fileNames = new ArrayList<>(fileSet); // when creating the file selection, set the selection root without the URI prefix // The reason is that the file names above have been created in the form @@ -902,141 +454,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan { return newSelection; } - private void init() throws IOException { - Path metaPath = null; - if (entries.size() == 1 && parquetTableMetadata == null) { - Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath())); - if (fs.isDirectory(p)) { - // Using the metadata file makes sense when querying a directory; otherwise - // if querying a single file we can look up the metadata directly from the file - metaPath = new Path(p, Metadata.METADATA_FILENAME); - } - if (!metaContext.isMetadataCacheCorrupted() && metaPath != null && fs.exists(metaPath)) { - parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig); - if (parquetTableMetadata != null) { - usedMetadataCache = true; - } - } - if (!usedMetadataCache) { - parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString(), formatConfig); - } - } else { - Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); - metaPath = new Path(p, Metadata.METADATA_FILENAME); - if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot)) - && fs.exists(metaPath)) { - if (parquetTableMetadata == null) { - parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath, metaContext, formatConfig); - } - if (parquetTableMetadata != null) { - usedMetadataCache = true; - if (fileSet != null) { - parquetTableMetadata = removeUnneededRowGroups(parquetTableMetadata); - } - } - } - if (!usedMetadataCache) { - final List<FileStatus> fileStatuses = Lists.newArrayList(); - for (ReadEntryWithPath entry : entries) { - fileStatuses.addAll(DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true)); - } - parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses, formatConfig); - } - } - - if (fileSet == null) { - fileSet = Sets.newHashSet(); - for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { - fileSet.add(file.getPath()); - } - } - - Map<String, DrillbitEndpoint> hostEndpointMap = Maps.newHashMap(); - - for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) { - hostEndpointMap.put(endpoint.getAddress(), endpoint); - } - - rowGroupInfos = 
Lists.newArrayList(); - for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { - int rgIndex = 0; - for (RowGroupMetadata rg : file.getRowGroups()) { - RowGroupInfo rowGroupInfo = - new RowGroupInfo(file.getPath(), rg.getStart(), rg.getLength(), rgIndex, rg.getRowCount()); - EndpointByteMap endpointByteMap = new EndpointByteMapImpl(); - for (String host : rg.getHostAffinity().keySet()) { - if (hostEndpointMap.containsKey(host)) { - endpointByteMap - .add(hostEndpointMap.get(host), (long) (rg.getHostAffinity().get(host) * rg.getLength())); - } - } - rowGroupInfo.setEndpointByteMap(endpointByteMap); - rowGroupInfo.setColumns(rg.getColumns()); - rgIndex++; - rowGroupInfos.add(rowGroupInfo); - } - } - - this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos); - updatePartitionColTypeMap(); - } - - private void updatePartitionColTypeMap() { - columnValueCounts = Maps.newHashMap(); - this.rowCount = 0; - boolean first = true; - for (RowGroupInfo rowGroup : this.rowGroupInfos) { - long rowCount = rowGroup.getRowCount(); - for (ColumnMetadata column : rowGroup.getColumns()) { - SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName()); - Long previousCount = columnValueCounts.get(schemaPath); - if (previousCount != null) { - if (previousCount != GroupScan.NO_COLUMN_STATS) { - if (column.getNulls() != null) { - Long newCount = rowCount - column.getNulls(); - columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount); - } - } - } else { - if (column.getNulls() != null) { - Long newCount = rowCount - column.getNulls(); - columnValueCounts.put(schemaPath, newCount); - } else { - columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS); - } - } - boolean partitionColumn = checkForPartitionColumn(column, first, rowCount); - if (partitionColumn) { - Map<SchemaPath, Object> map = partitionValueMap.get(rowGroup.getPath()); - if (map == null) { - map = Maps.newHashMap(); - partitionValueMap.put(rowGroup.getPath(), map); - } - Object value = map.get(schemaPath); - Object currentValue = column.getMaxValue(); - if (value != null) { - if (value != currentValue) { - partitionColTypeMap.remove(schemaPath); - } - } else { - // the value of a column with primitive type can not be null, - // so checks that there are really null value and puts it to the map - if (rowCount == column.getNulls()) { - map.put(schemaPath, null); - } else { - map.put(schemaPath, currentValue); - } - } - } else { - partitionColTypeMap.remove(schemaPath); - } - } - this.rowCount += rowGroup.getRowCount(); - first = false; - } - } private ParquetTableMetadataBase removeUnneededRowGroups(ParquetTableMetadataBase parquetTableMetadata) { - List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList(); + List<ParquetFileMetadata> newFileMetadataList = new ArrayList<>(); for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { if (fileSet.contains(file.getPath())) { newFileMetadataList.add(file); @@ -1049,284 +468,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } /** - * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each - * rowGroup - * - * @return a list of EndpointAffinity objects - */ - @Override - public List<EndpointAffinity> getOperatorAffinity() { - return this.endpointAffinities; - } - - @Override - public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) throws PhysicalOperatorSetupException { - - this.mappings = AssignmentCreator.getMappings(incomingEndpoints, 
rowGroupInfos); - } - - @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { - assert minorFragmentId < mappings.size() : String - .format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", - mappings.size(), minorFragmentId); - - List<RowGroupInfo> rowGroupsForMinor = mappings.get(minorFragmentId); - - Preconditions.checkArgument(!rowGroupsForMinor.isEmpty(), - String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId)); - - return new ParquetRowGroupScan( - getUserName(), formatPlugin, convertToReadEntries(rowGroupsForMinor), columns, selectionRoot, filter); - } - - private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) { - List<RowGroupReadEntry> entries = Lists.newArrayList(); - for (RowGroupInfo rgi : rowGroups) { - RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex(), rgi.getNumRecordsToRead()); - entries.add(entry); - } - return entries; - } - - @Override - public int getMaxParallelizationWidth() { - return rowGroupInfos.size(); - } - - public List<SchemaPath> getColumns() { - return columns; - } - - @Override - public ScanStats getScanStats() { - int columnCount = columns == null ? 20 : columns.size(); - return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount); - } - - @Override - @JsonIgnore - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { - Preconditions.checkArgument(children.isEmpty()); - return new ParquetGroupScan(this); - } - - @Override - public String getDigest() { - return toString(); - } - - @Override - public String toString() { - String cacheFileString = ""; - if (usedMetadataCache) { - // For EXPLAIN, remove the URI prefix from cacheFileRoot. If cacheFileRoot is null, we - // would have read the cache file from selectionRoot - String str = (cacheFileRoot == null) ? - Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() : - Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString(); - cacheFileString = ", cacheFileRoot=" + str; - } - final String filterStr = filter == null || filter.equals(ValueExpressions.BooleanExpression.TRUE) ? "" : ", filter=" + ExpressionStringBuilder.toString(this.filter); - - return "ParquetGroupScan [entries=" + entries - + ", selectionRoot=" + selectionRoot - + ", numFiles=" + getEntries().size() - + ", numRowGroups=" + rowGroupInfos.size() - + ", usedMetadataFile=" + usedMetadataCache - + filterStr - + cacheFileString - + ", columns=" + columns - + "]"; - } - - @Override - public GroupScan clone(List<SchemaPath> columns) { - ParquetGroupScan newScan = new ParquetGroupScan(this); - newScan.columns = columns; - return newScan; - } - - // Based on maxRecords to read for the scan, - // figure out how many rowGroups to read and update number of records to read for each of them. - // Returns total number of rowGroups to read. 
- private int updateRowGroupInfo(long maxRecords) { - long count = 0; - int index = 0; - for (RowGroupInfo rowGroupInfo : rowGroupInfos) { - long rowCount = rowGroupInfo.getRowCount(); - if (count + rowCount <= maxRecords) { - count += rowCount; - rowGroupInfo.setNumRecordsToRead(rowCount); - index++; - continue; - } else if (count < maxRecords) { - rowGroupInfo.setNumRecordsToRead(maxRecords - count); - index++; - } - break; - } - - return index; - } - - @Override - public ParquetGroupScan clone(FileSelection selection) throws IOException { - ParquetGroupScan newScan = new ParquetGroupScan(this, selection); - newScan.modifyFileSelection(selection); - newScan.init(); - return newScan; - } - - public ParquetGroupScan clone(FileSelection selection, long maxRecords) throws IOException { - ParquetGroupScan newScan = clone(selection); - newScan.updateRowGroupInfo(maxRecords); - return newScan; - } - - @Override - public boolean supportsLimitPushdown() { - return true; - } - - @Override - public GroupScan applyLimit(int maxRecords) { - Preconditions.checkArgument(rowGroupInfos.size() >= 0); - - maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup. - // further optimization : minimize # of files chosen, or the affinity of files chosen. - - // Calculate number of rowGroups to read based on maxRecords and update - // number of records to read for each of those rowGroups. - int index = updateRowGroupInfo(maxRecords); - - Set<String> fileNames = Sets.newHashSet(); // HashSet keeps a fileName unique. - for (RowGroupInfo rowGroupInfo : rowGroupInfos.subList(0, index)) { - fileNames.add(rowGroupInfo.getPath()); - } - - // If there is no change in fileSet, no need to create new groupScan. - if (fileNames.size() == fileSet.size() ) { - // There is no reduction of rowGroups. Return the original groupScan. - logger.debug("applyLimit() does not apply!"); - return null; - } - - try { - FileSelection newSelection = new FileSelection(null, Lists.newArrayList(fileNames), getSelectionRoot(), cacheFileRoot, false); - logger.debug("applyLimit() reduce parquet file # from {} to {}", fileSet.size(), fileNames.size()); - return this.clone(newSelection, maxRecords); - } catch (IOException e) { - logger.warn("Could not apply rowcount based prune due to Exception : {}", e); - return null; - } - } - - @Override - @JsonIgnore - public boolean canPushdownProjects(List<SchemaPath> columns) { - return true; - } - - /** - * Return column value count for the specified column. If does not contain such column, return 0. - */ - @Override - public long getColumnValueCount(SchemaPath column) { - return columnValueCounts.containsKey(column) ? columnValueCounts.get(column) : 0; - } - - @Override - public List<SchemaPath> getPartitionColumns() { - return new ArrayList<>(partitionColTypeMap.keySet()); - } - - public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, - FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) { - if (rowGroupInfos.size() == 1 || - ! (parquetTableMetadata.isRowGroupPrunable()) || - rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD) - ) { - // Stop pruning for 3 cases: - // - 1 single parquet file, - // - metadata does not have proper format to support row group level filter pruning, - // - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD. 
- return null; - } - - final Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null); - - final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size()); - Set<String> qualifiedFileNames = Sets.newHashSet(); // HashSet keeps a fileName unique. - - ParquetFilterPredicate filterPredicate = null; - - for (RowGroupInfo rowGroup : rowGroupInfos) { - final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, this.columns); - Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(rowGroup.getPath(), selectionRoot); - - ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector( - parquetTableMetadata, - rowGroup.getColumns(), - implicitColValues); - - Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr); - - if (filterPredicate == null) { - ErrorCollector errorCollector = new ErrorCollectorImpl(); - LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr( - filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry); - - if (errorCollector.hasErrors()) { - logger.error("{} error(s) encountered when materialize filter expression : {}", - errorCollector.getErrorCount(), errorCollector.toErrorString()); - return null; - } - // logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter)); - - Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter); - filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate( - materializedFilter, constantBoundaries, udfUtilities); - - if (filterPredicate == null) { - return null; - } - } - - if (ParquetRGFilterEvaluator.canDrop(filterPredicate, columnStatisticsMap, rowGroup.getRowCount())) { - continue; - } - - qualifiedRGs.add(rowGroup); - qualifiedFileNames.add(rowGroup.getPath()); // TODO : optimize when 1 file contains m row groups. - } - - - if (qualifiedRGs.size() == rowGroupInfos.size() ) { - // There is no reduction of rowGroups. Return the original groupScan. - logger.debug("applyFilter does not have any pruning!"); - return null; - } else if (qualifiedFileNames.size() == 0) { - logger.warn("All rowgroups have been filtered out. Add back one to get schema from scannner"); - RowGroupInfo rg = rowGroupInfos.iterator().next(); - qualifiedFileNames.add(rg.getPath()); - qualifiedRGs.add(rg); - } - - try { - FileSelection newSelection = new FileSelection(null, Lists.newArrayList(qualifiedFileNames), getSelectionRoot(), cacheFileRoot, false); - logger.info("applyFilter {} reduce parquet rowgroup # from {} to {}", ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size()); - ParquetGroupScan clonegroupscan = this.clone(newSelection); - clonegroupscan.rowGroupInfos = qualifiedRGs; - clonegroupscan.updatePartitionColTypeMap(); - return clonegroupscan; - - } catch (IOException e) { - logger.warn("Could not apply filter prune due to Exception : {}", e); - return null; - } - } - - /** * If metadata is corrupted, ignore expanding selection and reset parquetTableMetadata and fileSet fields * * @param metadata parquet table metadata @@ -1341,5 +482,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan { } return false; } + // private methods block end }
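A side note on the new initInternal() implementation above: when the metadata cache is not used, every discovered file is mapped to the process-user file system with a single stream collector. The following standalone sketch uses plain Strings as stand-ins for Hadoop's FileStatus and FileSystem (so it runs without Hadoop on the classpath) and only illustrates the Collectors.toMap overload used in the diff: identity keys, a constant value, a merge function for duplicate keys, and a LinkedHashMap supplier that preserves file order.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class StatusMapSketch {
  public static void main(String[] args) {
    // Stand-ins for the FileStatus list and the process-user FileSystem used in initInternal().
    List<String> fileStatuses = Arrays.asList("/tmp/t1/0_0_0.parquet", "/tmp/t1/0_0_1.parquet");
    String processUserFileSystem = "processUserFs";

    // Same collector shape as in the diff: every file maps to the same file system,
    // duplicate keys keep the newer value, and LinkedHashMap preserves insertion order.
    Map<String, String> statusMap = fileStatuses.stream()
        .collect(Collectors.toMap(
            Function.identity(),
            s -> processUserFileSystem,
            (oldFs, newFs) -> newFs,
            LinkedHashMap::new));

    statusMap.forEach((file, fs) -> System.out.println(file + " -> " + fs));
  }
}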
http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java new file mode 100644 index 0000000..5a3e0c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; + +/** + * Holds common statistics about data in parquet group scan, + * including information about total row count, columns counts, partition columns. + */ +public class ParquetGroupScanStatistics { + + // map from file names to maps of column name to partition value mappings + private Map<String, Map<SchemaPath, Object>> partitionValueMap; + // only for partition columns : value is unique for each partition + private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap; + // total number of non-null value for each column in parquet files + private Map<SchemaPath, Long> columnValueCounts; + // total number of rows (obtained from parquet footer) + private long rowCount; + + + public ParquetGroupScanStatistics(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) { + collect(rowGroupInfos, parquetTableMetadata); + } + + public ParquetGroupScanStatistics(ParquetGroupScanStatistics that) { + this.partitionValueMap = new HashMap<>(that.partitionValueMap); + this.partitionColTypeMap = new HashMap<>(that.partitionColTypeMap); + this.columnValueCounts = new HashMap<>(that.columnValueCounts); + this.rowCount = that.rowCount; + } + + public long getColumnValueCount(SchemaPath column) { + return columnValueCounts.containsKey(column) ? 
columnValueCounts.get(column) : 0; + } + + public List<SchemaPath> getPartitionColumns() { + return new ArrayList<>(partitionColTypeMap.keySet()); + } + + public TypeProtos.MajorType getTypeForColumn(SchemaPath schemaPath) { + return partitionColTypeMap.get(schemaPath); + } + + public long getRowCount() { + return rowCount; + } + + public Object getPartitionValue(String path, SchemaPath column) { + return partitionValueMap.get(path).get(column); + } + + public void collect(List<RowGroupInfo> rowGroupInfos, ParquetTableMetadataBase parquetTableMetadata) { + resetHolders(); + boolean first = true; + for (RowGroupInfo rowGroup : rowGroupInfos) { + long rowCount = rowGroup.getRowCount(); + for (ColumnMetadata column : rowGroup.getColumns()) { + SchemaPath schemaPath = SchemaPath.getCompoundPath(column.getName()); + Long previousCount = columnValueCounts.get(schemaPath); + if (previousCount != null) { + if (previousCount != GroupScan.NO_COLUMN_STATS && column.getNulls() != null) { + Long newCount = rowCount - column.getNulls(); + columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount); + } + } else { + if (column.getNulls() != null) { + Long newCount = rowCount - column.getNulls(); + columnValueCounts.put(schemaPath, newCount); + } else { + columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS); + } + } + boolean partitionColumn = checkForPartitionColumn(column, first, rowCount, parquetTableMetadata); + if (partitionColumn) { + Map<SchemaPath, Object> map = partitionValueMap.computeIfAbsent(rowGroup.getPath(), key -> new HashMap<>()); + Object value = map.get(schemaPath); + Object currentValue = column.getMaxValue(); + if (value != null) { + if (value != currentValue) { + partitionColTypeMap.remove(schemaPath); + } + } else { + // the value of a column with primitive type cannot be null, + // so check whether the column contains only null values and, if so, put null into the map + if (rowCount == column.getNulls()) { + map.put(schemaPath, null); + } else { + map.put(schemaPath, currentValue); + } + } + } else { + partitionColTypeMap.remove(schemaPath); + } + } + this.rowCount += rowGroup.getRowCount(); + first = false; + } + } + + /** + * Re-initializes holders either during first instance creation or on statistics update based on the updated list of row groups. + */ + private void resetHolders() { + this.partitionValueMap = new HashMap<>(); + this.partitionColTypeMap = new HashMap<>(); + this.columnValueCounts = new HashMap<>(); + this.rowCount = 0; + } + + /** + * When reading the very first footer, any column is a potential partition column. So for the first footer, we check + * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the + * remaining footers, we will not find any new partition columns, but we may discover that what was previously a + * potential partition column now no longer qualifies, so it needs to be removed from the list. 
+ * + * @param columnMetadata column metadata + * @param first if columns first appears in row group + * @param rowCount row count + * @param parquetTableMetadata parquet table metadata + * + * @return whether column is a potential partition column + */ + private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, + boolean first, + long rowCount, + ParquetTableMetadataBase parquetTableMetadata) { + SchemaPath schemaPath = SchemaPath.getCompoundPath(columnMetadata.getName()); + final PrimitiveType.PrimitiveTypeName primitiveType; + final OriginalType originalType; + int precision = 0; + int scale = 0; + if (parquetTableMetadata.hasColumnMetadata()) { + // only ColumnTypeMetadata_v3 stores information about scale and precision + if (parquetTableMetadata instanceof ParquetTableMetadata_v3) { + ColumnTypeMetadata_v3 columnTypeInfo = ((ParquetTableMetadata_v3) parquetTableMetadata) + .getColumnTypeInfo(columnMetadata.getName()); + scale = columnTypeInfo.scale; + precision = columnTypeInfo.precision; + } + primitiveType = parquetTableMetadata.getPrimitiveType(columnMetadata.getName()); + originalType = parquetTableMetadata.getOriginalType(columnMetadata.getName()); + } else { + primitiveType = columnMetadata.getPrimitiveType(); + originalType = columnMetadata.getOriginalType(); + } + if (first) { + if (hasSingleValue(columnMetadata, rowCount)) { + partitionColTypeMap.put(schemaPath, ParquetReaderUtility.getType(primitiveType, originalType, scale, precision)); + return true; + } else { + return false; + } + } else { + if (!partitionColTypeMap.keySet().contains(schemaPath)) { + return false; + } else { + if (!hasSingleValue(columnMetadata, rowCount)) { + partitionColTypeMap.remove(schemaPath); + return false; + } + if (!ParquetReaderUtility.getType(primitiveType, originalType, scale, precision).equals(partitionColTypeMap.get(schemaPath))) { + partitionColTypeMap.remove(schemaPath); + return false; + } + } + } + return true; + } + + /** + * Checks that the column chunk has a single value. + * ColumnMetadata will have a non-null value iff the minValue and + * the maxValue for the rowgroup are the same. 
+ * + * @param columnChunkMetaData metadata to check + * @param rowCount rows count in column chunk + * + * @return true if column has single value + */ + private boolean hasSingleValue(ColumnMetadata columnChunkMetaData, long rowCount) { + return (columnChunkMetaData != null) && (columnChunkMetaData.hasSingleValue(rowCount)); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java index 45fa51d..83ce4d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java @@ -65,7 +65,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { @Override public boolean matches(RelOptRuleCall call) { final ScanPrel scan = call.rel(2); - if (scan.getGroupScan() instanceof ParquetGroupScan) { + if (scan.getGroupScan() instanceof AbstractParquetGroupScan) { return super.matches(call); } return false; @@ -90,7 +90,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { @Override public boolean matches(RelOptRuleCall call) { final ScanPrel scan = call.rel(1); - if (scan.getGroupScan() instanceof ParquetGroupScan) { + if (scan.getGroupScan() instanceof AbstractParquetGroupScan) { return super.matches(call); } return false; @@ -114,7 +114,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { } protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel project, ScanPrel scan) { - ParquetGroupScan groupScan = (ParquetGroupScan) scan.getGroupScan(); + AbstractParquetGroupScan groupScan = (AbstractParquetGroupScan) scan.getGroupScan(); if (groupScan.getFilter() != null && !groupScan.getFilter().equals(ValueExpressions.BooleanExpression.TRUE)) { return; } @@ -152,25 +152,26 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { LogicalExpression conditionExp = DrillOptiq.toDrill( new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, qualifedPred); - Stopwatch timer = Stopwatch.createStarted(); + + Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; final GroupScan newGroupScan = groupScan.applyFilter(conditionExp,optimizerContext, optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions()); - logger.info("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS)); + if (timer != null) { + logger.debug("Took {} ms to apply filter on parquet row groups. 
", timer.elapsed(TimeUnit.MILLISECONDS)); + timer.stop(); + } if (newGroupScan == null ) { return; } - final ScanPrel newScanRel = ScanPrel.create(scan, scan.getTraitSet(), newGroupScan, scan.getRowType()); - RelNode inputRel = newScanRel; + RelNode newScan = ScanPrel.create(scan, scan.getTraitSet(), newGroupScan, scan.getRowType());; if (project != null) { - inputRel = project.copy(project.getTraitSet(), ImmutableList.of(inputRel)); + newScan = project.copy(project.getTraitSet(), ImmutableList.of(newScan)); } - - final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.of(inputRel)); - + final RelNode newFilter = filter.copy(filter.getTraitSet(), ImmutableList.<RelNode>of(newScan)); call.transformTo(newFilter); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c6549e58/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java index c2580cb..9157590 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderUtility.java @@ -22,9 +22,12 @@ import org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.holders.NullableTimeStampHolder; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.parquet.metadata.MetadataVersion; import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.work.ExecErrorConstants; import org.apache.parquet.SemanticVersion; @@ -53,6 +56,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ColumnTypeMetadata_v2; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V2.ParquetTableMetadata_v2; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ColumnTypeMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ColumnMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetTableMetadataBase; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.ParquetFileMetadata; +import static org.apache.drill.exec.store.parquet.metadata.MetadataBase.RowGroupMetadata; + /** * Utility class where we can capture common logic between the two parquet readers */ @@ -142,7 +154,7 @@ public class ParquetReaderUtility { return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT); } - public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) { + public static void correctDatesInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) { DateCorruptionStatus cacheFileCanContainsCorruptDates = new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0 ? 
DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_UNCLEAR_TEST_VALUES; @@ -150,19 +162,19 @@ public class ParquetReaderUtility { // Looking for the DATE data type of column names in the metadata cache file ("metadata_version" : "v2") String[] names = new String[0]; if (new MetadataVersion(2, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) { - for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata : - ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) { + for (ColumnTypeMetadata_v2 columnTypeMetadata : + ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) { if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) { names = columnTypeMetadata.name; } } } - for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { // Drill has only ever written a single row group per file, only need to correct the statistics // on the first row group - Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0); + RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0); Long rowCount = rowGroupMetadata.getRowCount(); - for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { + for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { // Setting Min/Max values for ParquetTableMetadata_v1 if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) { OriginalType originalType = columnMetadata.getOriginalType(); @@ -192,15 +204,15 @@ public class ParquetReaderUtility { * * @param parquetTableMetadata table metadata that should be corrected */ - public static void correctBinaryInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) { + public static void correctBinaryInMetadataCache(ParquetTableMetadataBase parquetTableMetadata) { // Looking for the names of the columns with BINARY data type // in the metadata cache file for V2 and all v3 versions Set<List<String>> columnsNames = getBinaryColumnsNames(parquetTableMetadata); - for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) { - for (Metadata.RowGroupMetadata rowGroupMetadata : file.getRowGroups()) { + for (ParquetFileMetadata file : parquetTableMetadata.getFiles()) { + for (RowGroupMetadata rowGroupMetadata : file.getRowGroups()) { Long rowCount = rowGroupMetadata.getRowCount(); - for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { + for (ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) { // Setting Min/Max values for ParquetTableMetadata_v1 if (new MetadataVersion(1, 0).equals(new MetadataVersion(parquetTableMetadata.getMetadataVersion()))) { if (columnMetadata.getPrimitiveType() == PrimitiveTypeName.BINARY @@ -231,19 +243,19 @@ public class ParquetReaderUtility { * @param parquetTableMetadata table metadata the source of the columns to check * @return set of the lists with column names */ - private static Set<List<String>> getBinaryColumnsNames(Metadata.ParquetTableMetadataBase parquetTableMetadata) { + private static Set<List<String>> getBinaryColumnsNames(ParquetTableMetadataBase parquetTableMetadata) { Set<List<String>> names = Sets.newHashSet(); - if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) { - for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata : - ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) { + if (parquetTableMetadata 
instanceof ParquetTableMetadata_v2) { + for (ColumnTypeMetadata_v2 columnTypeMetadata : + ((ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) { if (columnTypeMetadata.primitiveType == PrimitiveTypeName.BINARY || columnTypeMetadata.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { names.add(Arrays.asList(columnTypeMetadata.name)); } } - } else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v3) { - for (Metadata.ColumnTypeMetadata_v3 columnTypeMetadata : - ((Metadata.ParquetTableMetadata_v3) parquetTableMetadata).columnTypeInfo.values()) { + } else if (parquetTableMetadata instanceof ParquetTableMetadata_v3) { + for (ColumnTypeMetadata_v3 columnTypeMetadata : + ((ParquetTableMetadata_v3) parquetTableMetadata).columnTypeInfo.values()) { if (columnTypeMetadata.primitiveType == PrimitiveTypeName.BINARY || columnTypeMetadata.primitiveType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { names.add(Arrays.asList(columnTypeMetadata.name)); @@ -260,7 +272,7 @@ public class ParquetReaderUtility { * @param columnMetadata column metadata that should be changed * @param rowCount rows count in column chunk */ - private static void setMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) { + private static void setMinMaxValues(ColumnMetadata columnMetadata, long rowCount) { if (columnMetadata.hasSingleValue(rowCount)) { Object minValue = columnMetadata.getMinValue(); if (minValue != null && minValue instanceof String) { @@ -278,7 +290,7 @@ public class ParquetReaderUtility { * @param columnMetadata column metadata that should be changed * @param rowCount rows count in column chunk */ - private static void convertMinMaxValues(Metadata.ColumnMetadata columnMetadata, long rowCount) { + private static void convertMinMaxValues(ColumnMetadata columnMetadata, long rowCount) { if (columnMetadata.hasSingleValue(rowCount)) { Object minValue = columnMetadata.getMinValue(); if (minValue != null && minValue instanceof String) { @@ -362,8 +374,8 @@ public class ParquetReaderUtility { * This method only checks the first Row Group, because Drill has only ever written * a single Row Group per file. * - * @param footer - * @param columns + * @param footer parquet footer + * @param columns list of columns schema path * @param autoCorrectCorruptDates user setting to allow enabling/disabling of auto-correction * of corrupt dates. There are some rare cases (storing dates thousands * of years into the future, with tools other than Drill writing files) @@ -459,4 +471,66 @@ public class ParquetReaderUtility { } } } + + /** + * Builds major type using given {@code OriginalType originalType} or {@code PrimitiveTypeName type}. + * For DECIMAL will be returned major type with scale and precision. 
+ * + * @param type parquet primitive type + * @param originalType parquet original type + * @param scale type scale (used for DECIMAL type) + * @param precision type precision (used for DECIMAL type) + * @return major type + */ + public static TypeProtos.MajorType getType(PrimitiveTypeName type, OriginalType originalType, int scale, int precision) { + if (originalType != null) { + switch (originalType) { + case DECIMAL: + return Types.withScaleAndPrecision(TypeProtos.MinorType.DECIMAL18, TypeProtos.DataMode.OPTIONAL, scale, precision); + case DATE: + return Types.optional(TypeProtos.MinorType.DATE); + case TIME_MILLIS: + return Types.optional(TypeProtos.MinorType.TIME); + case TIMESTAMP_MILLIS: + return Types.optional(TypeProtos.MinorType.TIMESTAMP); + case UTF8: + return Types.optional(TypeProtos.MinorType.VARCHAR); + case UINT_8: + return Types.optional(TypeProtos.MinorType.UINT1); + case UINT_16: + return Types.optional(TypeProtos.MinorType.UINT2); + case UINT_32: + return Types.optional(TypeProtos.MinorType.UINT4); + case UINT_64: + return Types.optional(TypeProtos.MinorType.UINT8); + case INT_8: + return Types.optional(TypeProtos.MinorType.TINYINT); + case INT_16: + return Types.optional(TypeProtos.MinorType.SMALLINT); + case INTERVAL: + return Types.optional(TypeProtos.MinorType.INTERVAL); + } + } + + switch (type) { + case BOOLEAN: + return Types.optional(TypeProtos.MinorType.BIT); + case INT32: + return Types.optional(TypeProtos.MinorType.INT); + case INT64: + return Types.optional(TypeProtos.MinorType.BIGINT); + case FLOAT: + return Types.optional(TypeProtos.MinorType.FLOAT4); + case DOUBLE: + return Types.optional(TypeProtos.MinorType.FLOAT8); + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + case INT96: + return Types.optional(TypeProtos.MinorType.VARBINARY); + default: + // Should never hit this + throw new UnsupportedOperationException("Unsupported type:" + type); + } + } + }
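To make the behaviour of the new getType helper concrete, the small snippet below shows the mapping for a few representative columns. The GetTypeExample wrapper and the chosen arguments are illustrative only; the getType method, Types and TypeProtos are exactly the ones used in the diff above. When an OriginalType is present it drives the mapping; otherwise the PrimitiveTypeName fallback applies, and DECIMAL is the only case where the scale and precision arguments matter.

  import org.apache.drill.common.types.TypeProtos;
  import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
  import org.apache.parquet.schema.OriginalType;
  import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

  public class GetTypeExample {  // illustrative wrapper, not part of the patch
    public static void main(String[] args) {
      // INT64 + TIMESTAMP_MILLIS -> optional TIMESTAMP (the original type wins over the primitive type)
      TypeProtos.MajorType ts =
          ParquetReaderUtility.getType(PrimitiveTypeName.INT64, OriginalType.TIMESTAMP_MILLIS, 0, 0);

      // FIXED_LEN_BYTE_ARRAY + DECIMAL -> optional DECIMAL18 carrying scale 2 and precision 18,
      // the values that ParquetGroupScanStatistics reads from ColumnTypeMetadata_v3
      TypeProtos.MajorType dec =
          ParquetReaderUtility.getType(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.DECIMAL, 2, 18);

      // no original type at all -> primitive mapping, e.g. DOUBLE -> optional FLOAT8
      TypeProtos.MajorType dbl =
          ParquetReaderUtility.getType(PrimitiveTypeName.DOUBLE, null, 0, 0);

      System.out.println(ts.getMinorType() + ", " + dec.getMinorType() + ", " + dbl.getMinorType());
    }
  }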