Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1784237&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
 Fri Feb 24 08:19:42 2017
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.security.Credentials;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+    /**
+     * Builds the mapper context used when the map plan is run by the
+     * illustrator. A new {@link IllustratorContext} is created and wrapped
+     * through {@code WrappedMapper.getMapContext}.
+     *
+     * @param conf   configuration handed to the illustrator context
+     * @param input  bag the context reads its values from
+     * @param output list collecting every key/value pair the mapper writes
+     * @param split  input split passed through to the context
+     * @return the wrapped illustrator context
+     * @throws IOException if {@code output} is null
+     * @throws InterruptedException declared by the context constructor
+     */
+    @Override
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+          throws IOException, InterruptedException {
+        // The wrapper is created before the context, as in the single-expression form.
+        WrappedMapper<Text, Tuple, PigNullableWritable, Writable> wrapper =
+                new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>();
+        IllustratorContext illustratorContext =
+                new IllustratorContext(conf, input, output, split);
+        org.apache.hadoop.mapreduce.Mapper.Context wrapped =
+                wrapper.getMapContext(illustratorContext);
+        return wrapped;
+    }
+
+    /**
+     * Map context that reads its values from an in-memory bag and appends
+     * everything the mapper writes to an in-memory list.
+     */
+    public class IllustratorContext
+            extends MapContextImpl<Text, Tuple, PigNullableWritable, Writable> {
+        private DataBag input;
+        List<Pair<PigNullableWritable, Writable>> output;
+        // Lazily created iterator over the input bag.
+        private Iterator<Tuple> cursor = null;
+        // Tuple returned by the most recent successful nextKeyValue().
+        private Tuple currentValue = null;
+        // Only used when input is null: true once the single placeholder record was reported.
+        private boolean placeholderReported = false;
+
+        /**
+         * Creates the context and sets {@code "inIllustrator"} to
+         * {@code "true"} on the supplied configuration.
+         *
+         * @param conf   configuration passed to the parent context
+         * @param input  bag to read values from; may be null
+         * @param output list receiving the written pairs; must not be null
+         * @param split  input split passed to the parent context
+         * @throws IOException if {@code output} is null
+         * @throws InterruptedException declared for callers; not thrown by the code visible here
+         */
+        public IllustratorContext(Configuration conf, DataBag input,
+              List<Pair<PigNullableWritable, Writable>> output,
+              InputSplit split) throws IOException, InterruptedException {
+            super(conf, new TaskAttemptID(), null, null, null,
+                    new IllustrateDummyReporter(), split);
+            conf.set("inIllustrator", "true");
+            if (output == null) {
+                throw new IOException("Null output can not be used");
+            }
+            this.input = input;
+            this.output = output;
+        }
+
+        /**
+         * Advances to the next input tuple.
+         *
+         * @return true if a record is available. With a null input bag the
+         *         first call returns true (the current value stays null) and
+         *         every later call returns false.
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                boolean firstCall = !placeholderReported;
+                placeholderReported = true;
+                return firstCall;
+            }
+            if (cursor == null) {
+                cursor = input.iterator();
+            }
+            boolean more = cursor.hasNext();
+            if (more) {
+                currentValue = cursor.next();
+            }
+            return more;
+        }
+
+        /** @return always null; this context carries no keys */
+        @Override
+        public Text getCurrentKey() {
+            return null;
+        }
+
+        /** @return the tuple fetched by the last successful {@link #nextKeyValue()} */
+        @Override
+        public Tuple getCurrentValue() {
+            return currentValue;
+        }
+
+        /** Appends the pair to the output list instead of emitting it. */
+        @Override
+        public void write(PigNullableWritable key, Writable value)
+            throws IOException, InterruptedException {
+            Pair<PigNullableWritable, Writable> emitted =
+                    new Pair<PigNullableWritable, Writable>(key, value);
+            output.add(emitted);
+        }
+
+        /** Intentionally a no-op. */
+        @Override
+        public void progress() {
+        }
+    }
+
+    /**
+     * Tells whether the given context belongs to an illustrator run, i.e.
+     * whether its configuration has a value for {@code "inIllustrator"}.
+     *
+     * @param context mapper context; cast to {@code WrappedMapper.Context}
+     * @return true if the {@code "inIllustrator"} key is set
+     */
+    @Override
+    public boolean inIllustrator(Context context) {
+        WrappedMapper.Context wrapped = (WrappedMapper.Context) context;
+        Configuration contextConf = wrapped.getConfiguration();
+        return contextConf.get("inIllustrator") != null;
+    }
+}

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1784237&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
 Fri Feb 24 08:19:42 2017
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.ReduceContext;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.security.Credentials;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.IllustratorContext;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+
+    /**
+     * Reducer wrapper used to hand out an {@link IllustratorContext}, a
+     * wrapped reducer context that also exposes the package operator.
+     */
+    static class IllustrateReducerContext
+            extends WrappedReducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+        /**
+         * Wraps the given reduce context.
+         *
+         * @param reduceContext context to wrap
+         * @return a new {@link IllustratorContext} around {@code reduceContext}
+         */
+        public IllustratorContext getReducerContext(
+                ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+            return new IllustratorContext(reduceContext);
+        }
+
+        /** Wrapped reducer context that can return the POPackage of the context it wraps. */
+        public class IllustratorContext
+            extends WrappedReducer.Context {
+            public IllustratorContext(
+                    ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) {
+                super(reduceContext);
+            }
+
+            /**
+             * @return the {@code pack} field of the wrapped context, which is
+             *         cast to {@link Reduce.IllustratorContextImpl}
+             */
+            public POPackage getPack() {
+                return ((Reduce.IllustratorContextImpl)reduceContext).pack;
+            }
+        }
+    }
+
+    public static class Reduce extends PigGenericMapReduce.Reduce {
+        /**
+         * Get reducer's illustrator context
+         *
+         * @param job job whose configuration supplies the sort and grouping comparators
+         * @param input Input buffer as output by maps; it is sorted in place
+         * @param pkg package
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override
+        public Context getIllustratorContext(Job job,
+               List<Pair<PigNullableWritable, Writable>> input, POPackage pkg)
+               throws IOException, InterruptedException {
+            org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+                    new IllustrateReducerContext()
+                            .getReducerContext(new IllustratorContextImpl(job, input, pkg));
+            return reducerContext;
+        }
+
+        /**
+         * Reduce context backed by an in-memory list of map output pairs.
+         * The list is sorted with the job's sort comparator and then served
+         * group by group according to the job's grouping comparator.
+         */
+        @SuppressWarnings("unchecked")
+        public class IllustratorContextImpl
+                extends ReduceContextImpl<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
+            // currentKey: key of the group being served; nextKey/nextValue: first
+            // entry of the following group, read ahead by nextKey().
+            private PigNullableWritable currentKey = null, nextKey = null;
+            private NullableTuple nextValue = null;
+            private List<NullableTuple> currentValues = null;
+            private Iterator<Pair<PigNullableWritable, Writable>> it;
+            // Scratch buffer used to serialize keys for the raw sort comparison.
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            public POPackage pack = null;
+            private IllustratorValueIterable iterable = new IllustratorValueIterable();
+
+            /**
+             * Sorts {@code input} in place by serialized key and positions
+             * the context before the first group.
+             *
+             * @param job   job providing the configuration and comparators
+             * @param input map output pairs; reordered by this constructor
+             * @param pkg   package operator exposed through {@link #pack}
+             * @throws IOException declared by the parent constructor and job setup
+             * @throws InterruptedException declared by the parent constructor
+             */
+            public IllustratorContextImpl(Job job,
+                  List<Pair<PigNullableWritable, Writable>> input,
+                  POPackage pkg
+                  ) throws IOException, InterruptedException {
+                super(job.getJobConf(), new TaskAttemptID(),
+                    new FakeRawKeyValueIterator(input.iterator().hasNext()),
+                    null, null, null, null, new IllustrateDummyReporter(),
+                    null, PigNullableWritable.class, NullableTuple.class);
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob =
+                        new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            try {
+                                // Serialize both keys back to back and compare the raw bytes.
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+                            } catch (IOException e) {
+                                // Keep the original exception as the cause so the stack trace survives.
+                                throw new RuntimeException(
+                                        "Serialization exception in sort:"+e.getMessage(), e);
+                            } finally {
+                                // Always empty the scratch buffer, even when serialization failed.
+                                bos.reset();
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    nextKey = entry.first;
+                    nextValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+
+            /**
+             * Index-based iterator over {@code currentValues} with mark/reset support.
+             * {@code pos} is the index of the element returned last, -1 before the first.
+             */
+            public class IllustratorValueIterator implements ReduceContext.ValueIterator<NullableTuple> {
+
+                private int pos = -1;
+                private int mark = -1;
+
+                /** Remembers the position just before the element returned last (never below -1). */
+                @Override
+                public void mark() throws IOException {
+                    mark = Math.max(pos - 1, -1);
+                }
+
+                /** Moves back to the marked position. */
+                @Override
+                public void reset() throws IOException {
+                    pos=mark;
+                }
+
+                @Override
+                public void clearMark() throws IOException {
+                    mark=-1;
+                }
+
+                @Override
+                public boolean hasNext() {
+                    return pos<currentValues.size()-1;
+                }
+
+                @Override
+                public NullableTuple next() {
+                    pos++;
+                    return currentValues.get(pos);
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException("remove not implemented");
+                }
+
+                /** Rewinds the iterator to the start and clears the mark. */
+                @Override
+                public void resetBackupStore() throws IOException {
+                    pos=-1;
+                    mark=-1;
+                }
+
+            }
+
+            /** Iterable that always hands out the same {@link IllustratorValueIterator} instance. */
+            protected class IllustratorValueIterable implements Iterable<NullableTuple> {
+                private IllustratorValueIterator iterator = new IllustratorValueIterator();
+                @Override
+                public Iterator<NullableTuple> iterator() {
+                    return iterator;
+                }
+            }
+
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+
+            /**
+             * Moves to the next group. The read-ahead key becomes current and
+             * its value, plus the values of all following entries whose key is
+             * equal under the grouping comparator, are collected. The first
+             * entry with a different key is kept as the new read-ahead.
+             *
+             * @return false when no group is left, true otherwise
+             */
+            @Override
+            public boolean nextKey() {
+                if (nextKey == null)
+                    return false;
+                // NOTE(review): the shared value iterator is not rewound here;
+                // presumably callers invoke resetBackupStore() between keys -- confirm.
+                currentKey = nextKey;
+                currentValues.clear();
+                currentValues.add(nextValue);
+                nextKey = null;
+                for(; it.hasNext(); ) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    /* Why can't raw comparison be used?
+                    byte[] bytes;
+                    int l1, l2;
+                    try {
+                        currentKey.write(dos);
+                        l1 = bos.size();
+                        entry.first.write(dos);
+                        l2 = bos.size();
+                        bytes = bos.toByteArray();
+                    } catch (IOException e) {
+                        throw new RuntimeException("nextKey exception : "+e.getMessage());
+                    }
+                    bos.reset();
+                    if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0)
+                    */
+                    if (groupingComparator.compare(currentKey, entry.first) == 0)
+                    {
+                        currentValues.add((NullableTuple)entry.second);
+                    } else {
+                        nextKey = entry.first;
+                        nextValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return iterable;
+            }
+
+            /** Intentionally a no-op: written pairs are discarded. */
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+
+            /** Intentionally a no-op. */
+            @Override
+            public void progress() {
+            }
+        }
+
+        /** @return true if {@code context} is an {@code IllustrateReducerContext.IllustratorContext} */
+        @Override
+        public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) {
+            return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext);
+        }
+
+        /**
+         * @param context must be an {@code IllustrateReducerContext.IllustratorContext}
+         * @return the package operator held by that context
+         */
+        @Override
+        public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) {
+            return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack();
+        }
+    }
+}

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
 Fri Feb 24 08:19:42 2017
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -156,12 +155,7 @@ public class PigOutputCommitter extends
         for (Pair<OutputCommitter, POStore> mapCommitter : 
mapOutputCommitters) {
             if (mapCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such 
method
-                    Method m = 
mapCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = 
allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(mapCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = 
allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -173,12 +167,7 @@ public class PigOutputCommitter extends
             reduceOutputCommitters) {
             if (reduceCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such 
method
-                    Method m = 
reduceCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = 
allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(reduceCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = 
allOutputCommitterSupportRecovery && 
reduceCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -197,10 +186,7 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 try {
                     // Use reflection, Hadoop 1.x line does not have such 
method
-                    Method m = 
mapCommitter.first.getClass().getMethod("recoverTask", 
TaskAttemptContext.class);
-                    m.invoke(mapCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    mapCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -212,11 +198,7 @@ public class PigOutputCommitter extends
                 TaskAttemptContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such 
method
-                    Method m = 
reduceCommitter.first.getClass().getMethod("recoverTask", 
TaskAttemptContext.class);
-                    m.invoke(reduceCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    reduceCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -256,10 +238,7 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 // PIG-2642 promote files before calling 
storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = 
mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext);
+                    mapCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -273,10 +252,7 @@ public class PigOutputCommitter extends
                         reduceCommitter.second);
                 // PIG-2642 promote files before calling 
storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = 
reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext);
+                    reduceCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -293,10 +269,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         mapCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = 
mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, 
State.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext, state);
+                    mapCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -309,10 +282,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = 
reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, 
State.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext, state);
+                    reduceCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
 Fri Feb 24 08:19:42 2017
@@ -515,9 +515,11 @@ public class PigSplit extends InputSplit
             for (int i = 0; i < wrappedSplits.length; i++) {
                 st.append("Input split["+i+"]:\n   Length = "+ 
wrappedSplits[i].getLength()+"\n   ClassName: " +
                     wrappedSplits[i].getClass().getName() + "\n   
Locations:\n");
-                for (String location :  wrappedSplits[i].getLocations())
-                    st.append("    "+location+"\n");
-                st.append("\n-----------------------\n");
+                if (wrappedSplits[i]!=null && 
wrappedSplits[i].getLocations()!=null) {
+                    for (String location :  wrappedSplits[i].getLocations())
+                        st.append("    "+location+"\n");
+                    st.append("\n-----------------------\n");
+                }
           }
         } catch (IOException e) {
           return null;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
 Fri Feb 24 08:19:42 2017
@@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe
     Random rGen;
     float[] probVec;
     float epsilon = 0.0001f;
-        
+
     private static final Log LOG = 
LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
-    
-    public DiscreteProbabilitySampleGenerator(float[] probVec) {
-        rGen = new Random();
+
+    public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) {
+        rGen = new Random(seed);
         float sum = 0.0f;
         for (float f : probVec) {
             sum += f;
         }
         this.probVec = probVec;
-        if (1-epsilon > sum || sum > 1+epsilon) { 
+        if (1-epsilon > sum || sum > 1+epsilon) {
             LOG.info("Sum of probabilities should be near one: " + sum);
         }
     }
-    
+
     public int getNext(){
         double toss = rGen.nextDouble();
         // if the uniformly random number that I generated
@@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe
             toss -= probVec[i];
             if(toss<=0.0)
                 return i;
-        }        
+        }
         return lastIdx;
     }
-    
+
     public static void main(String[] args) {
         float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f };
-        DiscreteProbabilitySampleGenerator gen = new 
DiscreteProbabilitySampleGenerator(vec);
+        DiscreteProbabilitySampleGenerator gen = new 
DiscreteProbabilitySampleGenerator(11317, vec);
         CountingMap<Integer> cm = new CountingMap<Integer>();
         for(int i=0;i<100;i++){
             cm.put(gen.getNext(), 1);
@@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe
     public String toString() {
         return Arrays.toString(probVec);
     }
-    
-    
+
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
 Fri Feb 24 08:19:42 2017
@@ -17,7 +17,6 @@
  */
 package 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,13 +30,13 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
 import org.apache.pig.impl.io.NullableBigIntegerWritable;
@@ -52,7 +51,6 @@ import org.apache.pig.impl.io.NullableTe
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.ReadToEndLoader;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Utils;
 
 public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, 
Writable>
@@ -62,7 +60,6 @@ public class WeightedRangePartitioner ex
             new HashMap<PigNullableWritable, 
DiscreteProbabilitySampleGenerator>();
     protected PigNullableWritable[] quantiles;
     protected RawComparator<PigNullableWritable> comparator;
-    private PigContext pigContext;
     protected Configuration job;
 
     protected boolean inited = false;
@@ -93,11 +90,6 @@ public class WeightedRangePartitioner ex
     @SuppressWarnings("unchecked")
     public void init() {
         weightedParts = new HashMap<PigNullableWritable, 
DiscreteProbabilitySampleGenerator>();
-        try {
-            pigContext = 
(PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to deserialize pig context: ", 
e);
-        }
 
         String quantilesFile = job.get("pig.quantilesFile", "");
         if (quantilesFile.length() == 0) {
@@ -109,10 +101,10 @@ public class WeightedRangePartitioner ex
             // use local file system to get the quantilesFile
             Map<String, Object> quantileMap = null;
             Configuration conf;
-            if (!pigContext.getExecType().isLocal()) {
-                conf = 
ConfigurationUtil.toConfiguration(pigContext.getProperties());
-            } else {
+            if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, 
false)) {
                 conf = new Configuration(false);
+            } else {
+                conf = new Configuration(job);
             }
             if (job.get("fs.file.impl") != null) {
                 conf.set("fs.file.impl", job.get("fs.file.impl"));
@@ -138,11 +130,13 @@ public class WeightedRangePartitioner ex
                 DataBag quantilesList = (DataBag) 
quantileMap.get(FindQuantiles.QUANTILES_LIST);
                 InternalMap weightedPartsData = (InternalMap) 
quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
                 convertToArray(quantilesList);
+                long taskIdHashCode = 
job.get(MRConfiguration.TASK_ID).hashCode();
+                long randomSeed = ((long)taskIdHashCode << 32) | 
(taskIdHashCode & 0xffffffffL);
                 for (Entry<Object, Object> ent : weightedPartsData.entrySet()) 
{
                     Tuple key = (Tuple)ent.getKey(); // sample item which 
repeats
                     float[] probVec = getProbVec((Tuple)ent.getValue());
                     weightedParts.put(getPigNullableWritable(key),
-                            new DiscreteProbabilitySampleGenerator(probVec));
+                            new DiscreteProbabilitySampleGenerator(randomSeed, 
probVec));
                 }
             }
             // else - the quantiles file is empty - unless we have a bug, the

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 Fri Feb 24 08:19:42 2017
@@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends
         public void visitReservoirSample(POReservoirSample reservoirSample) 
throws VisitorException {
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitPoissonSample(POPoissonSample poissonSample) throws 
VisitorException {
             endOfAllInputFlag = true;
@@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends
             }
         }
 
+        @Override
+        public void visitLocalRearrange(POLocalRearrange lr) throws 
VisitorException{
+            if (lr instanceof POBuildBloomRearrangeTez) {
+                endOfAllInputFlag = true;
+            }
+            super.visitLocalRearrange(lr);
+        }
 
         /**
          * @return if end of all input is present

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java
 Fri Feb 24 08:19:42 2017
@@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
-import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV
      * @param plan MR plan to print
      */
     public MRPrinter(PrintStream ps, MROperPlan plan) {
-        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+        super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, 
true));
         mStream = ps;
         mStream.println("#--------------------------------------------------");
         mStream.println("# Map Reduce Plan                                  ");

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
 Fri Feb 24 08:19:42 2017
@@ -441,6 +441,10 @@ public abstract class PhysicalOperator e
     public void reset() {
     }
 
+    public boolean isEndOfAllInput() {
+        return parentPlan.endOfAllInput;
+    }
+
     /**
      * @return PigProgressable stored in threadlocal
      */

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java
 Fri Feb 24 08:19:42 2017
@@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.math.RoundingMode;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -36,6 +39,8 @@ public class Divide extends BinaryExpres
      *
      */
     private static final long serialVersionUID = 1L;
+    public static final short BIGDECIMAL_MINIMAL_SCALE = 6;
+    private static final Log LOG = LogFactory.getLog(Divide.class);
 
     public Divide(OperatorKey k) {
         super(k);
@@ -72,12 +77,22 @@ public class Divide extends BinaryExpres
         case DataType.BIGINTEGER:
             return ((BigInteger) a).divide((BigInteger) b);
         case DataType.BIGDECIMAL:
-            return ((BigDecimal) a).divide((BigDecimal) b);
+            return bigDecimalDivideWithScale(a, b);
         default:
             throw new ExecException("called on unsupported Number class " + 
DataType.findTypeName(dataType));
         }
     }
 
+    private Number bigDecimalDivideWithScale(Number a, Number b) {
+        // Using same result scaling as Hive. See Arithmetic Rules:
+        //   https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf
+        int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, 
((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("For bigdecimal divide: using " + resultScale + " as 
result scale.");
+        }
+        return ((BigDecimal)a).divide((BigDecimal)b, resultScale, 
RoundingMode.HALF_UP);
+    }
+
     /*
      * This method is used to invoke the appropriate method, as Java does not provide generic
      * dispatch for it.

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
 Fri Feb 24 08:19:42 2017
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -89,6 +90,8 @@ public class POCast extends ExpressionOp
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
+            } else if (obj instanceof EvalFunc) {
+                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new IOException("Invalid class type "
                         + funcSpec.getClassName());
@@ -165,7 +168,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + 
"BigInteger.";
+                        String msg = unknownByteArrayErrorMessage + 
"BigInteger for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -281,7 +284,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBigDecimal(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + 
"BigDecimal.";
+                        String msg = unknownByteArrayErrorMessage + 
"BigDecimal for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -396,7 +399,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBoolean(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "boolean.";
+                        String msg = unknownByteArrayErrorMessage + "boolean 
for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -510,7 +513,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "int.";
+                        String msg = unknownByteArrayErrorMessage + "int for " 
+ this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -636,7 +639,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToLong(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "long.";
+                        String msg = unknownByteArrayErrorMessage + "long for 
" + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -759,7 +762,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDouble(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "double.";
+                        String msg = unknownByteArrayErrorMessage + "double 
for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -881,7 +884,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToFloat(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "float.";
+                        String msg = unknownByteArrayErrorMessage + "float for 
" + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1007,7 +1010,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToDateTime(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + 
"datetime.";
+                        String msg = unknownByteArrayErrorMessage + "datetime 
for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1118,7 +1121,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToCharArray(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "string.";
+                        String msg = unknownByteArrayErrorMessage + "string 
for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1270,7 +1273,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToTuple(dba.get(), 
fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "tuple.";
+                        String msg = unknownByteArrayErrorMessage + "tuple for 
" + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1332,7 +1335,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "bag.";
+                    String msg = unknownByteArrayErrorMessage + "bag for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1363,7 +1366,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToTuple(((DataByteArray)obj).get(), 
fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "tuple for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1388,7 +1391,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "tuple for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1402,7 +1405,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToBoolean(((DataByteArray) 
obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "int for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1441,7 +1444,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToInteger(((DataByteArray) 
obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "int for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1487,7 +1490,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToDouble(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "double.";
+                    String msg = unknownByteArrayErrorMessage + "double for " 
+ this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1533,7 +1536,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToLong(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "long.";
+                    String msg = unknownByteArrayErrorMessage + "long for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1579,7 +1582,7 @@ public class POCast extends ExpressionOp
                     result = caster.bytesToFloat(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "float for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1625,7 +1628,7 @@ public class POCast extends ExpressionOp
                     result = 
caster.bytesToDateTime(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "datetime.";
+                    String msg = unknownByteArrayErrorMessage + "datetime for 
" + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1664,7 +1667,7 @@ public class POCast extends ExpressionOp
                     result = 
caster.bytesToCharArray(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "float for " + 
this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1712,7 +1715,7 @@ public class POCast extends ExpressionOp
                     result = 
caster.bytesToBigInteger(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigInteger.";
+                    String msg = unknownByteArrayErrorMessage + "BigInteger 
for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1757,7 +1760,7 @@ public class POCast extends ExpressionOp
                     result = 
caster.bytesToBigDecimal(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+                    String msg = unknownByteArrayErrorMessage + "BigDecimal 
for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1795,6 +1798,10 @@ public class POCast extends ExpressionOp
             default:
                 throw new ExecException("Cannot convert "+ obj + " to " + fs, 
1120, PigException.INPUT);
             }
+        case DataType.BYTEARRAY:
+            //no-op (PIG-4933)
+            result = obj;
+            break;
         default:
             throw new ExecException("Don't know how to convert "+ obj + " to " 
+ fs, 1120, PigException.INPUT);
         }
@@ -1861,7 +1868,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToBag(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "bag.";
+                        String msg = unknownByteArrayErrorMessage + "bag for " 
+ this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1952,7 +1959,7 @@ public class POCast extends ExpressionOp
                         res.result = caster.bytesToMap(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "map.";
+                        String msg = unknownByteArrayErrorMessage + "map for " 
+ this.getOriginalLocations();
                         throw new ExecException(msg, errCode, 
PigException.INPUT);
                     }
                 } catch (ExecException ee) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
 Fri Feb 24 08:19:42 2017
@@ -158,23 +158,19 @@ public class POProject extends Expressio
             illustratorMarkup(inpValue, res.result, -1);
             return res;
         } else if(columns.size() == 1) {
-            try {
+            if ( inpValue == null ) {
+                // the tuple is null, so a dereference should also produce a null
+                res.returnStatus = POStatus.STATUS_OK;
+                ret = null;
+            } else if( inpValue.size() > columns.get(0) ) {
                 ret = inpValue.get(columns.get(0));
-            } catch (IndexOutOfBoundsException ie) {
+            } else {
                 if(pigLogger != null) {
                     pigLogger.warn(this,"Attempt to access field " +
                             "which was not found in the input", 
PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                 }
                 res.returnStatus = POStatus.STATUS_OK;
                 ret = null;
-            } catch (NullPointerException npe) {
-                // the tuple is null, so a dereference should also produce a null
-                // there is a slight danger here that the Tuple implementation
-                // may have given the exception for a different reason but if we
-                // don't catch it, we will die and the most common case for the
-                // exception would be because the tuple is null
-                res.returnStatus = POStatus.STATUS_OK;
-                ret = null;
             }
         } else if(isProjectToEnd){
             ret = getRangeTuple(inpValue);
@@ -215,23 +211,18 @@ public class POProject extends Expressio
      */
     private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i)
     throws ExecException {
-        try {
+        if( inpValue == null ) {
+            // the tuple is null, so a dereference should also produce a null
+            objList.add(null);
+        } else if( inpValue.size() > i ) {
             objList.add(inpValue.get(i));
-        } catch (IndexOutOfBoundsException ie) {
+        } else {
             if(pigLogger != null) {
                 pigLogger.warn(this,"Attempt to access field " + i +
                         " which was not found in the input", 
PigWarning.ACCESSING_NON_EXISTENT_FIELD);
             }
             objList.add(null);
         }
-        catch (NullPointerException npe) {
-            // the tuple is null, so a dereference should also produce a null
-            // there is a slight danger here that the Tuple implementation
-            // may have given the exception for a different reason but if we
-            // don't catch it, we will die and the most common case for the
-            // exception would be because the tuple is null
-            objList.add(null);
-        }
     }
 
     @Override
@@ -406,21 +397,17 @@ public class POProject extends Expressio
             Object ret;
 
             if(columns.size() == 1) {
-                try{
+                if( inpValue == null ) {
+                    // the tuple is null, so a dereference should also produce a null
+                    ret = null;
+                } else if( inpValue.size() > columns.get(0) ) {
                     ret = inpValue.get(columns.get(0));
-                } catch (IndexOutOfBoundsException ie) {
+                } else {
                     if(pigLogger != null) {
                         pigLogger.warn(this,"Attempt to access field " +
                                 "which was not found in the input", 
PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                     }
                     ret = null;
-                } catch (NullPointerException npe) {
-                    // the tuple is null, so a dereference should also produce a null
-                    // there is a slight danger here that the Tuple implementation
-                    // may have given the exception for a different reason but if we
-                    // don't catch it, we will die and the most common case for the
-                    // exception would be because the tuple is null
-                    ret = null;
                 }
             } else if(isProjectToEnd) {
                 ret = getRangeTuple(inpValue);
@@ -428,21 +415,17 @@ public class POProject extends Expressio
                 ArrayList<Object> objList = new 
ArrayList<Object>(columns.size());
 
                 for(int col: columns) {
-                    try {
+                    if( inpValue == null ) {
+                        // the tuple is null, so a dereference should also produce a null
+                        objList.add(null);
+                    } else if( inpValue.size() > col ) {
                         objList.add(inpValue.get(col));
-                    } catch (IndexOutOfBoundsException ie) {
+                    } else {
                         if(pigLogger != null) {
                             pigLogger.warn(this,"Attempt to access field " +
                                     "which was not found in the input", 
PigWarning.ACCESSING_NON_EXISTENT_FIELD);
                         }
                         objList.add(null);
-                    } catch (NullPointerException npe) {
-                        // the tuple is null, so a dereference should also produce a null
-                        // there is a slight danger here that the Tuple implementation
-                        // may have given the exception for a different reason but if we
-                        // don't catch it, we will die and the most common case for the
-                        // exception would be because the tuple is null
-                        objList.add(null);
                     }
                 }
                 ret = mTupleFactory.newTuple(objList);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
 Fri Feb 24 08:19:42 2017
@@ -49,7 +49,7 @@ public class CombinerPackager extends Pa
     private Map<Integer, Integer> keyLookup;
 
     private int numBags;
-    
+
     private transient boolean initialized;
     private transient boolean useDefaultBag;
 
@@ -77,6 +77,15 @@ public class CombinerPackager extends Pa
         }
     }
 
+    @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
     /**
      * @param keyInfo the keyInfo to set
      */

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
 Fri Feb 24 08:19:42 2017
@@ -17,7 +17,7 @@
  */
 
 /**
- * 
+ *
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -48,6 +49,15 @@ public class LitePackager extends Packag
     private PigNullableWritable keyWritable;
 
     @Override
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+        // Bag can be read directly and need not be materialized again
+    }
+
+    @Override
     public boolean[] getInner() {
         return null;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
 Fri Feb 24 08:19:42 2017
@@ -256,4 +256,9 @@ public class POCross extends PhysicalOpe
         data = null;
     }
 
+    @Override
+    public void reset() {
+        clearMemory();
+    }
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
 Fri Feb 24 08:19:42 2017
@@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp
 
     // The array of Hashtables one per replicated input. replicates[fragment] =
     // null fragment is the input which is fragmented and not replicated.
-    protected transient TupleToMapKey replicates[];
+    protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates;
     // varaible which denotes whether we are returning tuples from the foreach
     // operator
     protected transient boolean processingPlan;
@@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp
         Result res = null;
         Result inp = null;
         if (!setUp) {
-            replicates = new TupleToMapKey[phyPlanLists.size()];
+            replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size());
+            for (int i = 0 ; i < phyPlanLists.size(); i++) {
+                replicates.add(null);
+            }
             dumTup = mTupleFactory.newTuple(1);
             setUpHashMap();
             setUp = true;
@@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp
                 return new Result();
             }
             Tuple lrOutTuple = (Tuple) lrOut.result;
-            Tuple key = mTupleFactory.newTuple(1);
-            key.set(0, lrOutTuple.get(1));
+            Object key = lrOutTuple.get(1);
             Tuple value = getValueTuple(lr, lrOutTuple);
             lr.detachInput();
             // Configure the for each operator with the relevant bags
@@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp
                     ce.setValue(value);
                     continue;
                 }
-                TupleToMapKey replicate = replicates[i];
+                Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i);
                 if (replicate.get(key) == null) {
                     if (isLeftOuterJoin) {
                         ce.setValue(nullBag);
@@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp
                     noMatch = true;
                     break;
                 }
-                ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
+                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
             }
 
             // If this is not LeftOuter Join and there was no match we
@@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp
         }
     }
 
-    protected static class TupleToMapKey {
-        private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
+    protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> {
         private SchemaTupleFactory tf;
 
         public TupleToMapKey(int ct, SchemaTupleFactory tf) {
-            tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
+            super(ct);
             this.tf = tf;
         }
 
-        public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.put(key, val);
+            return (TuplesToSchemaTupleList) super.put(key, val);
         }
 
-        public TuplesToSchemaTupleList get(Tuple key) {
-            if (tf != null) {
-                key = TuplesToSchemaTupleList.convert(key, tf);
+        @Override
+        public TuplesToSchemaTupleList get(Object key) {
+            if (tf != null && key instanceof Tuple) {
+                key = TuplesToSchemaTupleList.convert((Tuple)key, tf);
             }
-            return tuples.get(key);
+            return (TuplesToSchemaTupleList) super.get(key);
         }
     }
 
@@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(i, null);
                 continue;
             }
 
@@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp
             POLocalRearrange lr = LRs[i];
             lr.setInputs(Arrays.asList((PhysicalOperator) ld));
 
-            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            Map<Object, ArrayList<Tuple>> replicate;
+            if (keySchemaTupleFactory == null) {
+                replicate = new HashMap<Object, ArrayList<Tuple>>(1000);
+            } else {
+                replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+            }
 
             log.debug("Completed setup. Trying to build replication hash table");
             for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) {
                 if (getReporter() != null)
                     getReporter().progress();
                 Tuple tuple = (Tuple) res.result;
-                if (isKeyNull(tuple.get(1))) continue;
-                Tuple key = mTupleFactory.newTuple(1);
-                key.set(0, tuple.get(1));
+                Object key = tuple.get(1);
+                if (isKeyNull(key)) continue;
                 Tuple value = getValueTuple(lr, tuple);
 
-                if (replicate.get(key) == null) {
-                    replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                ArrayList<Tuple> values = replicate.get(key);
+                if (values == null) {
+                    if (inputSchemaTupleFactory == null) {
+                        values = new ArrayList<Tuple>(1);
+                    } else {
+                        values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory);
+                    }
+                    replicate.put(key, values);
                 }
-
-                replicate.get(key).add(value);
+                values.add(value);
             }
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long time2 = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (time2 - time1));

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
 Fri Feb 24 08:19:42 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
             addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
         }
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int i = -1;
         long start = System.currentTimeMillis();
         for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(fragment, null);
                 continue;
             }
 
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
                 replicate.get(key).add(value);
 
             }
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long end = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (end - start));

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
 Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -55,6 +56,7 @@ import org.apache.pig.pen.util.LineageTr
 @SuppressWarnings("unchecked")
 public class POForEach extends PhysicalOperator {
     private static final long serialVersionUID = 1L;
+    private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
 
     protected List<PhysicalPlan> inputPlans;
 
@@ -264,7 +266,7 @@ public class POForEach extends PhysicalO
                 if (inp.returnStatus == POStatus.STATUS_EOP) {
                     if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
                         // continue pull one more output
-                        inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
+                        inp = UNLIMITED_NULL_RESULT;
                     } else {
                         return inp;
                     }
@@ -441,6 +443,8 @@ public class POForEach extends PhysicalO
 
                 if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                     its[i] = ((DataBag)bags[i]).iterator();
+                } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
+                    its[i] = ((Map)bags[i]).entrySet().iterator();
                 } else {
                     its[i] = null;
                 }
@@ -466,7 +470,7 @@ public class POForEach extends PhysicalO
                 //we instantiate the template array and start populating it with data
                 data = new Object[noItems];
                 for(int i = 0; i < noItems; ++i) {
-                    if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) {
+                    if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
@@ -540,6 +544,15 @@ public class POForEach extends PhysicalO
                     out.append(t.get(j));
                 }
                 }
+            } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) {
+                Map.Entry entry = (Map.Entry)in;
+                if (knownSize) {
+                    out.set(idx++, entry.getKey());
+                    out.set(idx++, entry.getValue());
+                } else {
+                    out.append(entry.getKey());
+                    out.append(entry.getValue());
+                }
             } else {
                 if (knownSize) {
                     out.set(idx++, in);
@@ -738,9 +751,12 @@ public class POForEach extends PhysicalO
             opsToBeReset.add(sort);
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject)
-         */
+        @Override
+        public void visitCross(POCross c) throws VisitorException {
+            // FIXME: add only if limit is present
+            opsToBeReset.add(c);
+        }
+
         @Override
         public void visitProject(POProject proj) throws VisitorException {
             if(proj instanceof PORelationToExprProject) {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Fri Feb 24 08:19:42 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
-/** This operator implements merge join algorithm to do map side joins. 
+/** This operator implements merge join algorithm to do map side joins.
  *  Currently, only two-way joins are supported. One input of join is identified as left
  *  and other is identified as right. Left input tuples are the input records in map.
  *  Right tuples are read from HDFS by opening right stream.
- *  
+ *
  *    This join doesn't support outer join.
  *    Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
  */
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
-    
+
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
@@ -249,12 +249,11 @@ public class POMergeJoin extends Physica
      * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
      * from the source, though in the future that is what we would like to do.
      */
-    public static class TuplesToSchemaTupleList {
-        private List<Tuple> tuples;
+    public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
         private SchemaTupleFactory tf;
 
         public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
-            tuples = new ArrayList<Tuple>(ct);
+            super(ct);
             if (tf instanceof SchemaTupleFactory) {
                 this.tf = (SchemaTupleFactory)tf;
             }
@@ -273,24 +272,24 @@ public class POMergeJoin extends Physica
             }
         }
 
+        @Override
         public boolean add(Tuple t) {
             if (tf != null) {
                 t = convert(t, tf);
             }
-            return tuples.add(t);
+            return super.add(t);
         }
 
+        @Override
         public Tuple get(int i) {
-            return tuples.get(i);
+            return super.get(i);
         }
 
+        @Override
         public int size() {
-            return tuples.size();
+            return super.size();
         }
 
-        public List<Tuple> getList() {
-            return tuples;
-        }
     }
 
     @SuppressWarnings("unchecked")
@@ -357,7 +356,7 @@ public class POMergeJoin extends Physica
                 }
                 else{
                     Object rightKey = extractKeysFromTuple(rightInp, 1);
-                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them 
+                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them
                         continue;       // and fetch next tuple.
 
                     int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -399,7 +398,7 @@ public class POMergeJoin extends Physica
                             "Last two tuples encountered were: \n"+
                         curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                         throw new ExecException(errMsg,errCode);
-                    }    
+                    }
                 }
             }
         }
@@ -430,7 +429,7 @@ public class POMergeJoin extends Physica
                 prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
- 
+
         case POStatus.STATUS_EOP:
             if(this.parentPlan.endOfAllInput || isEndOfInput()){
                 // We hit the end on left input. 
@@ -487,17 +486,17 @@ public class POMergeJoin extends Physica
                 slidingToNextRecord = false;
             } else
                 rightInp = getNextRightInp(prevLeftKey);
-                
+
             if(rightInp.returnStatus != POStatus.STATUS_OK)
                 return rightInp;
 
             Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-            
-            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them 
+
+            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
                 continue;       // and fetch next tuple.
-            
+
             Comparable rightKey = (Comparable)extractedRightKey;
-            
+
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
@@ -528,7 +527,7 @@ public class POMergeJoin extends Physica
             else{    // We got ahead on right side. Store currently read right tuple.
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
-                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
                 leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
@@ -555,7 +554,7 @@ public class POMergeJoin extends Physica
             DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
             loader.setIndexFile(indexFile);
         }
-        
+
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
         rightLoader.setUDFContextSignature(signature);
@@ -608,11 +607,11 @@ public class POMergeJoin extends Physica
                         // run the tuple through the pipeline
                         rightPipelineRoot.attachInput(t);
                         return this.getNextRightInp();
-                        
+
                     }
                     default: // We don't deal with ERR/NULL. just pass them down
                         throwProcessingException(false, null);
-                        
+
                 }
             }
         } catch (IOException e) {
@@ -643,8 +642,8 @@ public class POMergeJoin extends Physica
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
-        } 
-          
+        }
+
         return ((Tuple) lrOut.result).get(1);
     }
 
@@ -660,7 +659,7 @@ public class POMergeJoin extends Physica
             noInnerPlanOnRightSide = false;
             this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
             this.rightPipelineRoot = rightPipeline.getRoots().get(0);
-            this.rightPipelineRoot.setInputs(null);            
+            this.rightPipelineRoot.setInputs(null);
         }
         else
             noInnerPlanOnRightSide = true;
@@ -711,18 +710,18 @@ public class POMergeJoin extends Physica
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     /**
      * @param rightInputFileName the rightInputFileName to set
      */
     public void setRightInputFileName(String rightInputFileName) {
         this.rightInputFileName = rightInputFileName;
     }
-    
+
     public String getSignature() {
         return signature;
     }
-    
+
     public void setSignature(String signature) {
         this.signature = signature;
     }
@@ -734,12 +733,12 @@ public class POMergeJoin extends Physica
     public String getIndexFile() {
         return indexFile;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
-    
+
     public LOJoin.JOINTYPE getJoinType() {
         return joinType;
     }


Reply via email to