Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
 Wed Feb 22 09:43:41 2017
@@ -56,6 +56,7 @@ public class POCounterStatsTez extends P
     private transient KeyValuesReader reader;
     private transient KeyValueWriter writer;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POCounterStatsTez(OperatorKey k) {
         super(k);
@@ -88,6 +89,7 @@ public class POCounterStatsTez extends P
         try {
             reader = (KeyValuesReader) input.getReader();
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+            hasNext = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -130,12 +132,13 @@ public class POCounterStatsTez extends P
             Integer key = null;
             Long value = null;
             // Read count of records per task
-            while (reader.next()) {
+            while (hasNext) {
                 key = ((IntWritable)reader.getCurrentKey()).get();
                 for (Object val : reader.getCurrentValues()) {
                     value = ((LongWritable)val).get();
                     counterRecords.put(key, value);
                 }
+                hasNext = reader.next();
             }
 
             // BinInterSedes only takes String for map key

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
 Wed Feb 22 09:43:41 2017
@@ -19,6 +19,8 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -101,9 +103,13 @@ public class POFRJoinTez extends POFRJoi
                 LogicalInput input = inputs.get(key);
                 if (!this.replInputs.contains(input)) {
                     this.replInputs.add(input);
-                    this.replReaders.add((KeyValueReader) input.getReader());
+                    KeyValueReader reader = (KeyValueReader) input.getReader();
+                    this.replReaders.add(reader);
+                    log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader);
                 }
             }
+            // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have
+            // multiple POFRJoinTez loading same replicate input and will skip records
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -114,6 +120,7 @@ public class POFRJoinTez extends POFRJoi
      *
      * @throws ExecException
      */
+    @SuppressWarnings("unchecked")
     @Override
     protected void setUpHashMap() throws ExecException {
 
@@ -121,8 +128,8 @@ public class POFRJoinTez extends POFRJoi
         // where same POFRJoinTez occurs in different Split sub-plans
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
-            replicates = (TupleToMapKey[]) cacheValue;
-            log.info("Found " + (replicates.length - 1) + " replication hash 
tables in Tez cache. cachekey=" + cacheKey);
+            replicates =  (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue;
+            log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;
         }
 
@@ -148,7 +155,7 @@ public class POFRJoinTez extends POFRJoi
 
         long time1 = System.currentTimeMillis();
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int inputIdx = 0;
         // We need to adjust the index because the number of replInputs is
         // one less than the number of inputSchemas. The inputSchemas
@@ -158,7 +165,12 @@ public class POFRJoinTez extends POFRJoi
             SchemaTupleFactory inputSchemaTupleFactory = 
inputSchemaTupleFactories[schemaIdx];
             SchemaTupleFactory keySchemaTupleFactory = 
keySchemaTupleFactories[schemaIdx];
 
-            TupleToMapKey replicate = new TupleToMapKey(4000, 
keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(4000);
+            } else {
+                replicate = new TupleToMapKey(4000, keySchemaTupleFactory);
+            }
             POLocalRearrange lr = LRs[schemaIdx];
 
             try {
@@ -168,7 +180,8 @@ public class POFRJoinTez extends POFRJoi
                     }
 
                     PigNullableWritable key = (PigNullableWritable) 
replReaders.get(inputIdx).getCurrentKey();
-                    if (isKeyNull(key.getValueAsPigType())) continue;
+                    Object keyValue = key.getValueAsPigType();
+                    if (isKeyNull(keyValue)) continue;
                     NullableTuple val = (NullableTuple) 
replReaders.get(inputIdx).getCurrentValue();
 
                     // POFRJoin#getValueTuple() is reused to construct 
valTuple,
@@ -176,27 +189,31 @@ public class POFRJoinTez extends POFRJoi
                     // construct one here.
                     Tuple retTuple = mTupleFactory.newTuple(3);
                     retTuple.set(0, key.getIndex());
-                    retTuple.set(1, key.getValueAsPigType());
+                    retTuple.set(1, keyValue);
                     retTuple.set(2, val.getValueAsPigType());
                     Tuple valTuple = getValueTuple(lr, retTuple);
 
-                    Tuple keyTuple = mTupleFactory.newTuple(1);
-                    keyTuple.set(0, key.getValueAsPigType());
-                    if (replicate.get(keyTuple) == null) {
-                        replicate.put(keyTuple, new TuplesToSchemaTupleList(1, 
inputSchemaTupleFactory));
+                    ArrayList<Tuple> values = replicate.get(keyValue);
+                    if (values == null) {
+                        if (inputSchemaTupleFactory == null) {
+                            values = new ArrayList<Tuple>(1);
+                        } else {
+                            values = new TuplesToSchemaTupleList(1, 
inputSchemaTupleFactory);
+                        }
+                        replicate.put(keyValue, values);
                     }
-                    replicate.get(keyTuple).add(valTuple);
+                    values.add(valTuple);
                 }
             } catch (IOException e) {
                 throw new ExecException(e);
             }
-            replicates[schemaIdx] = replicate;
+            replicates.set(schemaIdx, replicate);
             inputIdx++;
             schemaIdx++;
         }
 
         long time2 = System.currentTimeMillis();
-        log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
+        log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1));
 
         ObjectCache.getInstance().cache(cacheKey, replicates);
         log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
 Wed Feb 22 09:43:41 2017
@@ -57,6 +57,7 @@ public class POIdentityInOutTez extends
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, 
String inputKey) {
         super(inputRearrange);
@@ -95,9 +96,12 @@ public class POIdentityInOutTez extends
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasNext = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
+                hasNext = shuffleReader.next();
             }
             LOG.info("Attached input from vertex " + inputKey + " : input=" + 
input + ", reader=" + r);
         } catch (Exception e) {
@@ -127,7 +131,7 @@ public class POIdentityInOutTez extends
                 return RESULT_EOP;
             }
             if (shuffleInput) {
-                while (shuffleReader.next()) {
+                while (hasNext) {
                     Object curKey = shuffleReader.getCurrentKey();
                     Iterable<Object> vals = shuffleReader.getCurrentValues();
                     if (isSkewedJoin) {
@@ -139,9 +143,10 @@ public class POIdentityInOutTez extends
                     for (Object val : vals) {
                         writer.write(curKey, val);
                     }
+                    hasNext = shuffleReader.next();
                 }
             } else {
-                while (reader.next()) {
+                while (hasNext) {
                     if (isSkewedJoin) {
                         NullablePartitionWritable wrappedKey = new 
NullablePartitionWritable(
                                 (PigNullableWritable) reader.getCurrentKey());
@@ -155,6 +160,7 @@ public class POIdentityInOutTez extends
                         writer.write(reader.getCurrentKey(),
                                 reader.getCurrentValue());
                     }
+                    hasNext = reader.next();
                 }
             }
             finished = true;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
 Wed Feb 22 09:43:41 2017
@@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends
         }
     }
 
-    public String getOutputKey() {
-        return outputKey;
+    public boolean containsOutputKey(String key) {
+        return outputKey.equals(key);
     }
 
     public void setOutputKey(String outputKey) {
@@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends
         }
     }
 
+    protected Result getRearrangedTuple() throws ExecException {
+        return super.getNextTuple();
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
         res = super.getNextTuple();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
 Wed Feb 22 09:43:41 2017
@@ -51,6 +51,7 @@ public class PORankTez extends PORank im
     private transient Map<Integer, Long> counterOffsets;
     private transient Configuration conf;
     private transient boolean finished = false;
+    private transient Boolean hasFirstRecord;
 
     public PORankTez(PORank copy) {
         super(copy);
@@ -100,6 +101,7 @@ public class PORankTez extends PORank im
         try {
             reader = (KeyValueReader) input.getReader();
             LOG.info("Attached input from vertex " + tuplesInputKey + " : 
input=" + input + ", reader=" + reader);
+            hasFirstRecord = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -140,9 +142,18 @@ public class PORankTez extends PORank im
         Result inp = null;
 
         try {
-            while (reader.next()) {
-                inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
-                return addRank(inp);
+            if (hasFirstRecord != null) {
+                if (hasFirstRecord) {
+                    hasFirstRecord = null;
+                    inp = new Result(POStatus.STATUS_OK, 
reader.getCurrentValue());
+                    return addRank(inp);
+                }
+                hasFirstRecord = null;
+            } else {
+                while (reader.next()) {
+                    inp = new Result(POStatus.STATUS_OK, 
reader.getCurrentValue());
+                    return addRank(inp);
+                }
             }
         } catch (IOException e) {
             throw new ExecException(e);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 Wed Feb 22 09:43:41 2017
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import 
org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.ReadOnceBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +54,7 @@ import org.apache.tez.runtime.library.co
 public class POShuffleTezLoad extends POPackage implements TezInput {
 
     private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
 
     protected List<String> inputKeys = new ArrayList<String>();
     private boolean isSkewedJoin = false;
@@ -61,6 +68,7 @@ public class POShuffleTezLoad extends PO
     private transient WritableComparator groupingComparator = null;
     private transient Configuration conf;
     private transient int accumulativeBatchSize;
+    private transient boolean readOnceOneBag;
 
     public POShuffleTezLoad(POPackage pack) {
         super(pack);
@@ -101,7 +109,10 @@ public class POShuffleTezLoad extends PO
                 //     - Input key will be repeated, but index would be same 
within a TezInput
                 if (!this.inputs.contains(input)) {
                     this.inputs.add(input);
-                    this.readers.add((KeyValuesReader)input.getReader());
+                    KeyValuesReader reader = 
(KeyValuesReader)input.getReader();
+                    this.readers.add(reader);
+                    LOG.info("Attached input from vertex " + inputKey
+                            + " : input=" + input + ", reader=" + reader);
                 }
             }
 
@@ -117,6 +128,13 @@ public class POShuffleTezLoad extends PO
             for (int i = 0; i < numTezInputs; i++) {
                 finished[i] = !readers.get(i).next();
             }
+
+            this.readOnceOneBag = (numInputs == 1)
+                    && (pkgr instanceof CombinerPackager
+                            || pkgr instanceof LitePackager || pkgr instanceof 
BloomPackager);
+            if (readOnceOneBag) {
+                readOnce[0] = true;
+            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -187,43 +205,47 @@ public class POShuffleTezLoad extends PO
 
                 } else {
 
-                    for (int i = 0; i < numInputs; i++) {
-                        bags[i] = new InternalCachedBag(numInputs);
-                    }
-
-                    if (numTezInputs == 1) {
-                        do {
-                            Iterable<Object> vals = 
readers.get(0).getCurrentValues();
-                            for (Object val : vals) {
-                                NullableTuple nTup = (NullableTuple) val;
-                                int index = nTup.getIndex();
-                                Tuple tup = pkgr.getValueTuple(keyWritable, 
nTup, index);
-                                bags[index].add(tup);
-                            }
-                            finished[0] = !readers.get(0).next();
-                            if (finished[0]) {
-                                break;
-                            }
-                            cur = readers.get(0).getCurrentKey();
-                        } while (groupingComparator.compare(min, cur) == 0); 
// We need to loop in case of Grouping Comparators
+                    if (readOnceOneBag) {
+                        bags[0] = new TezReadOnceBag(pkgr, min);
                     } else {
-                        for (int i = 0; i < numTezInputs; i++) {
-                            if (!finished[i]) {
-                                cur = readers.get(i).getCurrentKey();
-                                // We need to loop in case of Grouping 
Comparators
-                                while (groupingComparator.compare(min, cur) == 
0) {
-                                    Iterable<Object> vals = 
readers.get(i).getCurrentValues();
-                                    for (Object val : vals) {
-                                        NullableTuple nTup = (NullableTuple) 
val;
-                                        int index = nTup.getIndex();
-                                        Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
-                                        bags[index].add(tup);
-                                    }
-                                    finished[i] = !readers.get(i).next();
-                                    if (finished[i]) {
-                                        break;
-                                    }
+                        for (int i = 0; i < numInputs; i++) {
+                            bags[i] = new InternalCachedBag(numInputs);
+                        }
+
+                        if (numTezInputs == 1) {
+                            do {
+                                Iterable<Object> vals = 
readers.get(0).getCurrentValues();
+                                for (Object val : vals) {
+                                    NullableTuple nTup = (NullableTuple) val;
+                                    int index = nTup.getIndex();
+                                    Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
+                                    bags[index].add(tup);
+                                }
+                                finished[0] = !readers.get(0).next();
+                                if (finished[0]) {
+                                    break;
+                                }
+                                cur = readers.get(0).getCurrentKey();
+                            } while (groupingComparator.compare(min, cur) == 
0); // We need to loop in case of Grouping Comparators
+                        } else {
+                            for (int i = 0; i < numTezInputs; i++) {
+                                if (!finished[i]) {
                                     cur = readers.get(i).getCurrentKey();
+                                    // We need to loop in case of Grouping 
Comparators
+                                    while (groupingComparator.compare(min, 
cur) == 0) {
+                                        Iterable<Object> vals = 
readers.get(i).getCurrentValues();
+                                        for (Object val : vals) {
+                                            NullableTuple nTup = 
(NullableTuple) val;
+                                            int index = nTup.getIndex();
+                                            Tuple tup = 
pkgr.getValueTuple(keyWritable, nTup, index);
+                                            bags[index].add(tup);
+                                        }
+                                        finished[i] = !readers.get(i).next();
+                                        if (finished[i]) {
+                                            break;
+                                        }
+                                        cur = readers.get(i).getCurrentKey();
+                                    }
                                 }
                             }
                         }
@@ -383,4 +405,74 @@ public class POShuffleTezLoad extends PO
 
     }
 
+    private class TezReadOnceBag extends ReadOnceBag {
+
+        private static final long serialVersionUID = 1L;
+        private Iterator<Object> iter;
+
+        public TezReadOnceBag(Packager pkgr,
+                PigNullableWritable currentKey) throws IOException {
+            this.pkgr = pkgr;
+            this.keyWritable = currentKey;
+            this.iter = readers.get(0).getCurrentValues().iterator();
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new TezReadOnceBagIterator();
+        }
+
+        private class TezReadOnceBagIterator implements Iterator<Tuple> {
+
+            @Override
+            public boolean hasNext() {
+                if (iter.hasNext()) {
+                    return true;
+                } else {
+                    try {
+                        finished[0] = !readers.get(0).next();
+                        if (finished[0]) {
+                            return false;
+                        }
+                        // Currently combiner is not being applied when secondary key(grouping comparator) is used
+                        // But might change in future. So check if the next key is same and return its values
+                        Object cur = readers.get(0).getCurrentKey();
+                        if (groupingComparator.compare(keyWritable, cur) == 0) 
{
+                            iter = 
readers.get(0).getCurrentValues().iterator();
+                            // Key should at least have one value. But doing a check just for safety
+                            if (iter.hasNext()) {
+                                return true;
+                            } else {
+                                throw new RuntimeException("Unexpected. Key " 
+ keyWritable + " does not have any values");
+                            }
+                        }
+                        return false;
+                    } catch (IOException e) {
+                        throw new RuntimeException("ReadOnceBag failed to get 
value tuple : ", e);
+                    }
+                }
+            }
+
+            @Override
+            public Tuple next() {
+                NullableTuple ntup = (NullableTuple) iter.next();
+                int index = ntup.getIndex();
+                Tuple ret = null;
+                try {
+                    ret = pkgr.getValueTuple(keyWritable, ntup, index);
+                } catch (ExecException e) {
+                    throw new RuntimeException("ReadOnceBag failed to get 
value tuple : ", e);
+                }
+                return ret;
+            }
+
+            @Override
+            public void remove() {
+                throw new RuntimeException("ReadOnceBag.iterator().remove() is 
not allowed");
+            }
+        }
+
+    }
+
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
 Wed Feb 22 09:43:41 2017
@@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext
     private transient Iterator<KeyValueReader> readers;
     private transient KeyValueReader currentReader;
     private transient Configuration conf;
+    private transient Boolean hasFirstRecord;
 
     public POShuffledValueInputTez(OperatorKey k) {
         super(k);
@@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext
             }
             readers = readersList.iterator();
             currentReader = readers.next();
+            // Force input fetch
+            hasFirstRecord = currentReader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext
             }
 
             do {
-                if (currentReader.next()) {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) 
currentReader.getCurrentValue();
+                        Tuple copy = 
mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else if (currentReader.next()) {
                     Tuple origTuple = (Tuple) currentReader.getCurrentValue();
                     Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
                     return new Result(POStatus.STATUS_OK, copy);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 Wed Feb 22 09:43:41 2017
@@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL
     private transient Configuration conf;
     private transient boolean finished = false;
     private transient TezCounter inputRecordCounter;
+    private transient boolean initialized;
+    private transient boolean noTupleCopy;
 
     public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
         super(k, loader);
@@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL
             } else {
                 Result res = new Result();
                 Tuple next = (Tuple) reader.getCurrentValue();
-                res.result = next;
+                if (!initialized) {
+                    noTupleCopy = 
mTupleFactory.newTuple(1).getClass().isInstance(next);
+                    initialized = true;
+                }
+                // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple
+                // In that case copy to BinSedesTuple
+                res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
                 res.returnStatus = POStatus.STATUS_OK;
                 if (inputRecordCounter != null) {
                     inputRecordCounter.increment(1);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 Wed Feb 22 09:43:41 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
             throw new ExecException(e);
         }
 
-        // Multiple outputs - can be another store or other outputs (shuffle, 
broadcast)
-        if (outputs.size() > 1) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 
0);
-            }
+        // Even if there is a single hdfs output, we add multi store counter
+        // Makes it easier for user to see records for a particular store from
+        // the DAG counter
+        CounterGroup multiStoreGroup = processorContext.getCounters()
+                .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (multiStoreGroup == null) {
+            processorContext.getCounters().addGroup(
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        }
+        String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+        if (name != null) {
+            outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
         }
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
 Wed Feb 22 09:43:41 2017
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
+    private transient Boolean hasFirstRecord;
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasFirstRecord = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else if (reader.next()) {
-                Tuple origTuple = (Tuple) reader.getCurrentValue();
-                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                return new Result(POStatus.STATUS_OK, copy);
+            } else {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = 
mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else {
+                    while (reader.next()) {
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = 
mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                }
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have 
some work

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
 Wed Feb 22 09:43:41 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
         }
 
         for (TezOperator from : predecessors) {
+            PhysicalPlan combinePlan = 
to.inEdges.get(from.getOperatorKey()).combinePlan;
+            if (!combinePlan.isEmpty()) {
+                // Cases like bloom join have combine plan already set
+                continue;
+            }
             List<POLocalRearrangeTez> rearranges = 
PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -90,7 +95,6 @@ public class CombinerOptimizer extends T
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
-            PhysicalPlan combinePlan = 
to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, 
combinePlan, messageCollector, doMapAgg);
 
             if(!combinePlan.isEmpty()) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
 Wed Feb 22 09:43:41 2017
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -65,11 +66,6 @@ public class LoaderProcessor extends Tez
         this.jobConf.setBoolean("mapred.mapper.new-api", true);
         this.jobConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);
-        try {
-            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        } catch (IOException e) {
-            throw new VisitorException(e);
-        }
     }
 
     /**
@@ -175,6 +171,7 @@ public class LoaderProcessor extends Tez
             // splits can be moved to if(loads) block below
             int parallelism = 
tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
+            
tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf,
 lds, job));
         }
         return lds;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
 Wed Feb 22 09:43:41 2017
@@ -153,6 +153,8 @@ public class MultiQueryOptimizerTez exte
                     }
                 }
                 if (getPlan().getSuccessors(successor) != null) {
+                    nonPackageInputSuccessors.clear();
+                    toMergeSuccessors.clear();
                     for (TezOperator succSuccessor : 
getPlan().getSuccessors(successor)) {
                         if (succSuccessor.isUnion()) {
                             if (!(unionOptimizerOn &&
@@ -171,7 +173,13 @@ public class MultiQueryOptimizerTez exte
                                         continue;
                                     }
                                 }
-                                toMergeSuccessors.add(succSuccessor);
+                                if 
(TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), 
succSuccessor)) {
+                                    // Output goes to scalar or POFRJoinTez in 
the union operator
+                                    // We need to ensure it is the only one to 
avoid parallel edges
+                                    canMerge = canMerge ? 
nonPackageInputSuccessors.add(succSuccessor) : false;
+                                } else {
+                                    toMergeSuccessors.add(succSuccessor);
+                                }
                                 List<TezOperator> unionSuccessors = 
getPlan().getSuccessors(succSuccessor);
                                 if (unionSuccessors != null) {
                                     for (TezOperator unionSuccessor : 
unionSuccessors) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
 Wed Feb 22 09:43:41 2017
@@ -115,11 +115,16 @@ public class ParallelismSetter extends T
                     } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
                     }
+                    if (parallelism == 0) {
+                        // We need to produce empty output file.
+                        // Even if user set PARALLEL 0, mapreduce has 1 reducer
+                        parallelism = 1;
+                    }
                     boolean overrideRequestedParallelism = false;
                     if (parallelism != -1
                             && autoParallelismEnabled
-                            && tezOp.isIntermediateReducer()
                             && !tezOp.isDontEstimateParallelism()
+                            && tezOp.isIntermediateReducer()
                             && tezOp.isOverrideIntermediateParallelism()) {
                         overrideRequestedParallelism = true;
                     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
 Wed Feb 22 09:43:41 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
 Wed Feb 22 09:43:41 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        tezOp.setEstimatedParallelism(-1);
+        if (!tezOp.isDontEstimateParallelism()) {
+            tezOp.setEstimatedParallelism(-1);
+        }
     }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
 Wed Feb 22 09:43:41 2017
@@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper
  */
 public class TezOperDependencyParallelismEstimator implements 
TezParallelismEstimator {
 
-    static private int maxTaskCount;
     static final double DEFAULT_FLATTEN_FACTOR = 10;
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -76,6 +75,8 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
 
     private PigContext pc;
+    private int maxTaskCount;
+    private long bytesPerReducer;
 
     @Override
     public void setPigContext(PigContext pc) {
@@ -94,16 +95,18 @@ public class TezOperDependencyParallelis
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
-        // If parallelism is set explicitly, respect it
-        if (!tezOper.isIntermediateReducer() && 
tezOper.getRequestedParallelism()!=-1) {
-            return tezOper.getRequestedParallelism();
-        }
+        bytesPerReducer = 
conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, 
PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
 
         // If we have already estimated parallelism, use that one
-        if (tezOper.getEstimatedParallelism()!=-1) {
+        if (tezOper.getEstimatedParallelism() != -1) {
             return tezOper.getEstimatedParallelism();
         }
 
+        // If parallelism is set explicitly, respect it
+        if (!tezOper.isIntermediateReducer() && 
tezOper.getRequestedParallelism()!=-1) {
+            return tezOper.getRequestedParallelism();
+        }
+
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         if (preds==null) {
             throw new IOException("Cannot estimate parallelism for source 
vertex");
@@ -130,6 +133,12 @@ public class TezOperDependencyParallelis
                 boolean applyFactor = !tezOper.isUnion();
                 if (!pred.isVertexGroup() && applyFactor) {
                     predParallelism = predParallelism * 
pred.getParallelismFactor(tezOper);
+                    if (pred.getTotalInputFilesSize() > 0) {
+                        // Estimate similar to mapreduce and use the maximum 
of two
+                        int parallelismBySize = (int) Math.ceil((double) pred
+                                .getTotalInputFilesSize() / bytesPerReducer);
+                        predParallelism = Math.max(predParallelism, 
parallelismBySize);
+                    }
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -157,9 +166,7 @@ public class TezOperDependencyParallelis
         }
 
         if (roundedEstimatedParallelism == 0) {
-            throw new IOException("Estimated parallelism for "
-                    + tezOper.getOperatorKey().toString()
-                    + " is 0 which is unexpected");
+            roundedEstimatedParallelism = 1; // We need to produce empty 
output file
         }
 
         return roundedEstimatedParallelism;
@@ -196,7 +203,7 @@ public class TezOperDependencyParallelis
             if (successor != null) {
                 // Map side combiner
                 TezEdgeDescriptor edge = 
tezOp.outEdges.get(successor.getOperatorKey());
-                if (!edge.combinePlan.isEmpty()) {
+                if (!edge.combinePlan.isEmpty() || 
edge.needsDistinctCombiner()) {
                     if (successor.isDistinct()) {
                         factor = DEFAULT_DISTINCT_FACTOR;
                     } else {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Wed Feb 22 09:43:41 2017
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -44,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -52,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
 import org.apache.pig.builtin.JsonStorage;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -108,6 +109,12 @@ public class UnionOptimizer extends TezO
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
             return false;
         }
+
+        // If user has specified a PARALLEL clause with the union operator
+        // turn off union optimization
+        if (tezOp.getRequestedParallelism() != -1) {
+            return false;
+        }
         // Two vertices separately ranking with 1 to n and writing to output 
directly
         // will make each rank repeate twice which is wrong. Rank always needs 
to be
         // done from single vertex to have the counting correct.
@@ -120,10 +127,25 @@ public class UnionOptimizer extends TezO
     public static boolean isOptimizableStoreFunc(TezOperator tezOp,
             List<String> supportedStoreFuncs, List<String> 
unsupportedStoreFuncs)
             throws VisitorException {
-        if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
-            List<POStoreTez> stores = 
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
-            for (POStoreTez store : stores) {
-                String name = store.getStoreFunc().getClass().getName();
+        List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, 
POStoreTez.class);
+
+        for (POStoreTez store : stores) {
+            String name = store.getStoreFunc().getClass().getName();
+            if (store.getStoreFunc() instanceof StoreFunc) {
+                StoreFunc func = (StoreFunc) store.getStoreFunc();
+                if (func.supportsParallelWriteToStoreLocation() != null) {
+                    if (func.supportsParallelWriteToStoreLocation()) {
+                        continue;
+                    } else {
+                        LOG.warn(name + " does not support union optimization."
+                                + " Disabling it. There will be some 
performance degradation.");
+                        return false;
+                    }
+                }
+            }
+            // If StoreFunc does not explicitly state support, then check 
supported and
+            // unsupported config settings.
+            if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
                 if (unsupportedStoreFuncs != null
                         && unsupportedStoreFuncs.contains(name)) {
                     return false;
@@ -237,8 +259,23 @@ public class UnionOptimizer extends TezO
                 for (TezOperator succ : successors) {
                     if (succ.isVertexGroup() && 
unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile()))
 {
                         existingVertexGroup = succ;
+                        break;
+                    }
+                }
+            }
+            if (existingVertexGroup == null) {
+                // In the case of union + split + union + store, the different 
stores in the Split
+                // will be writing to same location after second union 
operator is optimized.
+                // So while optimizing the first union, we should just make it 
write to one vertex group
+                for (int j = 0; j < i; j++) {
+                    if 
(unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile()))
 {
+                        storeVertexGroupOps[i] = storeVertexGroupOps[j];
+                        break;
                     }
                 }
+                if (storeVertexGroupOps[i] != null) {
+                    continue;
+                }
             }
             if (existingVertexGroup != null) {
                 storeVertexGroupOps[i] = existingVertexGroup;
@@ -270,6 +307,15 @@ public class UnionOptimizer extends TezO
         TezOperator[] outputVertexGroupOps = new 
TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
+            for (int j = 0; j < i; j++) {
+                if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
+                    outputVertexGroupOps[i] = outputVertexGroupOps[j];
+                    break;
+                }
+            }
+            if (outputVertexGroupOps[i] != null) {
+                continue;
+            }
             outputVertexGroupOps[i] = new 
TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
@@ -515,15 +561,24 @@ public class UnionOptimizer extends TezO
         // Connect predecessor to the storeVertexGroups
         int i = 0;
         for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+            // Skip connecting if they are already connected. Can happen in 
case of
+            // union + split + union + store. Because of the split all the 
stores
+            // will be writing to same location
+            List<OperatorKey> inputs = 
storeVertexGroup.getVertexGroupInfo().getInputs();
+            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
+                tezPlan.connect(pred, storeVertexGroup);
+            }
             
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
             
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
                     storeVertexGroup.getOperatorKey());
-            tezPlan.connect(pred, storeVertexGroup);
         }
 
         for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+            List<OperatorKey> inputs = 
outputVertexGroup.getVertexGroupInfo().getInputs();
+            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
+                tezPlan.connect(pred, outputVertexGroup);
+            }
             
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-            tezPlan.connect(pred, outputVertexGroup);
         }
 
         copyOperatorProperties(pred, unionOp);
@@ -568,7 +623,7 @@ public class UnionOptimizer extends TezO
             // more union predecessors. Change it to SCATTER_GATHER
             if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                 edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.partitionerClass = HashValuePartitioner.class;
                 edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1783988&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
 Wed Feb 22 09:43:41 2017
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+/**
+ * Partitioner that chooses the partition from a hash computed over the
+ * fields of the <em>value</em> tuple. The key is never consulted.
+ */
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+    /**
+     * Computes the partition for a record from the fields of its value.
+     *
+     * @param key ignored by this partitioner
+     * @param value either a {@link Tuple} or a {@link NullableTuple} wrapping
+     *            a tuple; any other type fails with a ClassCastException on
+     *            the cast below
+     * @param numPartitions number of partitions; used as a modulus, so it is
+     *            assumed to be greater than zero
+     * @return a non-negative partition index smaller than numPartitions
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int getPartition(Writable key, Writable value, int numPartitions) {
+        // Start from a fixed seed so a null tuple still maps to a stable partition
+        int hash = 17;
+        Tuple tuple;
+        if (value instanceof Tuple) {
+            // union optimizer turned off
+            tuple = (Tuple) value;
+        } else {
+            // union followed by order by or skewed join
+            tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+        }
+        if (tuple != null) {
+            for (Object o : tuple.getAll()) {
+                // Null fields are skipped entirely; they do not change the hash
+                if (o != null) {
+                    // Skip computing hashcode for bags.
+                    // Order of elements in the map/bag may be different on each run
+                    // Can't even include size as some DataBag implementations
+                    // iterate through all elements in the bag to get the size.
+                    // NOTE(review): only DataBag is special-cased here; map fields
+                    // still contribute o.hashCode() - confirm that is intended.
+                    if (o instanceof DataBag) {
+                        hash = 31 * hash;
+                    } else {
+                        hash = 31 * hash + o.hashCode();
+                    }
+                }
+            }
+        }
+        // Masking with Integer.MAX_VALUE clears the sign bit so the remainder is never negative
+        return (hash & Integer.MAX_VALUE) % numPartitions;
+    }
+
+}
\ No newline at end of file

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 Wed Feb 22 09:43:41 2017
@@ -17,23 +17,25 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,8 +48,13 @@ import com.google.common.collect.Lists;
 public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
     private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
 
-    private boolean isParallelismSet = false;
+    private volatile boolean parallelismSet;
     private int dynamicParallelism = -1;
+    private int numConfiguredSources;
+    private int numSources = -1;
+    private volatile boolean configured;
+    private volatile boolean started;
+    private volatile boolean scheduled;
 
     public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
         super(context);
@@ -55,7 +62,31 @@ public class PartitionerDefinedVertexMan
 
     @Override
     public void initialize() {
-        // Nothing to do
+        // this will prevent vertex from starting until we notify we are done
+        getContext().vertexReconfigurationPlanned();
+        parallelismSet = false;
+        numConfiguredSources = 0;
+        configured = false;
+        started = false;
+        numSources = getContext().getInputVertexEdgeProperties().size();
+        // wait for sources and self to start
+        Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+        for (String entry : edges.keySet()) {
+            getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+        }
+    }
+
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+            throws Exception {
+        numConfiguredSources++;
+        LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
+            + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+            + " needed: " + numSources);
+        Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
+        if (numConfiguredSources == numSources) {
+            configure();
+        }
     }
 
     @Override
@@ -73,10 +104,9 @@ public class PartitionerDefinedVertexMan
     public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
-        if (isParallelismSet) {
+        if (parallelismSet) {
             return;
         }
-        isParallelismSet = true;
         // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
         if (vmEvent.getUserPayload().limit()==4) {
             dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -96,18 +126,50 @@ public class PartitionerDefinedVertexMan
                     edgeManagers.put(entry.getKey(), edge);
                 }
                 getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
+                parallelismSet = true;
+                configure();
             }
         }
     }
 
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-        if (dynamicParallelism != -1) {
+    private void configure() {
+        if(parallelismSet && (numSources == numConfiguredSources)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
+            }
+            getContext().doneReconfiguringVertex();
+            configured = true;
+            trySchedulingTasks();
+        }
+    }
+
+    private synchronized void trySchedulingTasks() {
+        if (configured && started && !scheduled) {
+            LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
             List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
-            for (int i=0; i<dynamicParallelism; ++i) {
+            for (int i = 0; i < dynamicParallelism; ++i) {
                 tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
             }
             getContext().scheduleVertexTasks(tasksToStart);
+            scheduled = true;
         }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+        // This vertex manager will be getting the following calls
+        //   1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
+        //   2) onVertexStateUpdated - Vertex CONFIGURED status updates from
+        //       - Order by Partitioner vertex (1-1) in case of Order by
+        //       - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
+        //   3) onVertexStarted
+        // Calls 2) and 3) can happen in any order. So we should schedule tasks
+        // only after start is called and configuration is also complete
+        started = true;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Vertex start received for " + getContext().getVertexName());
+        }
+        trySchedulingTasks();
+    }
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
 Wed Feb 22 09:43:41 2017
@@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage
             conf = 
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
             bytesPerTask = 
conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-            pc = 
(PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+            pc = 
(PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT));
             tezPlan = 
(TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
             TezEstimatedParallelismClearer clearer = new 
TezEstimatedParallelismClearer(tezPlan);
             try {
@@ -81,9 +81,10 @@ public class PigGraceShuffleVertexManage
                 throw new TezUncheckedException(e);
             }
             TezOperator op = 
tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
-    
+
             // Collect grandparents of the vertex
-            Function<TezOperator, String> tezOpToString = new 
Function<TezOperator, String>() { 
+            Function<TezOperator, String> tezOpToString = new 
Function<TezOperator, String>() {
+                @Override
                 public String apply(TezOperator op) { return 
op.getOperatorKey().toString(); }
             };
             grandParents = 
Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), 
tezOpToString);
@@ -135,7 +136,7 @@ public class PigGraceShuffleVertexManage
         // Now one of the predecessor is about to start, we need to make a decision now
         if (anyPredAboutToStart) {
             // All grandparents finished, start parents with right parallelism
-            
+
             for (TezOperator pred : preds) {
                 if (pred.getRequestedParallelism()==-1) {
                     List<TezOperator> predPreds = 
tezPlan.getPredecessors(pred);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 Wed Feb 22 09:43:41 2017
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -39,6 +41,7 @@ import org.apache.pig.backend.executione
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -53,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -132,7 +136,11 @@ public class PigProcessor extends Abstra
         SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
-        PigContext pc = (PigContext) 
ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, 
getContext().getUniqueIdentifier());
@@ -151,6 +159,12 @@ public class PigProcessor extends Abstra
         conf.setInt(JobContext.TASK_PARTITION,
               taskAttemptId.getTaskID().getId());
         conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+        if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) {
+            // Has Load and is a root vertex
+            conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism());
+        } else {
+            conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism());
+        }
 
         conf.set(PigConstants.TASK_INDEX, 
Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
@@ -158,7 +172,7 @@ public class PigProcessor extends Abstra
 
         String execPlanString = conf.get(PLAN);
         execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString);
-        SchemaTupleBackend.initialize(conf, pc);
+        SchemaTupleBackend.initialize(conf);
         PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new 
org.apache.hadoop.mapreduce.JobID());
 
         // Set the job conf as a thread-local member of PigMapReduce
@@ -167,7 +181,7 @@ public class PigProcessor extends Abstra
 
         Utils.setDefaultTimeZone(conf);
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new TezTaskContext(getContext()));
         pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
 Wed Feb 22 09:43:41 2017
@@ -43,6 +43,15 @@ public interface TezInput {
      */
     public void addInputsToSkip(Set<String> inputsToSkip);
 
+    /**
+     * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch
+     * the input so that all inputs are fetched and memory released before memory is allocated
+     * for outputs
+     *
+     * @param inputs available inputs
+     * @param conf configuration
+     * @throws ExecException
+     */
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException;
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
 Wed Feb 22 09:43:41 2017
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez
             InternalMap weightedPartsData = (InternalMap) 
quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             estimatedNumPartitions = 
(Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
+            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
                 float[] probVec = getProbVec((Tuple) ent.getValue());
                 weightedParts.put(getPigNullableWritable(key),
-                        new DiscreteProbabilitySampleGenerator(probVec));
+                        new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 Wed Feb 22 09:43:41 2017
@@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -102,7 +103,6 @@ public class MRToTezHelper {
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
         
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", 
"tez.am.vertex.max-task-concurrency");
-        
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
     }
 
@@ -165,11 +165,7 @@ public class MRToTezHelper {
                     continue;
                 }
             }
-            if (key.startsWith("dfs.datanode")) {
-                tezConf.unset(key);
-            } else if (key.startsWith("dfs.namenode")) {
-                tezConf.unset(key);
-            } else if (key.startsWith("yarn.nodemanager")) {
+            if (key.startsWith("yarn.nodemanager")) {
                 tezConf.unset(key);
             } else if (key.startsWith("mapreduce.jobhistory")) {
                 tezConf.unset(key);
@@ -181,20 +177,15 @@ public class MRToTezHelper {
         }
     }
 
-    public static TezConfiguration getDAGAMConfFromMRConf(
-            Configuration tezConf) {
-
-        // Set Tez parameters based on MR parameters.
-        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-
+    public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) 
{
 
         convertMRToTezConf(dagAMConf, dagAMConf, 
DeprecatedKeys.getMRToDAGParamMap());
         convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
-        String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
-        if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
-            env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
-                                : env + "," + 
tezConf.get(MRJobConfig.MR_AM_ENV);
+        String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+        if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
+            env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
+                                : env + "," + 
dagAMConf.get(MRJobConfig.MR_AM_ENV);
         }
 
         if (env != null) {
@@ -203,24 +194,23 @@ public class MRToTezHelper {
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 org.apache.tez.mapreduce.hadoop.MRHelpers
-                        .getJavaOptsForMRAM(tezConf));
+                        .getJavaOptsForMRAM(dagAMConf));
 
-        String queueName = tezConf.get(JobContext.QUEUE_NAME,
+        String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
         dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
 
         removeUnwantedSettings(dagAMConf, true);
 
-        return dagAMConf;
     }
 
     /**
@@ -263,6 +253,14 @@ public class MRToTezHelper {
         JobControlCompiler.configureCompression(tezConf);
         convertMRToTezConf(tezConf, mrConf, 
DeprecatedKeys.getMRToTezRuntimeParamMap());
         removeUnwantedSettings(tezConf, false);
+
+        // ShuffleVertexManager Plugin settings
+        // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
+        String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        if (slowStartFraction != null) {
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
+        }
     }
 
     /**

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Wed Feb 22 09:43:41 2017
@@ -36,13 +36,14 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -198,8 +199,8 @@ public class TezCompilerUtil {
 
     public static boolean isNonPackageInput(String inputKey, TezOperator 
tezOp) throws PlanException {
         try {
-            List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
-            for (TezInput input : inputs) {
+            List<POFRJoinTez> inputs = 
PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
+            for (POFRJoinTez input : inputs) {
                 if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
                     return true;
                 }
@@ -269,7 +270,7 @@ public class TezCompilerUtil {
         } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
             edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
             edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            edge.partitionerClass = HashValuePartitioner.class;
         }
         
edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);


Reply via email to