Author: olga
Date: Mon May  4 20:57:17 2009
New Revision: 771438

URL: http://svn.apache.org/viewvc?rev=771438&view=rev
Log:
PIG-795: support for SAMPLE command

Added:
    hadoop/pig/trunk/src/org/apache/pig/builtin/RANDOM.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSample.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=771438&r1=771437&r2=771438&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon May  4 20:57:17 2009
@@ -610,3 +610,5 @@
     PIG-652: Adapt changes in store interface to multi-query changes (hagleitn 
via gates).
 
     PIG-627: multiquery support phase 3 (hagleitn and Richard Ding via olgan)
+
+    PIG-795: support for SAMPLE command (ericg via olgan)

Added: hadoop/pig/trunk/src/org/apache/pig/builtin/RANDOM.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/RANDOM.java?rev=771438&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/RANDOM.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/RANDOM.java Mon May  4 20:57:17 
2009
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.data.DataType;
+
+public class RANDOM extends EvalFunc<Double>{
+
+       public Double exec(Tuple input) throws IOException {
+               return Math.random();
+       }
+
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new 
Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), 
input), DataType.DOUBLE));
+    }
+}

Modified: 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=771438&r1=771437&r2=771438&view=diff
==============================================================================
--- 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
(original)
+++ 
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt 
Mon May  4 20:57:17 2009
@@ -41,6 +41,7 @@
 import org.apache.pig.ExecType;
 import org.apache.pig.impl.io.*;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.RANDOM;
 import org.apache.pig.impl.builtin.GFAny;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.commons.logging.Log;
@@ -842,6 +843,7 @@
 TOKEN : { <STDIN: "stdin"> }
 TOKEN : { <STDOUT: "stdout"> }
 TOKEN : { <LIMIT: "limit"> }
+TOKEN : { <SAMPLE: "sample"> }
 
 TOKEN:
 {
@@ -1080,6 +1082,7 @@
 |      ((<GROUP> | <COGROUP>) op = CogroupClause(lp))
 |      (<FILTER> op = FilterClause(lp))
 |   (<LIMIT> op = LimitClause(lp))
+|   (<SAMPLE> op = SampleClause(lp))
 |   (<ORDER> op = OrderClause(lp))
 |      (<DISTINCT> op = NestedExpr(lp) 
        {
@@ -1224,6 +1227,48 @@
        }
 }
 
+/*
+ * "SAMPLE a x" is translated to "FILTER a BY RANDOM() < x".
+ *
+ * The comparison is strict on purpose: RANDOM returns Math.random(), which
+ * lies in [0.0, 1.0), so x = 1.0 keeps every tuple and x = 0.0 keeps none.
+ * A "<=" comparison would let a tuple through for x = 0.0 whenever the
+ * generator returned exactly 0.0.
+ *
+ * Parses: a nested expression (the input relation) followed by a
+ * DOUBLENUMBER token (the sampling fraction).
+ * Returns: the LOFilter that was added to lp and connected to the input.
+ */
+LogicalOperator SampleClause(LogicalPlan lp) :
+{
+    ExpressionOperator cond;
+    LogicalOperator input;
+    Token t;
+    LogicalPlan conditionPlan = new LogicalPlan();
+    log.trace("Entering SampleClause");
+}
+{
+    (
+    input = NestedExpr(lp) {log.debug("Sample input: " + input);}
+    t = <DOUBLENUMBER>
+    )
+    {
+        // Left operand of the condition: a call to the RANDOM function.
+        LOUserFunc rand = new LOUserFunc(conditionPlan,
+                new OperatorKey(scope, getNextId()),
+                new FuncSpec(RANDOM.class.getName()), DataType.DOUBLE);
+        conditionPlan.add(rand);
+
+        // Right operand: the sampling fraction taken from the token text.
+        double l = Double.parseDouble(t.image);
+        LOConst prob = new LOConst(conditionPlan,
+                new OperatorKey(scope, getNextId()), l);
+        conditionPlan.add(prob);
+
+        // Strict less-than, see the note in the header comment.
+        cond = new LOLesserThan(conditionPlan,
+                new OperatorKey(scope, getNextId()));
+        conditionPlan.add(cond);
+        conditionPlan.connect(rand, cond);
+        conditionPlan.connect(prob, cond);
+
+        // Wrap the condition in a filter and hook it up behind the input.
+        LogicalOperator filter = new LOFilter(lp,
+                new OperatorKey(scope, getNextId()), conditionPlan);
+        addAlias(input.getAlias(), input);
+        lp.add(filter);
+        log.debug("Added operator " + filter.getClass().getName() + " to the logical plan");
+
+        lp.connect(input, filter);
+        log.debug("Connected alias " + input.getAlias() + " operator "
+                + input.getClass().getName() + " to operator "
+                + filter.getClass().getName() + " in the logical plan");
+
+        log.trace("Exiting SampleClause");
+        return filter;
+    }
+}
+
 LogicalOperator LimitClause(LogicalPlan lp):
 {
         LogicalOperator input;

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSample.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSample.java?rev=771438&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSample.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSample.java Mon May  4 
20:57:17 2009
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Tests for the SAMPLE clause.
+ *
+ * Each test loads a temporary file holding DATALEN lines, samples it with a
+ * given fraction and checks that the number of tuples returned falls inside
+ * an expected range.
+ */
+public class TestSample
+extends TestCase
+{
+    /** Number of lines written to the temporary input file. */
+    private static final int DATALEN = 1024;
+
+    private PigServer pig;
+    private File tmpFile;
+    private String tmpfilepath;
+
+    /**
+     * Creates a local PigServer and writes the integers 0..DATALEN-1, one
+     * per line, to a fresh temporary file.
+     */
+    @Override
+    protected void setUp()
+    throws Exception
+    {
+        pig = new PigServer("local");
+
+        tmpFile = File.createTempFile( this.getName(), ".txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        try {
+            for(int i = 0; i < DATALEN; i++) {
+                ps.println(i);
+            }
+            // PrintStream suppresses IOExceptions; surface a failed write here
+            // instead of letting it show up later as a wrong tuple count.
+            if (ps.checkError()) {
+                throw new java.io.IOException("Failed to write test data to " + tmpFile);
+            }
+        } finally {
+            ps.close();
+        }
+
+        tmpfilepath = Util.generateURI(tmpFile.getCanonicalPath());
+    }
+
+    /** Removes the temporary input file. */
+    @Override
+    protected void tearDown()
+    throws Exception
+    {
+        tmpFile.delete();
+    }
+
+    /**
+     * Registers the query, counts the tuples of alias "myid" and asserts that
+     * the count lies in [expected_min, expected_max].
+     *
+     * @param query        Pig Latin statement that defines alias "myid"
+     * @param expected_min smallest acceptable tuple count (inclusive)
+     * @param expected_max largest acceptable tuple count (inclusive)
+     */
+    private void verify(String query, int expected_min, int expected_max)
+    throws Exception
+    {
+        System.out.println("[TestSample] Query: "+query);
+        pig.registerQuery(query);
+
+        int count = 0;
+        Iterator<Tuple> it = pig.openIterator("myid");
+        while (it.hasNext()) {
+          it.next();
+          count ++;
+        }
+
+        boolean closeEnough = ((expected_min<=count) && (count<=expected_max));
+        System.out.println("[TestSample] Result: "+expected_min+"<="+count+"<="+expected_max+" -> "+closeEnough);
+        assertTrue("Count outside expected range", closeEnough);
+    }
+
+    /** A sampling fraction of 0.0 must return no tuples. */
+    @Test
+    public void testSample_None()
+    throws Exception
+    {
+        verify("myid = sample (load '"+ tmpfilepath + "') 0.0;", 0, 0);
+    }
+
+    /** A sampling fraction of 1.0 must return every tuple. */
+    @Test
+    public void testSample_All()
+    throws Exception
+    {
+        verify("myid = sample (load '"+ tmpfilepath + "') 1.0;", DATALEN, DATALEN);
+    }
+
+    /**
+     * A sampling fraction of 0.5 is random, so only a wide band around the
+     * expected half (between one third and two thirds of the input) is checked.
+     */
+    @Test
+    public void testSample_Some()
+    throws Exception
+    {
+        verify("myid = sample (load '"+ tmpfilepath + "') 0.5;", DATALEN/3, DATALEN*2/3);
+    }
+}


Reply via email to