Repository: hive Updated Branches: refs/heads/master 20c95c1c0 -> 7cf791472
HIVE-19937: Intern fields in MapWork on deserialization (Sahil Takiar, reviewed by Vihang Karajgaonkar) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cf79147 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cf79147 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cf79147 Branch: refs/heads/master Commit: 7cf7914729ceeca34017ad6671a97a1290915e10 Parents: 20c95c1 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Mon Jun 18 15:38:13 2018 -0500 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Mon Aug 6 11:05:33 2018 +0200 ---------------------------------------------------------------------- .../hive/ql/exec/AbstractMapOperator.java | 4 +- .../apache/hadoop/hive/ql/exec/MapOperator.java | 13 ++--- .../hive/ql/exec/SerializationUtilities.java | 52 +++++++++++++++++++- .../hive/ql/exec/vector/VectorMapOperator.java | 12 ++--- .../hive/ql/io/parquet/ProjectionPusher.java | 4 +- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 17 +++++-- .../hadoop/hive/ql/plan/PartitionDesc.java | 2 +- 7 files changed, 82 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java index 0d1c688..c7af991 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapOperator.java @@ -96,7 +96,7 @@ public abstract class AbstractMapOperator extends Operator<MapWork> return path; } - protected String getNominalPath(Path fpath) { + protected Path getNominalPath(Path fpath) { Path nominal = null; boolean schemaless = 
fpath.toUri().getScheme() == null; for (Path onefile : conf.getPathToAliases().keySet()) { @@ -119,7 +119,7 @@ public abstract class AbstractMapOperator extends Operator<MapWork> if (nominal == null) { throw new IllegalStateException("Invalid input path " + fpath); } - return nominal.toString(); + return nominal; } public abstract void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf) http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index c7350ca..b9986d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -79,8 +79,7 @@ public class MapOperator extends AbstractMapOperator { protected transient long logEveryNRows = 0; // input path --> {operator --> context} - private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap = - new HashMap<String, Map<Operator<?>, MapOpCtx>>(); + private final Map<Path, Map<Operator<?>, MapOpCtx>> opCtxMap = new HashMap<>(); // child operator --> object inspector (converted OI if it's needed) private final Map<Operator<?>, StructObjectInspector> childrenOpToOI = new HashMap<Operator<?>, StructObjectInspector>(); @@ -440,10 +439,8 @@ public class MapOperator extends AbstractMapOperator { LOG.debug("Adding alias " + alias + " to work list for file " + onefile); } - Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile.toString()); - if (contexts == null) { - opCtxMap.put(onefile.toString(), contexts = new LinkedHashMap<Operator<?>, MapOpCtx>()); - } + Map<Operator<?>, MapOpCtx> contexts = opCtxMap.computeIfAbsent(onefile, + k -> new LinkedHashMap<>()); if (contexts.containsKey(op)) { continue; } @@ -515,7 +512,7 @@ public class MapOperator extends 
AbstractMapOperator { public void cleanUpInputFileChangedOp() throws HiveException { super.cleanUpInputFileChangedOp(); Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath); if (LOG.isInfoEnabled()) { StringBuilder builder = new StringBuilder(); @@ -703,7 +700,7 @@ public class MapOperator extends AbstractMapOperator { public void initializeContexts() { Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath); currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]); } http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index e03429b..28550d8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -233,13 +234,14 @@ public class SerializationUtilities { kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); kryo.register(new java.util.ArrayList().subList(0,0).getClass(), new 
ArrayListSubListSerializer()); kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer()); + kryo.register(MapWork.class, new MapWorkSerializer(kryo, MapWork.class)); + kryo.register(PartitionDesc.class, new PartitionDescSerializer(kryo, PartitionDesc.class)); ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy( new StdInstantiatorStrategy()); removeField(kryo, AbstractOperatorDesc.class, "colExprMap"); removeField(kryo, AbstractOperatorDesc.class, "statistics"); - kryo.register(MapWork.class); kryo.register(ReduceWork.class); kryo.register(TableDesc.class); kryo.register(UnionOperator.class); @@ -541,6 +543,54 @@ public class SerializationUtilities { } /** + * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link MapWork} objects in + * order to invoke any string interning code present in the "setter" methods. The fields in {@link + * MapWork} often store paths that contain duplicate strings, so interning them can decrease + * memory significantly. + */ + private static class MapWorkSerializer extends FieldSerializer<MapWork> { + + MapWorkSerializer(Kryo kryo, Class type) { + super(kryo, type); + } + + @Override + public MapWork read(Kryo kryo, Input input, Class<MapWork> type) { + MapWork mapWork = super.read(kryo, input, type); + // The set methods in MapWork intern any duplicate strings, which is why we call them + // during de-serialization + mapWork.setPathToPartitionInfo(mapWork.getPathToPartitionInfo()); + mapWork.setPathToAliases(mapWork.getPathToAliases()); + return mapWork; + } + } + + /** + * We use a custom {@link com.esotericsoftware.kryo.Serializer} for {@link PartitionDesc} objects + * in order to invoke any string interning code present in the "setter" methods. {@link + * PartitionDesc} objects are usually stored by {@link MapWork} objects and contain duplicate info + * like input format class names, partition specs, etc. 
+ */ + private static class PartitionDescSerializer extends FieldSerializer<PartitionDesc> { + + PartitionDescSerializer(Kryo kryo, Class type) { + super(kryo, type); + } + + @Override + public PartitionDesc read(Kryo kryo, Input input, Class<PartitionDesc> type) { + PartitionDesc partitionDesc = super.read(kryo, input, type); + // The set methods in PartitionDesc intern any duplicate strings, which is why we call them + // during de-serialization + partitionDesc.setBaseFileName(partitionDesc.getBaseFileName()); + partitionDesc.setPartSpec(partitionDesc.getPartSpec()); + partitionDesc.setInputFileFormatClass(partitionDesc.getInputFileFormatClass()); + partitionDesc.setOutputFileFormatClass(partitionDesc.getOutputFileFormatClass()); + return partitionDesc; + } + } + + /** * Serializes the plan. * * @param plan The plan, such as QueryPlan, MapredWork, etc. http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index bd70991..5a903d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -92,7 +92,7 @@ public class VectorMapOperator extends AbstractMapOperator { /* * Overall information on this vectorized Map operation. */ - private transient HashMap<String, VectorPartitionContext> fileToPartitionContextMap; + private transient HashMap<Path, VectorPartitionContext> fileToPartitionContextMap; private transient Operator<? 
extends OperatorDesc> oneRootOperator; @@ -555,7 +555,7 @@ public class VectorMapOperator extends AbstractMapOperator { * The Vectorizer class enforces that there is only one TableScanOperator, so * we don't need the more complicated multiple root operator mapping that MapOperator has. */ - fileToPartitionContextMap = new HashMap<String, VectorPartitionContext>(); + fileToPartitionContextMap = new HashMap<>(); // Temporary map so we only create one partition context entry. HashMap<PartitionDesc, VectorPartitionContext> partitionContextMap = @@ -573,7 +573,7 @@ public class VectorMapOperator extends AbstractMapOperator { vectorPartitionContext = partitionContextMap.get(partDesc); } - fileToPartitionContextMap.put(path.toString(), vectorPartitionContext); + fileToPartitionContextMap.put(path, vectorPartitionContext); } // Create list of one. @@ -593,7 +593,7 @@ public class VectorMapOperator extends AbstractMapOperator { public void initializeContexts() throws HiveException { Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); setupPartitionContextVars(nominalPath); } @@ -602,7 +602,7 @@ public class VectorMapOperator extends AbstractMapOperator { public void cleanUpInputFileChangedOp() throws HiveException { super.cleanUpInputFileChangedOp(); Path fpath = getExecContext().getCurrentInputPath(); - String nominalPath = getNominalPath(fpath); + Path nominalPath = getNominalPath(fpath); setupPartitionContextVars(nominalPath); @@ -641,7 +641,7 @@ public class VectorMapOperator extends AbstractMapOperator { /* * Setup the context for reading from the next partition file. 
*/ - private void setupPartitionContextVars(String nominalPath) throws HiveException { + private void setupPartitionContextVars(Path nominalPath) throws HiveException { currentVectorPartContext = fileToPartitionContextMap.get(nominalPath); if (currentVectorPartContext == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java index fd6cc86..0444562 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ProjectionPusher.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,8 @@ public class ProjectionPusher { pathToPartitionInfo.clear(); for (final Map.Entry<Path, PartitionDesc> entry : mapWork.getPathToPartitionInfo().entrySet()) { // key contains scheme (such as pfile://) and we want only the path portion fix in HIVE-6366 - pathToPartitionInfo.put(Path.getPathWithoutSchemeAndAuthority(entry.getKey()), entry.getValue()); + pathToPartitionInfo.put(StringInternUtils.internUriStringsInPath( + Path.getPathWithoutSchemeAndAuthority(entry.getKey())), entry.getValue()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index a0bd649..e7256cc 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -194,6 +194,7 @@ public class MapWork extends BaseWork { } public void addPathToAlias(Path path, ArrayList<String> aliases){ + StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); } @@ -201,6 +202,7 @@ public class MapWork extends BaseWork { ArrayList<String> aliases = pathToAliases.get(path); if (aliases == null) { aliases = new ArrayList<>(); + StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); } aliases.add(newAlias.intern()); @@ -243,6 +245,9 @@ public class MapWork extends BaseWork { } public void setPathToPartitionInfo(final LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo) { + for (Path p : pathToPartitionInfo.keySet()) { + StringInternUtils.internUriStringsInPath(p); + } this.pathToPartitionInfo = pathToPartitionInfo; } @@ -690,6 +695,10 @@ public class MapWork extends BaseWork { return eventSourceColumnTypeMap; } + public void setEventSourceColumnTypeMap(Map<String, List<String>> eventSourceColumnTypeMap) { + this.eventSourceColumnTypeMap = eventSourceColumnTypeMap; + } + public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() { return eventSourcePartKeyExprMap; } @@ -736,7 +745,7 @@ public class MapWork extends BaseWork { public void setIncludedBuckets(BitSet includedBuckets) { // see comment next to the field - this.includedBuckets = includedBuckets.toByteArray(); + this.includedBuckets = includedBuckets == null ? 
null : includedBuckets.toByteArray(); } public void setVectorizedRowBatch(VectorizedRowBatch vectorizedRowBatch) { @@ -821,7 +830,8 @@ public class MapWork extends BaseWork { } public void setVectorizationEnabledConditionsMet(ArrayList<String> vectorizationEnabledConditionsMet) { - this.vectorizationEnabledConditionsMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsMet, true); + this.vectorizationEnabledConditionsMet = vectorizationEnabledConditionsMet == null ? null : VectorizationCondition.addBooleans( + vectorizationEnabledConditionsMet, true); } public List<String> getVectorizationEnabledConditionsMet() { @@ -829,7 +839,8 @@ public class MapWork extends BaseWork { } public void setVectorizationEnabledConditionsNotMet(List<String> vectorizationEnabledConditionsNotMet) { - this.vectorizationEnabledConditionsNotMet = VectorizationCondition.addBooleans(vectorizationEnabledConditionsNotMet, false); + this.vectorizationEnabledConditionsNotMet = vectorizationEnabledConditionsNotMet == null ? 
null : VectorizationCondition.addBooleans( + vectorizationEnabledConditionsNotMet, false); } public List<String> getVectorizationEnabledConditionsNotMet() { http://git-wip-us.apache.org/repos/asf/hive/blob/7cf79147/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index 821e428..b226ab7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -67,7 +67,7 @@ public class PartitionDesc implements Serializable, Cloneable { private VectorPartitionDesc vectorPartitionDesc; public void setBaseFileName(String baseFileName) { - this.baseFileName = baseFileName.intern(); + this.baseFileName = StringInternUtils.internIfNotNull(baseFileName); } public PartitionDesc() {