Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + + +import java.io.IOException; + +import java.net.URI; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; + +abstract public class PigMapBase extends PigGenericMapBase { + /** + * + * Get mapper's illustrator context + * + * @param conf Configuration + * @param input Input bag to serve as data source + * @param output Map output buffer + * @param split the split + * @return Illustrator's context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Configuration conf, DataBag input, + List<Pair<PigNullableWritable, Writable>> output, InputSplit split) + throws IOException,
InterruptedException { + org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>().getMapContext(new IllustratorContext(conf, input, output, split)); + return mapperContext; + } + + public class IllustratorContext extends MapContextImpl<Text, Tuple, PigNullableWritable, Writable> { + private DataBag input; + List<Pair<PigNullableWritable, Writable>> output; + private Iterator<Tuple> it = null; + private Tuple value = null; + private boolean init = false; + + public IllustratorContext(Configuration conf, DataBag input, + List<Pair<PigNullableWritable, Writable>> output, + InputSplit split) throws IOException, InterruptedException { + super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split); + conf.set("inIllustrator", "true"); + if (output == null) + throw new IOException("Null output can not be used"); + this.input = input; this.output = output; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (input == null) { + if (!init) { + init = true; + return true; + } + return false; + } + if (it == null) + it = input.iterator(); + if (!it.hasNext()) + return false; + value = it.next(); + return true; + } + + @Override + public Text getCurrentKey() { + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + @Override + public void write(PigNullableWritable key, Writable value) + throws IOException, InterruptedException { + output.add(new Pair<PigNullableWritable, Writable>(key, value)); + } + + @Override + public void progress() { + + } + } + + @Override + public boolean inIllustrator(Context context) { + return ((WrappedMapper.Context)context).getConfiguration().get("inIllustrator")!=null; + } +}
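
The IllustratorContext above lets ILLUSTRATE drive the map plan from an in-memory DataBag instead of a real input split: nextKeyValue() walks the bag's iterator, and a null bag still reports exactly one (valueless) record so the pipeline executes once. A minimal sketch of that iteration contract in plain Java, with no Hadoop or Pig types (OneShotCursor and its members are illustrative names, not part of this commit):

import java.util.Iterator;
import java.util.List;

class OneShotCursor<T> {
    private final Iterator<T> it; // null when there is no backing input
    private T value;
    private boolean emptyPassDone = false;

    OneShotCursor(List<T> input) {
        this.it = (input == null) ? null : input.iterator();
    }

    // Mirrors IllustratorContext.nextKeyValue(): with no input, report one
    // record exactly once (the "init" flag above plays this role); otherwise
    // advance the iterator and remember the current value.
    boolean nextKeyValue() {
        if (it == null) {
            if (emptyPassDone) {
                return false;
            }
            emptyPassDone = true;
            return true;
        }
        if (!it.hasNext()) {
            return false;
        }
        value = it.next();
        return true;
    }

    T getCurrentValue() {
        return value;
    }
}
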
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.jobcontrol.Job; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.ReduceContext; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.IllustratorContext; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.pen.FakeRawKeyValueIterator; + +public class PigMapReduce extends PigGenericMapReduce { + + static class IllustrateReducerContext extends WrappedReducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { + public IllustratorContext + getReducerContext(ReduceContext<PigNullableWritable, NullableTuple, 
PigNullableWritable, Writable> reduceContext) { + return new IllustratorContext(reduceContext); + } + + public class IllustratorContext + extends WrappedReducer.Context { + public IllustratorContext( + ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) { + super(reduceContext); + } + public POPackage getPack() { + return ((Reduce.IllustratorContextImpl)reduceContext).pack; + } + } + } + + public static class Reduce extends PigGenericMapReduce.Reduce { + /** + * Get reducer's illustrator context + * + * @param input Input buffer as output by maps + * @param pkg package + * @return reducer's illustrator context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Job job, + List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException { + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new IllustrateReducerContext() + .getReducerContext(new IllustratorContextImpl(job, input, pkg)); + return reducerContext; + } + + @SuppressWarnings("unchecked") + public class IllustratorContextImpl extends ReduceContextImpl<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { + private PigNullableWritable currentKey = null, nextKey = null; + private NullableTuple nextValue = null; + private List<NullableTuple> currentValues = null; + private Iterator<Pair<PigNullableWritable, Writable>> it; + private final ByteArrayOutputStream bos; + private final DataOutputStream dos; + private final RawComparator sortComparator, groupingComparator; + public POPackage pack = null; + private IllustratorValueIterable iterable = new IllustratorValueIterable(); + + public IllustratorContextImpl(Job job, + List<Pair<PigNullableWritable, Writable>> input, + POPackage pkg + ) throws IOException, InterruptedException { + super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()), + null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class); + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf()); + sortComparator = nwJob.getSortComparator(); + groupingComparator = nwJob.getGroupingComparator(); + + Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() { + @Override + public int compare(Pair<PigNullableWritable, Writable> o1, + Pair<PigNullableWritable, Writable> o2) { + try { + o1.first.write(dos); + int l1 = bos.size(); + o2.first.write(dos); + int l2 = bos.size(); + byte[] bytes = bos.toByteArray(); + bos.reset(); + return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); + } catch (IOException e) { + throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); + } + } + } + ); + currentValues = new ArrayList<NullableTuple>(); + it = input.iterator(); + if (it.hasNext()) { + Pair<PigNullableWritable, Writable> entry = it.next(); + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + } + pack = pkg; + } + + public class IllustratorValueIterator implements ReduceContext.ValueIterator<NullableTuple> { + + private int pos = -1; + private int mark = -1; + + @Override + public void mark() throws IOException { + mark=pos-1; + if (mark<-1) + mark=-1; + } + + @Override + public void reset() throws IOException { + pos=mark; + } + + @Override + public void clearMark() throws IOException { + mark=-1; + } 
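// Note on the mark/reset protocol implemented above: mark() saves the index
// of the most recently returned value (clamped to -1 before any next() call),
// reset() rewinds to it so that value is replayed by the following next(),
// and clearMark() discards the saved position; resetBackupStore() below
// restarts the iteration entirely.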
+ + @Override + public boolean hasNext() { + return pos<currentValues.size()-1; + } + + @Override + public NullableTuple next() { + pos++; + return currentValues.get(pos); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove not implemented"); + } + + @Override + public void resetBackupStore() throws IOException { + pos=-1; + mark=-1; + } + + } + + protected class IllustratorValueIterable implements Iterable<NullableTuple> { + private IllustratorValueIterator iterator = new IllustratorValueIterator(); + @Override + public Iterator<NullableTuple> iterator() { + return iterator; + } + } + + @Override + public PigNullableWritable getCurrentKey() { + return currentKey; + } + + @Override + public boolean nextKey() { + if (nextKey == null) + return false; + currentKey = nextKey; + currentValues.clear(); + currentValues.add(nextValue); + nextKey = null; + for(; it.hasNext(); ) { + Pair<PigNullableWritable, Writable> entry = it.next(); + /* Why can't raw comparison be used? + byte[] bytes; + int l1, l2; + try { + currentKey.write(dos); + l1 = bos.size(); + entry.first.write(dos); + l2 = bos.size(); + bytes = bos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("nextKey exception : "+e.getMessage()); + } + bos.reset(); + if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0) + */ + if (groupingComparator.compare(currentKey, entry.first) == 0) + { + currentValues.add((NullableTuple)entry.second); + } else { + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + break; + } + } + return true; + } + + @Override + public Iterable<NullableTuple> getValues() { + return iterable; + } + + @Override + public void write(PigNullableWritable k, Writable t) { + } + + @Override + public void progress() { + } + } + + @Override + public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) { + return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext); + } + + @Override + public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) { + return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack(); + } + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Feb 24 08:19:42 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @@ -156,12 +155,7 @@ public class PigOutputCommitter extends for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { if (mapCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(mapCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; 
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -173,12 +167,7 @@ public class PigOutputCommitter extends reduceOutputCommitters) { if (reduceCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(reduceCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -197,10 +186,7 @@ public class PigOutputCommitter extends mapCommitter.second); try { // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(mapCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + mapCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -212,11 +198,7 @@ public class PigOutputCommitter extends TaskAttemptContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(reduceCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + reduceCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -256,10 +238,7 @@ public class PigOutputCommitter extends mapCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext); + mapCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -273,10 +252,7 @@ public class PigOutputCommitter extends reduceCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext); + reduceCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -293,10 +269,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, mapCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext, state); + mapCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } @@ -309,10 +282,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - 
m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext, state); + reduceCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Feb 24 08:19:42 2017 @@ -515,9 +515,11 @@ public class PigSplit extends InputSplit for (int i = 0; i < wrappedSplits.length; i++) { st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " + wrappedSplits[i].getClass().getName() + "\n Locations:\n"); - for (String location : wrappedSplits[i].getLocations()) - st.append(" "+location+"\n"); - st.append("\n-----------------------\n"); + if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) { + for (String location : wrappedSplits[i].getLocations()) + st.append(" "+location+"\n"); + st.append("\n-----------------------\n"); + } } } catch (IOException e) { return null; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Fri Feb 24 08:19:42 2017 @@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe Random rGen; float[] probVec; float epsilon = 0.0001f; - + private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class); - - public DiscreteProbabilitySampleGenerator(float[] probVec) { - rGen = new Random(); + + public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) { + rGen = new Random(seed); float sum = 0.0f; for (float f : probVec) { sum += f; } this.probVec = probVec; - if (1-epsilon > sum || sum > 1+epsilon) { + if (1-epsilon > sum || sum > 1+epsilon) { LOG.info("Sum of probabilities should be near one: " + sum); } } - + public int getNext(){ double toss = rGen.nextDouble(); // if the uniformly random number that I generated @@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe toss -= probVec[i]; if(toss<=0.0) return i; - } + } return lastIdx; } - + public static void main(String[] args) { float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f }; - DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec); + DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec); CountingMap<Integer> cm = new CountingMap<Integer>(); for(int i=0;i<100;i++){ cm.put(gen.getNext(), 1); @@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe public String toString() { return 
Arrays.toString(probVec); } - - + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 24 08:19:42 2017 @@ -17,7 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -31,13 +30,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.NullableBigDecimalWritable; import org.apache.pig.impl.io.NullableBigIntegerWritable; @@ -52,7 +51,6 @@ import org.apache.pig.impl.io.NullableTe import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.io.ReadToEndLoader; -import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Utils; public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable> @@ -62,7 +60,6 @@ public class WeightedRangePartitioner ex new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); protected PigNullableWritable[] quantiles; protected RawComparator<PigNullableWritable> comparator; - private PigContext pigContext; protected Configuration job; protected boolean inited = false; @@ -93,11 +90,6 @@ public class WeightedRangePartitioner ex @SuppressWarnings("unchecked") public void init() { weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); - try { - pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); - } catch (IOException e) { - throw new RuntimeException("Failed to deserialize pig context: ", e); - } String quantilesFile = job.get("pig.quantilesFile", ""); if (quantilesFile.length() == 0) { @@ -109,10 +101,10 @@ public class WeightedRangePartitioner ex // use local file system to get the quantilesFile Map<String, Object> quantileMap = null; Configuration conf; - if (!pigContext.getExecType().isLocal()) { - conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); - } else { + if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) { conf = new Configuration(false); + } else { + conf = new Configuration(job); } if (job.get("fs.file.impl") != null) { 
conf.set("fs.file.impl", job.get("fs.file.impl")); @@ -138,11 +130,13 @@ public class WeightedRangePartitioner ex DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST); InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); convertToArray(quantilesList); + long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple)ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple)ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } // else - the quantiles file is empty - unless we have a bug, the Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Feb 24 08:19:42 2017 @@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException { endOfAllInputFlag = true; } - + @Override public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException { endOfAllInputFlag = true; @@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends } } + @Override + public void visitLocalRearrange(POLocalRearrange lr) throws 
VisitorException{ + if (lr instanceof POBuildBloomRearrangeTez) { + endOfAllInputFlag = true; + } + super.visitLocalRearrange(lr); + } /** * @return if end of all input is present Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Fri Feb 24 08:19:42 2017 @@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; /** @@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV * @param plan MR plan to print */ public MRPrinter(PrintStream ps, MROperPlan plan) { - super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan)); + super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true)); mStream = ps; mStream.println("#--------------------------------------------------"); mStream.println("# Map Reduce Plan "); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Feb 24 08:19:42 2017 @@ -441,6 +441,10 @@ public abstract class PhysicalOperator e public void reset() { } + public boolean isEndOfAllInput() { + return parentPlan.endOfAllInput; + } + /** * @return PigProgressable stored in threadlocal */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Fri Feb 24 08:19:42 2017 @@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.PigWarning; import 
org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -36,6 +39,8 @@ public class Divide extends BinaryExpres * */ private static final long serialVersionUID = 1L; + public static final short BIGDECIMAL_MINIMAL_SCALE = 6; + private static final Log LOG = LogFactory.getLog(Divide.class); public Divide(OperatorKey k) { super(k); @@ -72,12 +77,22 @@ public class Divide extends BinaryExpres case DataType.BIGINTEGER: return ((BigInteger) a).divide((BigInteger) b); case DataType.BIGDECIMAL: - return ((BigDecimal) a).divide((BigDecimal) b); + return bigDecimalDivideWithScale(a, b); default: throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType)); } } + private Number bigDecimalDivideWithScale(Number a, Number b) { + // Using same result scaling as Hive. See Arithmetic Rules: + // https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf + int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1); + if (LOG.isDebugEnabled()) { + LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale."); + } + return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP); + } + /* * This method is used to invoke the appropriate method, as Java does not provide generic * dispatch for it. Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Feb 24 08:19:42 2017 @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -89,6 +90,8 @@ public class POCast extends ExpressionOp caster = ((LoadFunc)obj).getLoadCaster(); } else if (obj instanceof StreamToPig) { caster = ((StreamToPig)obj).getLoadCaster(); + } else if (obj instanceof EvalFunc) { + caster = ((EvalFunc)obj).getLoadCaster(); } else { throw new IOException("Invalid class type " + funcSpec.getClassName()); @@ -165,7 +168,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger."; + String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -281,7 +284,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigDecimal(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal."; + String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -396,7 +399,7 @@ public class POCast extends ExpressionOp 
res.result = caster.bytesToBoolean(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "boolean."; + String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -510,7 +513,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -636,7 +639,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToLong(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long."; + String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -759,7 +762,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDouble(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double."; + String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -881,7 +884,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToFloat(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1007,7 +1010,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDateTime(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime."; + String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1118,7 +1121,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToCharArray(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "string."; + String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1270,7 +1273,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToTuple(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1332,7 +1335,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBag(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag."; + String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1363,7 +1366,7 @@ public class POCast extends ExpressionOp result = caster.bytesToTuple(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = 
unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1388,7 +1391,7 @@ public class POCast extends ExpressionOp result = caster.bytesToMap(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1402,7 +1405,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBoolean(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1441,7 +1444,7 @@ public class POCast extends ExpressionOp result = caster.bytesToInteger(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1487,7 +1490,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDouble(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double."; + String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1533,7 +1536,7 @@ public class POCast extends ExpressionOp result = caster.bytesToLong(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long."; + String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1579,7 +1582,7 @@ public class POCast extends ExpressionOp result = caster.bytesToFloat(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1625,7 +1628,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDateTime(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime."; + String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1664,7 +1667,7 @@ public class POCast extends ExpressionOp result = caster.bytesToCharArray(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1712,7 +1715,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBigInteger(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger."; + String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1757,7 +1760,7 @@ public class POCast extends ExpressionOp result = 
caster.bytesToBigDecimal(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal."; + String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1795,6 +1798,10 @@ public class POCast extends ExpressionOp default: throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT); } + case DataType.BYTEARRAY: + //no-op (PIG-4933) + result = obj; + break; default: throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT); } @@ -1861,7 +1868,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBag(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag."; + String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1952,7 +1959,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToMap(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "map."; + String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Fri Feb 24 08:19:42 2017 @@ -158,23 +158,19 @@ public class POProject extends Expressio illustratorMarkup(inpValue, res.result, -1); return res; } else if(columns.size() == 1) { - try { + if ( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + res.returnStatus = POStatus.STATUS_OK; + ret = null; + } else if( inpValue.size() > columns.get(0) ) { ret = inpValue.get(columns.get(0)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } res.returnStatus = POStatus.STATUS_OK; ret = null; - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - res.returnStatus = POStatus.STATUS_OK; - ret = null; } } else if(isProjectToEnd){ ret = getRangeTuple(inpValue); @@ -215,23 +211,18 @@ public class POProject extends Expressio */ private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i) throws ExecException { - try { + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + objList.add(null); + } else if( inpValue.size() > i ) { 
objList.add(inpValue.get(i)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + i + " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); } - catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - objList.add(null); - } } @Override @@ -406,21 +397,17 @@ public class POProject extends Expressio Object ret; if(columns.size() == 1) { - try{ + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + ret = null; + } else if( inpValue.size() > columns.get(0) ) { ret = inpValue.get(columns.get(0)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } ret = null; - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - ret = null; } } else if(isProjectToEnd) { ret = getRangeTuple(inpValue); @@ -428,21 +415,17 @@ public class POProject extends Expressio ArrayList<Object> objList = new ArrayList<Object>(columns.size()); for(int col: columns) { - try { + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + objList.add(null); + } else if( inpValue.size() > col ) { objList.add(inpValue.get(col)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - objList.add(null); } } ret = mTupleFactory.newTuple(objList); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Fri Feb 24 08:19:42 2017 @@ -49,7 +49,7 @@ public class CombinerPackager extends Pa private Map<Integer, Integer> keyLookup; private int numBags; - + private transient boolean initialized; private transient boolean useDefaultBag; @@ -77,6 +77,15 @@ public class 
CombinerPackager extends Pa } } + @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + /** * @param keyInfo the keyInfo to set */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Fri Feb 24 08:19:42 2017 @@ -17,7 +17,7 @@ */ /** - * + * */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -48,6 +49,15 @@ public class LitePackager extends Packag private PigNullableWritable keyWritable; @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + + @Override public boolean[] getInner() { return null; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Fri Feb 24 08:19:42 2017 @@ -256,4 +256,9 @@ public class POCross extends PhysicalOpe data = null; } + @Override + public void reset() { + clearMemory(); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 24 08:19:42 2017 @@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp // The array of Hashtables one per 
replicated input. replicates[fragment] = // null fragment is the input which is fragmented and not replicated. - protected transient TupleToMapKey replicates[]; + protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates; // varaible which denotes whether we are returning tuples from the foreach // operator protected transient boolean processingPlan; @@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp Result res = null; Result inp = null; if (!setUp) { - replicates = new TupleToMapKey[phyPlanLists.size()]; + replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size()); + for (int i = 0 ; i < phyPlanLists.size(); i++) { + replicates.add(null); + } dumTup = mTupleFactory.newTuple(1); setUpHashMap(); setUp = true; @@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp return new Result(); } Tuple lrOutTuple = (Tuple) lrOut.result; - Tuple key = mTupleFactory.newTuple(1); - key.set(0, lrOutTuple.get(1)); + Object key = lrOutTuple.get(1); Tuple value = getValueTuple(lr, lrOutTuple); lr.detachInput(); // Configure the for each operator with the relevant bags @@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp ce.setValue(value); continue; } - TupleToMapKey replicate = replicates[i]; + Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i); if (replicate.get(key) == null) { if (isLeftOuterJoin) { ce.setValue(nullBag); @@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp noMatch = true; break; } - ce.setValue(new NonSpillableDataBag(replicate.get(key).getList())); + ce.setValue(new NonSpillableDataBag(replicate.get(key))); } // If this is not LeftOuter Join and there was no match we @@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp } } - protected static class TupleToMapKey { - private HashMap<Tuple, TuplesToSchemaTupleList> tuples; + protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> { private SchemaTupleFactory tf; public TupleToMapKey(int ct, SchemaTupleFactory tf) { - tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct); + super(ct); this.tf = tf; } - public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) { - if (tf != null) { - key = TuplesToSchemaTupleList.convert(key, tf); + @Override + public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) { + if (tf != null && key instanceof Tuple) { + key = TuplesToSchemaTupleList.convert((Tuple)key, tf); } - return tuples.put(key, val); + return (TuplesToSchemaTupleList) super.put(key, val); } - public TuplesToSchemaTupleList get(Tuple key) { - if (tf != null) { - key = TuplesToSchemaTupleList.convert(key, tf); + @Override + public TuplesToSchemaTupleList get(Object key) { + if (tf != null && key instanceof Tuple) { + key = TuplesToSchemaTupleList.convert((Tuple)key, tf); } - return tuples.get(key); + return (TuplesToSchemaTupleList) super.get(key); } } @@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; if (i == fragment) { - replicates[i] = null; + replicates.set(i, null); continue; } @@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp POLocalRearrange lr = LRs[i]; lr.setInputs(Arrays.asList((PhysicalOperator) ld)); - TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory); + Map<Object, ArrayList<Tuple>> replicate; + if (keySchemaTupleFactory == null) { + replicate = new HashMap<Object, ArrayList<Tuple>>(1000); + } else { + replicate = new TupleToMapKey(1000, 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Fri Feb 24 08:19:42 2017
@@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ
             addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i);
         }
 
-        replicates[fragment] = null;
+        replicates.set(fragment, null);
         int i = -1;
         long start = System.currentTimeMillis();
         for (int k = 0; k < inputSchemas.length; ++k) {
@@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ
             SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i];
 
             if (i == fragment) {
-                replicates[i] = null;
+                replicates.set(fragment, null);
                 continue;
             }
@@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ
                 replicate.get(key).add(value);
             }
 
-            replicates[i] = replicate;
+            replicates.set(i, replicate);
         }
         long end = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (end - start));
Time taken: " + (end - start)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 24 08:19:42 2017 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; @@ -55,6 +56,7 @@ import org.apache.pig.pen.util.LineageTr @SuppressWarnings("unchecked") public class POForEach extends PhysicalOperator { private static final long serialVersionUID = 1L; + private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); protected List<PhysicalPlan> inputPlans; @@ -264,7 +266,7 @@ public class POForEach extends PhysicalO if (inp.returnStatus == POStatus.STATUS_EOP) { if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) { // continue pull one more output - inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); + inp = UNLIMITED_NULL_RESULT; } else { return inp; } @@ -441,6 +443,8 @@ public class POForEach extends PhysicalO if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) { its[i] = ((DataBag)bags[i]).iterator(); + } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) { + its[i] = ((Map)bags[i]).entrySet().iterator(); } else { its[i] = null; } @@ -466,7 +470,7 @@ public class POForEach extends PhysicalO //we instantiate the template array and start populating it with data data = new Object[noItems]; for(int i = 0; i < noItems; ++i) { - if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) { + if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) { if(its[i].hasNext()) { data[i] = its[i].next(); } else { @@ -540,6 +544,15 @@ public class POForEach extends PhysicalO out.append(t.get(j)); } } + } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) { + Map.Entry entry = (Map.Entry)in; + if (knownSize) { + out.set(idx++, entry.getKey()); + out.set(idx++, entry.getValue()); + } else { + out.append(entry.getKey()); + out.append(entry.getValue()); + } } else { if (knownSize) { out.set(idx++, in); @@ -738,9 +751,12 @@ public class POForEach extends PhysicalO opsToBeReset.add(sort); } - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject) - */ + @Override + public void visitCross(POCross c) throws VisitorException { + // FIXME: add only if limit is present + opsToBeReset.add(c); + } + @Override public void visitProject(POProject proj) throws VisitorException { if(proj instanceof PORelationToExprProject) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 24 08:19:42 2017
@@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 
-/** This operator implements merge join algorithm to do map side joins. 
+/** This operator implements merge join algorithm to do map side joins.
  * Currently, only two-way joins are supported. One input of join is identified as left
  * and other is identified as right. Left input tuples are the input records in map.
  * Right tuples are read from HDFS by opening right stream.
- * 
+ *
  * This join doesn't support outer join.
  * Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order.
  */
@@ -99,7 +99,7 @@ public class POMergeJoin extends Physica
     private FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
-    
+
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
@@ -249,12 +249,11 @@ public class POMergeJoin extends Physica
      * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
     * from the source, though in the future that is what we would like to do.
     */
-    public static class TuplesToSchemaTupleList {
-        private List<Tuple> tuples;
+    public static class TuplesToSchemaTupleList extends ArrayList<Tuple> {
        private SchemaTupleFactory tf;
 
        public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) {
-            tuples = new ArrayList<Tuple>(ct);
+            super(ct);
            if (tf instanceof SchemaTupleFactory) {
                this.tf = (SchemaTupleFactory)tf;
            }
@@ -273,24 +272,24 @@ public class POMergeJoin extends Physica
             }
         }
 
+        @Override
         public boolean add(Tuple t) {
             if (tf != null) {
                 t = convert(t, tf);
             }
-            return tuples.add(t);
+            return super.add(t);
         }
 
+        @Override
         public Tuple get(int i) {
-            return tuples.get(i);
+            return super.get(i);
         }
 
+        @Override
         public int size() {
-            return tuples.size();
+            return super.size();
         }
 
-        public List<Tuple> getList() {
-            return tuples;
-        }
     }
@@ -357,7 +356,7 @@ public class POMergeJoin extends Physica
         }
         else{
             Object rightKey = extractKeysFromTuple(rightInp, 1);
-            if(null == rightKey) // If we see tuple having null keys in stream, we drop them 
+            if(null == rightKey) // If we see tuple having null keys in stream, we drop them
                 continue; // and fetch next tuple.
 
             int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
@@ -399,7 +398,7 @@ public class POMergeJoin extends Physica
                             "Last two tuples encountered were: \n"+
                             curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                             throw new ExecException(errMsg,errCode);
-                        } 
+                        }
                     }
                 }
             }
@@ -430,7 +429,7 @@ public class POMergeJoin extends Physica
                     prevLeftKey+ "\n" + curLeftKey ;
                 throw new ExecException(errMsg,errCode);
             }
-            
+
         case POStatus.STATUS_EOP:
             if(this.parentPlan.endOfAllInput || isEndOfInput()){
                 // We hit the end on left input.
@@ -487,17 +486,17 @@ public class POMergeJoin extends Physica
                 slidingToNextRecord = false;
             } else
                 rightInp = getNextRightInp(prevLeftKey);
-                
+
             if(rightInp.returnStatus != POStatus.STATUS_OK)
                 return rightInp;
 
             Object extractedRightKey = extractKeysFromTuple(rightInp, 1);
-            
-            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them 
+
+            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
                 continue; // and fetch next tuple.
-            
+
             Comparable rightKey = (Comparable)extractedRightKey;
-            
+
             if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                 // Sanity check.
                 int errCode = 1102;
@@ -528,7 +527,7 @@ public class POMergeJoin extends Physica
             else{    // We got ahead on right side. Store currently read right tuple.
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
-                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
+                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
                 leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
@@ -555,7 +554,7 @@ public class POMergeJoin extends Physica
                 DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
                 loader.setIndexFile(indexFile);
             }
-            
+
             // Pass signature of the loader to rightLoader
             // make a copy of the conf to use in calls to rightLoader.
             rightLoader.setUDFContextSignature(signature);
@@ -608,11 +607,11 @@ public class POMergeJoin extends Physica
                     // run the tuple through the pipeline
                     rightPipelineRoot.attachInput(t);
                     return this.getNextRightInp();
-                    
+
                 }
             default: // We don't deal with ERR/NULL. just pass them down
                 throwProcessingException(false, null);
-                
+
             }
         }
     } catch (IOException e) {
@@ -643,8 +642,8 @@ public class POMergeJoin extends Physica
             int errCode = 2167;
             String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
             throw new ExecException(errMsg,errCode,PigException.BUG);
-        } 
-        
+        }
+
         return ((Tuple) lrOut.result).get(1);
     }
@@ -660,7 +659,7 @@ public class POMergeJoin extends Physica
             noInnerPlanOnRightSide = false;
             this.rightPipelineLeaf = rightPipeline.getLeaves().get(0);
             this.rightPipelineRoot = rightPipeline.getRoots().get(0);
-            this.rightPipelineRoot.setInputs(null);            
+            this.rightPipelineRoot.setInputs(null);
         }
         else
             noInnerPlanOnRightSide = true;
@@ -711,18 +710,18 @@ public class POMergeJoin extends Physica
     public boolean supportsMultipleOutputs() {
         return false;
     }
-    
+
     /**
     * @param rightInputFileName the rightInputFileName to set
     */
    public void setRightInputFileName(String rightInputFileName) {
        this.rightInputFileName = rightInputFileName;
    }
-    
+
    public String getSignature() {
        return signature;
    }
-    
+
    public void setSignature(String signature) {
        this.signature = signature;
    }
@@ -734,12 +733,12 @@ public class POMergeJoin extends Physica
    public String getIndexFile() {
        return indexFile;
    }
-    
+
    @Override
    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
        return null;
    }
-    
+
    public LOJoin.JOINTYPE getJoinType() {
        return joinType;
    }
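With TuplesToSchemaTupleList now extending ArrayList<Tuple>, instances can be handed directly to any consumer expecting a List<Tuple> (for example the NonSpillableDataBag constructor used in POFRJoin above), which is what made the getList() unwrapping method removable. A small sketch of the same convert-on-add list pattern, with String elements standing in for tuples (illustrative only, not part of this commit):

    import java.util.ArrayList;
    import java.util.List;

    public class ConvertingList extends ArrayList<String> {
        // Stands in for TuplesToSchemaTupleList.add(), which converts each
        // appended Tuple to a SchemaTuple when a factory is available.
        @Override
        public boolean add(String element) {
            return super.add(element.toUpperCase());
        }

        public static void main(String[] args) {
            List<String> list = new ConvertingList();
            list.add("tuple");
            System.out.println(list); // prints [TUPLE]
        }
    }

Subclassing the collection rather than wrapping it removes one level of indirection and avoids exposing the backing list as a separate object.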