Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Wed Feb 22 09:43:41 2017 @@ -56,6 +56,7 @@ public class POCounterStatsTez extends P private transient KeyValuesReader reader; private transient KeyValueWriter writer; private transient boolean finished = false; + private transient boolean hasNext = false; public POCounterStatsTez(OperatorKey k) { super(k); @@ -88,6 +89,7 @@ public class POCounterStatsTez extends P try { reader = (KeyValuesReader) input.getReader(); LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); + hasNext = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -130,12 +132,13 @@ public class POCounterStatsTez extends P Integer key = null; Long value = null; // Read count of records per task - while (reader.next()) { + while (hasNext) { key = ((IntWritable)reader.getCurrentKey()).get(); for (Object val : reader.getCurrentValues()) { value = ((LongWritable)val).get(); counterRecords.put(key, value); } + hasNext = reader.next(); } // BinInterSedes only takes String for map key
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Wed Feb 22 09:43:41 2017 @@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -101,9 +103,13 @@ public class POFRJoinTez extends POFRJoi LogicalInput input = inputs.get(key); if (!this.replInputs.contains(input)) { this.replInputs.add(input); - this.replReaders.add((KeyValueReader) input.getReader()); + KeyValueReader reader = (KeyValueReader) input.getReader(); + this.replReaders.add(reader); + log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader); } } + // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have + // multiple POFRJoinTez loading same replicate input and will skip records } catch (Exception e) { throw new ExecException(e); } @@ -114,6 +120,7 @@ public class POFRJoinTez extends POFRJoi * * @throws ExecException */ + @SuppressWarnings("unchecked") @Override protected void setUpHashMap() throws ExecException { @@ -121,8 +128,8 @@ public class POFRJoinTez extends POFRJoi // where same POFRJoinTez occurs in different Split sub-plans Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); if (cacheValue != null) { - replicates = (TupleToMapKey[]) cacheValue; - log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); + replicates = (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue; + log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); return; } @@ -148,7 +155,7 @@ public class POFRJoinTez extends POFRJoi long time1 = System.currentTimeMillis(); - replicates[fragment] = null; + replicates.set(fragment, null); int inputIdx = 0; // We need to adjust the index because the number of replInputs is // one less than the number of inputSchemas. The inputSchemas @@ -158,7 +165,12 @@ public class POFRJoinTez extends POFRJoi SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx]; SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx]; - TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory); + Map<Object, ArrayList<Tuple>> replicate; + if (keySchemaTupleFactory == null) { + replicate = new HashMap<Object, ArrayList<Tuple>>(4000); + } else { + replicate = new TupleToMapKey(4000, keySchemaTupleFactory); + } POLocalRearrange lr = LRs[schemaIdx]; try { @@ -168,7 +180,8 @@ public class POFRJoinTez extends POFRJoi } PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey(); - if (isKeyNull(key.getValueAsPigType())) continue; + Object keyValue = key.getValueAsPigType(); + if (isKeyNull(keyValue)) continue; NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue(); // POFRJoin#getValueTuple() is reused to construct valTuple, @@ -176,27 +189,31 @@ public class POFRJoinTez extends POFRJoi // construct one here. Tuple retTuple = mTupleFactory.newTuple(3); retTuple.set(0, key.getIndex()); - retTuple.set(1, key.getValueAsPigType()); + retTuple.set(1, keyValue); retTuple.set(2, val.getValueAsPigType()); Tuple valTuple = getValueTuple(lr, retTuple); - Tuple keyTuple = mTupleFactory.newTuple(1); - keyTuple.set(0, key.getValueAsPigType()); - if (replicate.get(keyTuple) == null) { - replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); + ArrayList<Tuple> values = replicate.get(keyValue); + if (values == null) { + if (inputSchemaTupleFactory == null) { + values = new ArrayList<Tuple>(1); + } else { + values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory); + } + replicate.put(keyValue, values); } - replicate.get(keyTuple).add(valTuple); + values.add(valTuple); } } catch (IOException e) { throw new ExecException(e); } - replicates[schemaIdx] = replicate; + replicates.set(schemaIdx, replicate); inputIdx++; schemaIdx++; } long time2 = System.currentTimeMillis(); - log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); + log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); ObjectCache.getInstance().cache(cacheKey, replicates); log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Wed Feb 22 09:43:41 2017 @@ -57,6 +57,7 @@ public class POIdentityInOutTez extends private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean finished = false; + private transient boolean hasNext = false; public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) { super(inputRearrange); @@ -95,9 +96,12 @@ public class POIdentityInOutTez extends Reader r = input.getReader(); if (r instanceof KeyValueReader) { reader = (KeyValueReader) r; + // Force input fetch + hasNext = reader.next(); } else { shuffleInput = true; shuffleReader = (KeyValuesReader) r; + hasNext = shuffleReader.next(); } LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r); } catch (Exception e) { @@ -127,7 +131,7 @@ public class POIdentityInOutTez extends return RESULT_EOP; } if (shuffleInput) { - while (shuffleReader.next()) { + while (hasNext) { Object curKey = shuffleReader.getCurrentKey(); Iterable<Object> vals = shuffleReader.getCurrentValues(); if (isSkewedJoin) { @@ -139,9 +143,10 @@ public class POIdentityInOutTez extends for (Object val : vals) { writer.write(curKey, val); } + hasNext = shuffleReader.next(); } } else { - while (reader.next()) { + while (hasNext) { if (isSkewedJoin) { NullablePartitionWritable wrappedKey = new NullablePartitionWritable( (PigNullableWritable) reader.getCurrentKey()); @@ -155,6 +160,7 @@ public class POIdentityInOutTez extends writer.write(reader.getCurrentKey(), reader.getCurrentValue()); } + hasNext = reader.next(); } } finished = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Wed Feb 22 09:43:41 2017 @@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends } } - public String getOutputKey() { - return outputKey; + public boolean containsOutputKey(String key) { + return outputKey.equals(key); } public void setOutputKey(String outputKey) { @@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends } } + protected Result getRearrangedTuple() throws ExecException { + return super.getNextTuple(); + } + @Override public Result getNextTuple() throws ExecException { res = super.getNextTuple(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Wed Feb 22 09:43:41 2017 @@ -51,6 +51,7 @@ public class PORankTez extends PORank im private transient Map<Integer, Long> counterOffsets; private transient Configuration conf; private transient boolean finished = false; + private transient Boolean hasFirstRecord; public PORankTez(PORank copy) { super(copy); @@ -100,6 +101,7 @@ public class PORankTez extends PORank im try { reader = (KeyValueReader) input.getReader(); LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader); + hasFirstRecord = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -140,9 +142,18 @@ public class PORankTez extends PORank im Result inp = null; try { - while (reader.next()) { - inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); - return addRank(inp); + if (hasFirstRecord != null) { + if (hasFirstRecord) { + hasFirstRecord = null; + inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); + return addRank(inp); + } + hasFirstRecord = null; + } else { + while (reader.next()) { + inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); + return addRank(inp); + } } } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Wed Feb 22 09:43:41 2017 @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; import org.apache.pig.backend.executionengine.ExecException; @@ -32,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; import org.apache.pig.data.AccumulativeBag; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalCachedBag; +import org.apache.pig.data.ReadOnceBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -48,6 +54,7 @@ import org.apache.tez.runtime.library.co public class POShuffleTezLoad extends POPackage implements TezInput { private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class); protected List<String> inputKeys = new ArrayList<String>(); private boolean isSkewedJoin = false; @@ -61,6 +68,7 @@ public class POShuffleTezLoad extends PO private transient WritableComparator groupingComparator = null; private transient Configuration conf; private transient int accumulativeBatchSize; + private transient boolean readOnceOneBag; public POShuffleTezLoad(POPackage pack) { super(pack); @@ -101,7 +109,10 @@ public class POShuffleTezLoad extends PO // - Input key will be repeated, but index would be same within a TezInput if (!this.inputs.contains(input)) { this.inputs.add(input); - this.readers.add((KeyValuesReader)input.getReader()); + KeyValuesReader reader = (KeyValuesReader)input.getReader(); + this.readers.add(reader); + LOG.info("Attached input from vertex " + inputKey + + " : input=" + input + ", reader=" + reader); } } @@ -117,6 +128,13 @@ public class POShuffleTezLoad extends PO for (int i = 0; i < numTezInputs; i++) { finished[i] = !readers.get(i).next(); } + + this.readOnceOneBag = (numInputs == 1) + && (pkgr instanceof CombinerPackager + || pkgr instanceof LitePackager || pkgr instanceof BloomPackager); + if (readOnceOneBag) { + readOnce[0] = true; + } } catch (Exception e) { throw new ExecException(e); } @@ -187,43 +205,47 @@ public class POShuffleTezLoad extends PO } else { - for (int i = 0; i < numInputs; i++) { - bags[i] = new InternalCachedBag(numInputs); - } - - if (numTezInputs == 1) { - do { - Iterable<Object> vals = readers.get(0).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[0] = !readers.get(0).next(); - if (finished[0]) { - break; - } - cur = readers.get(0).getCurrentKey(); - } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + if (readOnceOneBag) { + bags[0] = new TezReadOnceBag(pkgr, min); } else { - for (int i = 0; i < numTezInputs; i++) { - if (!finished[i]) { - cur = readers.get(i).getCurrentKey(); - // We need to loop in case of Grouping Comparators - while (groupingComparator.compare(min, cur) == 0) { - Iterable<Object> vals = readers.get(i).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[i] = !readers.get(i).next(); - if (finished[i]) { - break; - } + for (int i = 0; i < numInputs; i++) { + bags[i] = new InternalCachedBag(numInputs); + } + + if (numTezInputs == 1) { + do { + Iterable<Object> vals = readers.get(0).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[0] = !readers.get(0).next(); + if (finished[0]) { + break; + } + cur = readers.get(0).getCurrentKey(); + } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + } else { + for (int i = 0; i < numTezInputs; i++) { + if (!finished[i]) { cur = readers.get(i).getCurrentKey(); + // We need to loop in case of Grouping Comparators + while (groupingComparator.compare(min, cur) == 0) { + Iterable<Object> vals = readers.get(i).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[i] = !readers.get(i).next(); + if (finished[i]) { + break; + } + cur = readers.get(i).getCurrentKey(); + } } } } @@ -383,4 +405,74 @@ public class POShuffleTezLoad extends PO } + private class TezReadOnceBag extends ReadOnceBag { + + private static final long serialVersionUID = 1L; + private Iterator<Object> iter; + + public TezReadOnceBag(Packager pkgr, + PigNullableWritable currentKey) throws IOException { + this.pkgr = pkgr; + this.keyWritable = currentKey; + this.iter = readers.get(0).getCurrentValues().iterator(); + } + + @Override + public Iterator<Tuple> iterator() { + return new TezReadOnceBagIterator(); + } + + private class TezReadOnceBagIterator implements Iterator<Tuple> { + + @Override + public boolean hasNext() { + if (iter.hasNext()) { + return true; + } else { + try { + finished[0] = !readers.get(0).next(); + if (finished[0]) { + return false; + } + // Currently combiner is not being applied when secondary key(grouping comparator) is used + // But might change in future. So check if the next key is same and return its values + Object cur = readers.get(0).getCurrentKey(); + if (groupingComparator.compare(keyWritable, cur) == 0) { + iter = readers.get(0).getCurrentValues().iterator(); + // Key should at least have one value. But doing a check just for safety + if (iter.hasNext()) { + return true; + } else { + throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values"); + } + } + return false; + } catch (IOException e) { + throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); + } + } + } + + @Override + public Tuple next() { + NullableTuple ntup = (NullableTuple) iter.next(); + int index = ntup.getIndex(); + Tuple ret = null; + try { + ret = pkgr.getValueTuple(keyWritable, ntup, index); + } catch (ExecException e) { + throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); + } + return ret; + } + + @Override + public void remove() { + throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed"); + } + } + + } + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Wed Feb 22 09:43:41 2017 @@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext private transient Iterator<KeyValueReader> readers; private transient KeyValueReader currentReader; private transient Configuration conf; + private transient Boolean hasFirstRecord; public POShuffledValueInputTez(OperatorKey k) { super(k); @@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext } readers = readersList.iterator(); currentReader = readers.next(); + // Force input fetch + hasFirstRecord = currentReader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext } do { - if (currentReader.next()) { + if (hasFirstRecord != null) { + if (hasFirstRecord) { + hasFirstRecord = null; + Tuple origTuple = (Tuple) currentReader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); + } + hasFirstRecord = null; + } else if (currentReader.next()) { Tuple origTuple = (Tuple) currentReader.getCurrentValue(); Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); return new Result(POStatus.STATUS_OK, copy); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Wed Feb 22 09:43:41 2017 @@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL private transient Configuration conf; private transient boolean finished = false; private transient TezCounter inputRecordCounter; + private transient boolean initialized; + private transient boolean noTupleCopy; public POSimpleTezLoad(OperatorKey k, LoadFunc loader) { super(k, loader); @@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL } else { Result res = new Result(); Tuple next = (Tuple) reader.getCurrentValue(); - res.result = next; + if (!initialized) { + noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next); + initialized = true; + } + // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple + // In that case copy to BinSedesTuple + res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll()); res.returnStatus = POStatus.STATUS_OK; if (inputRecordCounter != null) { inputRecordCounter.increment(1); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Wed Feb 22 09:43:41 2017 @@ -102,19 +102,19 @@ public class POStoreTez extends POStore throw new ExecException(e); } - // Multiple outputs - can be another store or other outputs (shuffle, broadcast) - if (outputs.size() > 1) { - CounterGroup multiStoreGroup = processorContext.getCounters() - .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - if (multiStoreGroup == null) { - processorContext.getCounters().addGroup( - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - } - String name = MRPigStatsUtil.getMultiStoreCounterName(this); - if (name != null) { - outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); - } + // Even if there is a single hdfs output, we add multi store counter + // Makes it easier for user to see records for a particular store from + // the DAG counter + CounterGroup multiStoreGroup = processorContext.getCounters() + .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + if (multiStoreGroup == null) { + processorContext.getCounters().addGroup( + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + } + String name = MRPigStatsUtil.getMultiStoreCounterName(this); + if (name != null) { + outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Wed Feb 22 09:43:41 2017 @@ -57,6 +57,7 @@ public class POValueInputTez extends Phy private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean hasNext; + private transient Boolean hasFirstRecord; public POValueInputTez(OperatorKey k) { super(k); @@ -92,6 +93,8 @@ public class POValueInputTez extends Phy Reader r = input.getReader(); if (r instanceof KeyValueReader) { reader = (KeyValueReader) r; + // Force input fetch + hasFirstRecord = reader.next(); } else { shuffleInput = true; shuffleReader = (KeyValuesReader) r; @@ -118,10 +121,22 @@ public class POValueInputTez extends Phy } hasNext = shuffleReader.next(); } - } else if (reader.next()) { - Tuple origTuple = (Tuple) reader.getCurrentValue(); - Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); - return new Result(POStatus.STATUS_OK, copy); + } else { + if (hasFirstRecord != null) { + if (hasFirstRecord) { + hasFirstRecord = null; + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); + } + hasFirstRecord = null; + } else { + while (reader.next()) { + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); + } + } } finished = true; // For certain operators (such as STREAM), we could still have some work Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Wed Feb 22 09:43:41 2017 @@ -69,6 +69,11 @@ public class CombinerOptimizer extends T } for (TezOperator from : predecessors) { + PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan; + if (!combinePlan.isEmpty()) { + // Cases like bloom join have combine plan already set + continue; + } List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class); if (rearranges.isEmpty()) { continue; @@ -77,7 +82,7 @@ public class CombinerOptimizer extends T POLocalRearrangeTez connectingLR = null; PhysicalPlan rearrangePlan = from.plan; for (POLocalRearrangeTez lr : rearranges) { - if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { + if (lr.containsOutputKey(to.getOperatorKey().toString())) { connectingLR = lr; break; } @@ -90,7 +95,6 @@ public class CombinerOptimizer extends T // Detected the POLocalRearrange -> POPackage pattern. Let's add // combiner if possible. - PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan; CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg); if(!combinePlan.isEmpty()) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.pig.LoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; @@ -65,11 +66,6 @@ public class LoaderProcessor extends Tez this.jobConf.setBoolean("mapred.mapper.new-api", true); this.jobConf.setClass("mapreduce.inputformat.class", PigInputFormat.class, InputFormat.class); - try { - this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc)); - } catch (IOException e) { - throw new VisitorException(e); - } } /** @@ -175,6 +171,7 @@ public class LoaderProcessor extends Tez // splits can be moved to if(loads) block below int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks(); tezOp.setRequestedParallelism(parallelism); + tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job)); } return lds; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Wed Feb 22 09:43:41 2017 @@ -153,6 +153,8 @@ public class MultiQueryOptimizerTez exte } } if (getPlan().getSuccessors(successor) != null) { + nonPackageInputSuccessors.clear(); + toMergeSuccessors.clear(); for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) { if (succSuccessor.isUnion()) { if (!(unionOptimizerOn && @@ -171,7 +173,13 @@ public class MultiQueryOptimizerTez exte continue; } } - toMergeSuccessors.add(succSuccessor); + if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) { + // Output goes to scalar or POFRJoinTez in the union operator + // We need to ensure it is the only one to avoid parallel edges + canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false; + } else { + toMergeSuccessors.add(succSuccessor); + } List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor); if (unionSuccessors != null) { for (TezOperator unionSuccessor : unionSuccessors) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Wed Feb 22 09:43:41 2017 @@ -115,11 +115,16 @@ public class ParallelismSetter extends T } else if (pc.defaultParallel != -1) { parallelism = pc.defaultParallel; } + if (parallelism == 0) { + // We need to produce empty output file. + // Even if user set PARALLEL 0, mapreduce has 1 reducer + parallelism = 1; + } boolean overrideRequestedParallelism = false; if (parallelism != -1 && autoParallelismEnabled - && tezOp.isIntermediateReducer() && !tezOp.isDontEstimateParallelism() + && tezOp.isIntermediateReducer() && tezOp.isOverrideIntermediateParallelism()) { overrideRequestedParallelism = true; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Wed Feb 22 09:43:41 2017 @@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex POLocalRearrangeTez connectingLR = null; PhysicalPlan rearrangePlan = from.plan; for (POLocalRearrangeTez lr : rearranges) { - if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { + if (lr.containsOutputKey(to.getOperatorKey().toString())) { connectingLR = lr; break; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea @Override public void visitTezOp(TezOperator tezOp) throws VisitorException { - tezOp.setEstimatedParallelism(-1); + if (!tezOp.isDontEstimateParallelism()) { + tezOp.setEstimatedParallelism(-1); + } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Wed Feb 22 09:43:41 2017 @@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper */ public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator { - static private int maxTaskCount; static final double DEFAULT_FLATTEN_FACTOR = 10; static final double DEFAULT_FILTER_FACTOR = 0.7; static final double DEFAULT_LIMIT_FACTOR = 0.1; @@ -76,6 +75,8 @@ public class TezOperDependencyParallelis static final double DEFAULT_AGGREGATION_FACTOR = 0.7; private PigContext pc; + private int maxTaskCount; + private long bytesPerReducer; @Override public void setPigContext(PigContext pc) { @@ -94,16 +95,18 @@ public class TezOperDependencyParallelis maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM, PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); - // If parallelism is set explicitly, respect it - if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { - return tezOper.getRequestedParallelism(); - } + bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER); // If we have already estimated parallelism, use that one - if (tezOper.getEstimatedParallelism()!=-1) { + if (tezOper.getEstimatedParallelism() != -1) { return tezOper.getEstimatedParallelism(); } + // If parallelism is set explicitly, respect it + if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { + return tezOper.getRequestedParallelism(); + } + List<TezOperator> preds = plan.getPredecessors(tezOper); if (preds==null) { throw new IOException("Cannot estimate parallelism for source vertex"); @@ -130,6 +133,12 @@ public class TezOperDependencyParallelis boolean applyFactor = !tezOper.isUnion(); if (!pred.isVertexGroup() && applyFactor) { predParallelism = predParallelism * pred.getParallelismFactor(tezOper); + if (pred.getTotalInputFilesSize() > 0) { + // Estimate similar to mapreduce and use the maximum of two + int parallelismBySize = (int) Math.ceil((double) pred + .getTotalInputFilesSize() / bytesPerReducer); + predParallelism = Math.max(predParallelism, parallelismBySize); + } } estimatedParallelism += predParallelism; } @@ -157,9 +166,7 @@ public class TezOperDependencyParallelis } if (roundedEstimatedParallelism == 0) { - throw new IOException("Estimated parallelism for " - + tezOper.getOperatorKey().toString() - + " is 0 which is unexpected"); + roundedEstimatedParallelism = 1; // We need to produce empty output file } return roundedEstimatedParallelism; @@ -196,7 +203,7 @@ public class TezOperDependencyParallelis if (successor != null) { // Map side combiner TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey()); - if (!edge.combinePlan.isEmpty()) { + if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) { if (successor.isDistinct()) { factor = DEFAULT_DISTINCT_FACTOR; } else { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Feb 22 09:43:41 2017 @@ -29,6 +29,7 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigConfiguration; +import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -44,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; @@ -52,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag import org.apache.pig.builtin.JsonStorage; import org.apache.pig.builtin.OrcStorage; import org.apache.pig.builtin.PigStorage; -import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; @@ -108,6 +109,12 @@ public class UnionOptimizer extends TezO if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) { return false; } + + // If user has specified a PARALLEL clause with the union operator + // turn off union optimization + if (tezOp.getRequestedParallelism() != -1) { + return false; + } // Two vertices separately ranking with 1 to n and writing to output directly // will make each rank repeate twice which is wrong. Rank always needs to be // done from single vertex to have the counting correct. @@ -120,10 +127,25 @@ public class UnionOptimizer extends TezO public static boolean isOptimizableStoreFunc(TezOperator tezOp, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) throws VisitorException { - if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { - List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); - for (POStoreTez store : stores) { - String name = store.getStoreFunc().getClass().getName(); + List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); + + for (POStoreTez store : stores) { + String name = store.getStoreFunc().getClass().getName(); + if (store.getStoreFunc() instanceof StoreFunc) { + StoreFunc func = (StoreFunc) store.getStoreFunc(); + if (func.supportsParallelWriteToStoreLocation() != null) { + if (func.supportsParallelWriteToStoreLocation()) { + continue; + } else { + LOG.warn(name + " does not support union optimization." + + " Disabling it. There will be some performance degradation."); + return false; + } + } + } + // If StoreFunc does not explicitly state support, then check supported and + // unsupported config settings. + if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { if (unsupportedStoreFuncs != null && unsupportedStoreFuncs.contains(name)) { return false; @@ -237,8 +259,23 @@ public class UnionOptimizer extends TezO for (TezOperator succ : successors) { if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) { existingVertexGroup = succ; + break; + } + } + } + if (existingVertexGroup == null) { + // In the case of union + split + union + store, the different stores in the Split + // will be writing to same location after second union operator is optimized. + // So while optimizing the first union, we should just make it write to one vertex group + for (int j = 0; j < i; j++) { + if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) { + storeVertexGroupOps[i] = storeVertexGroupOps[j]; + break; } } + if (storeVertexGroupOps[i] != null) { + continue; + } } if (existingVertexGroup != null) { storeVertexGroupOps[i] = existingVertexGroup; @@ -270,6 +307,15 @@ public class UnionOptimizer extends TezO TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()]; String[] newOutputKeys = new String[unionOutputKeys.size()]; for (int i=0; i < outputVertexGroupOps.length; i++) { + for (int j = 0; j < i; j++) { + if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) { + outputVertexGroupOps[i] = outputVertexGroupOps[j]; + break; + } + } + if (outputVertexGroupOps[i] != null) { + continue; + } outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo()); outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i)); @@ -515,15 +561,24 @@ public class UnionOptimizer extends TezO // Connect predecessor to the storeVertexGroups int i = 0; for (TezOperator storeVertexGroup : storeVertexGroupOps) { + // Skip connecting if they are already connected. Can happen in case of + // union + split + union + store. Because of the split all the stores + // will be writing to same location + List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs(); + if (inputs == null || !inputs.contains(pred.getOperatorKey())) { + tezPlan.connect(pred, storeVertexGroup); + } storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(), storeVertexGroup.getOperatorKey()); - tezPlan.connect(pred, storeVertexGroup); } for (TezOperator outputVertexGroup : outputVertexGroupOps) { + List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs(); + if (inputs == null || !inputs.contains(pred.getOperatorKey())) { + tezPlan.connect(pred, outputVertexGroup); + } outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); - tezPlan.connect(pred, outputVertexGroup); } copyOperatorProperties(pred, unionOp); @@ -568,7 +623,7 @@ public class UnionOptimizer extends TezO // more union predecessors. Change it to SCATTER_GATHER if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) { edge.dataMovementType = DataMovementType.SCATTER_GATHER; - edge.partitionerClass = RoundRobinPartitioner.class; + edge.partitionerClass = HashValuePartitioner.class; edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); } Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez.runtime; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; + +public class HashValuePartitioner extends Partitioner<Writable, Writable> { + + @SuppressWarnings("rawtypes") + @Override + public int getPartition(Writable key, Writable value, int numPartitions) { + int hash = 17; + Tuple tuple; + if (value instanceof Tuple) { + // union optimizer turned off + tuple = (Tuple) value; + } else { + // union followed by order by or skewed join + tuple = (Tuple)((NullableTuple) value).getValueAsPigType(); + } + if (tuple != null) { + for (Object o : tuple.getAll()) { + if (o != null) { + // Skip computing hashcode for bags. + // Order of elements in the map/bag may be different on each run + // Can't even include size as some DataBag implementations + // iterate through all elements in the bag to get the size. + if (o instanceof DataBag) { + hash = 31 * hash; + } else { + hash = 31 * hash + o.hashCode(); + } + } + } + } + return (hash & Integer.MAX_VALUE) % numPartitions; + } + +} \ No newline at end of file Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Wed Feb 22 09:43:41 2017 @@ -17,23 +17,25 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.runtime; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; -import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.VertexManagerEvent; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** @@ -46,8 +48,13 @@ import com.google.common.collect.Lists; public class PartitionerDefinedVertexManager extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class); - private boolean isParallelismSet = false; + private volatile boolean parallelismSet; private int dynamicParallelism = -1; + private int numConfiguredSources; + private int numSources = -1; + private volatile boolean configured; + private volatile boolean started; + private volatile boolean scheduled; public PartitionerDefinedVertexManager(VertexManagerPluginContext context) { super(context); @@ -55,7 +62,31 @@ public class PartitionerDefinedVertexMan @Override public void initialize() { - // Nothing to do + // this will prevent vertex from starting until we notify we are done + getContext().vertexReconfigurationPlanned(); + parallelismSet = false; + numConfiguredSources = 0; + configured = false; + started = false; + numSources = getContext().getInputVertexEdgeProperties().size(); + // wait for sources and self to start + Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties(); + for (String entry : edges.keySet()) { + getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED)); + } + } + + @Override + public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) + throws Exception { + numConfiguredSources++; + LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: " + + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources + + " needed: " + numSources); + Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName()); + if (numConfiguredSources == numSources) { + configure(); + } } @Override @@ -73,10 +104,9 @@ public class PartitionerDefinedVertexMan public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { // There could be multiple partition vertex sending VertexManagerEvent // Only need to setVertexParallelism once - if (isParallelismSet) { + if (parallelismSet) { return; } - isParallelismSet = true; // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput if (vmEvent.getUserPayload().limit()==4) { dynamicParallelism = vmEvent.getUserPayload().getInt(); @@ -96,18 +126,50 @@ public class PartitionerDefinedVertexMan edgeManagers.put(entry.getKey(), edge); } getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers); + parallelismSet = true; + configure(); } } } - @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { - if (dynamicParallelism != -1) { + private void configure() { + if(parallelismSet && (numSources == numConfiguredSources)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Done reconfiguring vertex " + getContext().getVertexName()); + } + getContext().doneReconfiguringVertex(); + configured = true; + trySchedulingTasks(); + } + } + + private synchronized void trySchedulingTasks() { + if (configured && started && !scheduled) { + LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName()); List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism); - for (int i=0; i<dynamicParallelism; ++i) { + for (int i = 0; i < dynamicParallelism; ++i) { tasksToStart.add(new TaskWithLocationHint(new Integer(i), null)); } getContext().scheduleVertexTasks(tasksToStart); + scheduled = true; } } + + @Override + public void onVertexStarted(Map<String, List<Integer>> completions) { + // This vertex manager will be getting the following calls + // 1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex + // 2) onVertexStateUpdated - Vertex CONFIGURED status updates from + // - Order by Partitioner vertex (1-1) in case of Order by + // - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin + // 3) onVertexStarted + // Calls 2) and 3) can happen in any order. So we should schedule tasks + // only after start is called and configuration is also complete + started = true; + if (LOG.isDebugEnabled()) { + LOG.debug("Vertex start received for " + getContext().getVertexName()); + } + trySchedulingTasks(); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Wed Feb 22 09:43:41 2017 @@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter; import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; @@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); - pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext")); + pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT)); tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan")); TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan); try { @@ -81,9 +81,10 @@ public class PigGraceShuffleVertexManage throw new TezUncheckedException(e); } TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName())); - + // Collect grandparents of the vertex - Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { + Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { + @Override public String apply(TezOperator op) { return op.getOperatorKey().toString(); } }; grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString); @@ -135,7 +136,7 @@ public class PigGraceShuffleVertexManage // Now one of the predecessor is about to start, we need to make a decision now if (anyPredAboutToStart) { // All grandparents finished, start parents with right parallelism - + for (TezOperator pred : preds) { if (pred.getRequestedParallelism()==-1) { List<TezOperator> predPreds = tezPlan.getPredecessors(pred); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Wed Feb 22 09:43:41 2017 @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; import org.apache.commons.logging.Log; @@ -32,6 +33,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.log4j.PropertyConfigurator; import org.apache.pig.JVMReuseImpl; import org.apache.pig.PigConstants; import org.apache.pig.PigException; @@ -39,6 +41,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; @@ -53,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; @@ -132,7 +136,11 @@ public class PigProcessor extends Abstra SpillableMemoryManager.getInstance().configure(conf); PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer .deserialize(conf.get("udf.import.list"))); - PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext")); + Properties log4jProperties = (Properties) ObjectSerializer + .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); + if (log4jProperties != null) { + PropertyConfigurator.configure(log4jProperties); + } // To determine front-end in UDFContext conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier()); @@ -151,6 +159,12 @@ public class PigProcessor extends Abstra conf.setInt(JobContext.TASK_PARTITION, taskAttemptId.getTaskID().getId()); conf.set(JobContext.ID, taskAttemptId.getJobID().toString()); + if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) { + // Has Load and is a root vertex + conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism()); + } else { + conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism()); + } conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex())); UDFContext.getUDFContext().addJobConf(conf); @@ -158,7 +172,7 @@ public class PigProcessor extends Abstra String execPlanString = conf.get(PLAN); execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString); - SchemaTupleBackend.initialize(conf, pc); + SchemaTupleBackend.initialize(conf); PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID()); // Set the job conf as a thread-local member of PigMapReduce @@ -167,7 +181,7 @@ public class PigProcessor extends Abstra Utils.setDefaultTimeZone(conf); - boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new TezTaskContext(getContext())); pigHadoopLogger = PigHadoopLogger.getInstance(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Wed Feb 22 09:43:41 2017 @@ -43,6 +43,15 @@ public interface TezInput { */ public void addInputsToSkip(Set<String> inputsToSkip); + /** + * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch + * the input so that all inputs are fetched and memory released before memory is allocated + * for outputs + * + * @param inputs available inputs + * @param conf configuration + * @throws ExecException + */ public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Wed Feb 22 09:43:41 2017 @@ -23,6 +23,7 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.JobContext; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner; import org.apache.pig.data.DataBag; @@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.UDFContext; import org.apache.tez.runtime.library.common.ConfigUtils; public class WeightedRangePartitionerTez extends WeightedRangePartitioner { @@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM); convertToArray(quantilesList); + long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple) ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple) ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } catch (Exception e) { throw new RuntimeException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Wed Feb 22 09:43:41 2017 @@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; import org.apache.tez.mapreduce.hadoop.InputSplitInfo; import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk; @@ -102,7 +103,6 @@ public class MRToTezHelper { mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency"); - mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); } @@ -165,11 +165,7 @@ public class MRToTezHelper { continue; } } - if (key.startsWith("dfs.datanode")) { - tezConf.unset(key); - } else if (key.startsWith("dfs.namenode")) { - tezConf.unset(key); - } else if (key.startsWith("yarn.nodemanager")) { + if (key.startsWith("yarn.nodemanager")) { tezConf.unset(key); } else if (key.startsWith("mapreduce.jobhistory")) { tezConf.unset(key); @@ -181,20 +177,15 @@ public class MRToTezHelper { } } - public static TezConfiguration getDAGAMConfFromMRConf( - Configuration tezConf) { - - // Set Tez parameters based on MR parameters. - TezConfiguration dagAMConf = new TezConfiguration(tezConf); - + public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) { convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap()); convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap); - String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); - if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) { - env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV) - : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV); + String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); + if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) { + env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV) + : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV); } if (env != null) { @@ -203,24 +194,23 @@ public class MRToTezHelper { dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, org.apache.tez.mapreduce.hadoop.MRHelpers - .getJavaOptsForMRAM(tezConf)); + .getJavaOptsForMRAM(dagAMConf)); - String queueName = tezConf.get(JobContext.QUEUE_NAME, + String queueName = dagAMConf.get(JobContext.QUEUE_NAME, YarnConfiguration.DEFAULT_QUEUE_NAME); dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS, - tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); + dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS, - tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); + dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5"); removeUnwantedSettings(dagAMConf, true); - return dagAMConf; } /** @@ -263,6 +253,14 @@ public class MRToTezHelper { JobControlCompiler.configureCompression(tezConf); convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap()); removeUnwantedSettings(tezConf, false); + + // ShuffleVertexManager Plugin settings + // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max + String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART); + if (slowStartFraction != null) { + tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction); + tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction); + } } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Wed Feb 22 09:43:41 2017 @@ -36,13 +36,14 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; -import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.builtin.TOBAG; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; @@ -198,8 +199,8 @@ public class TezCompilerUtil { public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException { try { - List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); - for (TezInput input : inputs) { + List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class); + for (POFRJoinTez input : inputs) { if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { return true; } @@ -269,7 +270,7 @@ public class TezCompilerUtil { } else if (dataMovementType == DataMovementType.SCATTER_GATHER) { edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); edge.inputClassName = UnorderedKVInput.class.getName(); - edge.partitionerClass = RoundRobinPartitioner.class; + edge.partitionerClass = HashValuePartitioner.class; } edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName()); edge.setIntermediateOutputValueClass(TUPLE_CLASS);