Author: gates Date: Thu Jan 22 19:16:37 2015 New Revision: 1654014 URL: http://svn.apache.org/r1654014 Log: HIVE-9359 Export of a large table causes OOM in Metastore and Client
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java?rev=1654014&view=auto ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java (added) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/PartitionIterable.java Thu Jan 22 19:16:37 2015 @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.metadata; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + + +/** + * PartitionIterable - effectively a lazy Iterable<Partition> + * + * Sometimes, we have a need for iterating through a list of partitions, + * but the list of partitions can be too big to fetch as a single object. + * Thus, the goal of PartitionIterable is to act as an Iterable<Partition> + * while lazily fetching each relevant partition, one after the other as + * independent metadata calls. + * + * It is very likely that any calls to PartitionIterable are going to result + * in a large number of calls, so use sparingly only when the memory cost + * of fetching all the partitions in one shot is too prohibitive. + * + * This is still pretty costly in that it would retain a list of partition + * names, but that should be far less expensive than the entire partition + * objects. + * + * Note that remove() is an illegal call on this, and will result in an + * IllegalStateException. + */ +public class PartitionIterable implements Iterable<Partition> { + + @Override + public Iterator<Partition> iterator() { + return new Iterator<Partition>(){ + + private boolean initialized = false; + private Iterator<Partition> ptnsIterator = null; + + private Iterator<String> partitionNamesIter = null; + private Iterator<Partition> batchIter = null; + + private void initialize(){ + if(!initialized){ + if (currType == Type.LIST_PROVIDED){ + ptnsIterator = ptnsProvided.iterator(); + } else { + partitionNamesIter = partitionNames.iterator(); + } + initialized = true; + } + } + + public boolean hasNext() { + initialize(); + if (currType == Type.LIST_PROVIDED){ + return ptnsIterator.hasNext(); + } else { + return ((batchIter != null) && batchIter.hasNext()) || partitionNamesIter.hasNext(); + } + } + + @Override + public Partition next() { + initialize(); + if (currType == Type.LIST_PROVIDED){ + return ptnsIterator.next(); + } + + if ((batchIter == null) || !batchIter.hasNext()){ + getNextBatch(); + } + + return batchIter.next(); + } + + private void getNextBatch() { + int batch_counter = 0; + List<String> nameBatch = new ArrayList<String>(); + while (batch_counter < batch_size && partitionNamesIter.hasNext()){ + nameBatch.add(partitionNamesIter.next()); + batch_counter++; + } + try { + batchIter = db.getPartitionsByNames(table,nameBatch).iterator(); + } catch (HiveException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new IllegalStateException( + "PartitionIterable is a read-only iterable and remove() is unsupported"); + } + }; + } + + enum Type { + LIST_PROVIDED, // Where a List<Partitions is already provided + LAZY_FETCH_PARTITIONS // Where we want to fetch Partitions lazily when they're needed. + }; + + final Type currType; + + // used for LIST_PROVIDED cases + private List<Partition> ptnsProvided = null; + + // used for LAZY_FETCH_PARTITIONS cases + private Hive db = null; + private Table table = null; + private Map<String, String> partialPartitionSpec = null; + private List<String> partitionNames = null; + private int batch_size; + + /** + * Dummy constructor, which simply acts as an iterator on an already-present + * list of partitions, allows for easy drop-in replacement for other methods + * that already have a List<Partition> + */ + public PartitionIterable(List<Partition> ptnsProvided){ + this.currType = Type.LIST_PROVIDED; + this.ptnsProvided = ptnsProvided; + } + + /** + * Primary constructor that fetches all partitions in a given table, given + * a Hive object and a table object, and a partial partition spec. + */ + public PartitionIterable(Hive db, Table table, Map<String, String> partialPartitionSpec, + int batch_size) throws HiveException { + this.currType = Type.LAZY_FETCH_PARTITIONS; + this.db = db; + this.table = table; + this.partialPartitionSpec = partialPartitionSpec; + this.batch_size = batch_size; + + if (this.partialPartitionSpec == null){ + partitionNames = db.getPartitionNames( + table.getDbName(),table.getTableName(), (short) -1); + } else { + partitionNames = db.getPartitionNames( + table.getDbName(),table.getTableName(),partialPartitionSpec,(short)-1); + } + } + +} Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java?rev=1654014&r1=1654013&r2=1654014&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java Thu Jan 22 19:16:37 2015 @@ -47,6 +47,8 @@ import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; import org.apache.thrift.protocol.TJSONProtocol; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -168,37 +170,37 @@ public class EximUtil { public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null; public static void createExportDump(FileSystem fs, Path metadataPath, org.apache.hadoop.hive.ql.metadata.Table tableHandle, - List<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException { + Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions) throws SemanticException, IOException { + OutputStream out = fs.create(metadataPath); + JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out); + jgen.writeStartObject(); + jgen.writeStringField("version",METADATA_FORMAT_VERSION); + if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) { + jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); + } + TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); try { - JSONObject jsonContainer = new JSONObject(); - jsonContainer.put("version", METADATA_FORMAT_VERSION); - if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) { - jsonContainer.put("fcversion", METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION); - } - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - try { - String tableDesc = serializer.toString(tableHandle.getTTable(), "UTF-8"); - jsonContainer.put("table", tableDesc); - JSONArray jsonPartitions = new JSONArray(); - if (partitions != null) { - for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { - String partDesc = serializer.toString(partition.getTPartition(), "UTF-8"); - jsonPartitions.put(partDesc); - } + jgen.writeStringField("table", serializer.toString(tableHandle.getTTable(), "UTF-8")); + jgen.writeFieldName("partitions"); + jgen.writeStartArray(); + if (partitions != null) { + for (org.apache.hadoop.hive.ql.metadata.Partition partition : partitions) { + jgen.writeString(serializer.toString(partition.getTPartition(), "UTF-8")); + jgen.flush(); } - jsonContainer.put("partitions", jsonPartitions); - } catch (TException e) { - throw new SemanticException( - ErrorMsg.GENERIC_ERROR - .getMsg("Exception while serializing the metastore objects"), e); } - OutputStream out = fs.create(metadataPath); - out.write(jsonContainer.toString().getBytes("UTF-8")); - out.close(); - - } catch (JSONException e) { - throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("Error in serializing metadata"), e); + jgen.writeEndArray(); + } catch (TException e) { + throw new SemanticException( + ErrorMsg.GENERIC_ERROR + .getMsg("Exception while serializing the metastore objects"), e); } + jgen.writeEndObject(); + jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close. + } + + private static void write(OutputStream out, String s) throws IOException { + out.write(s.getBytes("UTF-8")); } public static Map.Entry<Table, List<Partition>> Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java?rev=1654014&r1=1654013&r2=1654014&view=diff ============================================================================== --- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java (original) +++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Thu Jan 22 19:16:37 2015 @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.hive.ql.metadata.PartitionIterable; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; @@ -81,11 +83,13 @@ public class ExportSemanticAnalyzer exte throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e); } - List<Partition> partitions = null; + PartitionIterable partitions = null; try { - partitions = null; if (ts.tableHandle.isPartitioned()) { - partitions = (ts.partitions != null) ? ts.partitions : db.getPartitions(ts.tableHandle); + partitions = (ts.partitions != null) ? + new PartitionIterable(ts.partitions) : + new PartitionIterable(db,ts.tableHandle,null,conf.getIntVar( + HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX)); } Path path = new Path(ctx.getLocalTmpPath(), "_metadata"); EximUtil.createExportDump(FileSystem.getLocal(conf), path, ts.tableHandle, partitions);