Author: daijy
Date: Fri Oct 9 18:07:22 2009
New Revision: 823645
URL: http://svn.apache.org/viewvc?rev=823645&view=rev
Log:
PIG-894: order-by fails when input is empty
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=823645&r1=823644&r2=823645&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 9 18:07:22 2009
@@ -58,6 +58,8 @@
PIG-989: Allow type merge between numerical type and non-numerical type (daijy)
+PIG-894: order-by fails when input is empty (daijy)
+
Release 0.5.0 - Unreleased
INCOMPATIBLE CHANGES
Modified:
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=823645&r1=823644&r2=823645&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Oct 9 18:07:22 2009
@@ -41,6 +41,7 @@
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableDoubleWritable;
import org.apache.pig.impl.io.NullableFloatWritable;
@@ -49,6 +50,8 @@
import org.apache.pig.impl.io.NullableText;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
public class WeightedRangePartitioner implements
Partitioner<PigNullableWritable, Writable> {
PigNullableWritable[] quantiles;
@@ -86,19 +89,32 @@
DataBag quantilesList;
loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
Tuple t = loader.getNext();
- if(t==null) throw new RuntimeException("Empty samples file");
- // the Quantiles file has a tuple as under:
- // (numQuantiles, bag of samples)
- // numQuantiles here is the reduce parallelism
- Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
- quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
- InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
- convertToArray(quantilesList);
- for(Entry<Object, Object> ent : weightedPartsData.entrySet()){
- Tuple key = (Tuple)ent.getKey(); // sample item which repeats
- float[] probVec = getProbVec((Tuple)ent.getValue());
- weightedParts.put(getPigNullableWritable(key),
- new DiscreteProbabilitySampleGenerator(probVec));
+ if(t!=null)
+ {
+ // the Quantiles file has a tuple as under:
+ // (numQuantiles, bag of samples)
+ // numQuantiles here is the reduce parallelism
+ Map<String, Object> quantileMap = (Map<String, Object>) t.get(0);
+ quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+ InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+ convertToArray(quantilesList);
+ for(Entry<Object, Object> ent : weightedPartsData.entrySet()){
+ Tuple key = (Tuple)ent.getKey(); // sample item which repeats
+ float[] probVec = getProbVec((Tuple)ent.getValue());
+ weightedParts.put(getPigNullableWritable(key),
+ new DiscreteProbabilitySampleGenerator(probVec));
+ }
+ }
+ else
+ {
+ ArrayList<Pair<FileSpec, Boolean>> inp = (ArrayList<Pair<FileSpec, Boolean>>)ObjectSerializer.deserialize(job.get("pig.inputs", ""));
+ String inputFileName = inp.get(0).first.getFileName();
+ long inputSize = FileLocalizer.getSize(inputFileName);
+ if (inputSize!=0)
+ {
+ throw new RuntimeException("Empty samples file and non-empty input file");
+ }
+ // Otherwise, we do not put anything to weightedParts
}
}catch (Exception e){
throw new RuntimeException(e);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java?rev=823645&r1=823644&r2=823645&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/SampleLoader.java Fri Oct 9 18:07:22 2009
@@ -145,12 +145,8 @@
public Tuple getNext() throws IOException {
long initialPos = loader.getPosition();
- // make sure we move to a boundry of a record
- Tuple t = loader.getSampledTuple();
- long middlePos = loader.getPosition();
-
// we move to next boundry
- t = loader.getSampledTuple();
+ Tuple t = loader.getSampledTuple();
long finalPos = loader.getPosition();
long toSkip = skipInterval - (finalPos - initialPos);
@@ -187,7 +183,7 @@
}
// add size of the tuple at the end
- m.set(t.size(), (finalPos-middlePos) + 1); // offset 1 for null
+ m.set(t.size(), (finalPos-initialPos) + 1); // offset 1 for null
return m;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=823645&r1=823644&r2=823645&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Fri Oct 9 18:07:22 2009
@@ -59,7 +59,6 @@
// pigServer = new PigServer(ExecType.LOCAL);
}
-
@Test
public void testUdfInputOrder() throws IOException {
String[] input = {
@@ -409,5 +408,14 @@
assertEquals(10, numIdentity);
}
-
+ @Test
+ // See PIG-894
+ public void testEmptySort() throws Exception{
+ File tmpFile = File.createTempFile("test", "txt");
+ pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString()) + "';");
+ pigServer.registerQuery("B = order A by $0;");
+ Iterator<Tuple> iter = pigServer.openIterator("B");
+
+ assertTrue(iter.hasNext()==false);
+ }
}