Author: olga
Date: Thu Apr 30 21:09:08 2009
New Revision: 770445

URL: http://svn.apache.org/viewvc?rev=770445&view=rev
Log:
PIG-732: additional UDFs

Added:
    
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
    
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
    
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/
    
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
    
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=770445&r1=770444&r2=770445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Apr 30 21:09:08 2009
@@ -1,3 +1,4 @@
+PIG-732: addition of Top and SearchQuery UDFs (ankur via olgan)
 PIG-246: created UDF repository (olgan)
 PIG-245: UDF wrappers for Java Math functions (ajaygarg via olgan)
 PIG-277: UDF for computing correlation and covariance between data sets (ajaygarg via olgan)

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java?rev=770445&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
 Thu Apr 30 21:09:08 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * This small UDF takes a search engine URL (Google/Yahoo/AOL/Live) containing
+ * the search query and extracts it. The URL is assumed to be encoded. The 
query
+ * is normalized, converting it to lower-case, removing punctuations, removing
+ * extra spaces.
+ */
+public class SearchQuery extends EvalFunc<String> {
+  private static Pattern queryPattern = Pattern
+      .compile("(?<=([\\&\\?](as_)?[pq]=)).*?(\\z|(?=[\\&\\\"]))");
+
+  @Override
+  public String exec(Tuple tuple) throws IOException {
+    if (tuple == null || tuple.size() < 1) {
+      return null;
+    }
+    try {
+      String refURL = (String) tuple.get(0);
+      return extractQuery(refURL);
+    } catch (ExecException ee) {
+      throw new IOException(ee);
+    }
+  }
+
+  private String extractQuery(String url) {
+    try {
+      String refURL = url;
+      if (refURL == null || refURL.isEmpty())
+        return refURL;
+      String query = null;
+      refURL = refURL.toLowerCase().trim();
+      Matcher matcher = queryPattern.matcher(refURL);
+      if (matcher.find()) {
+        query = matcher.group();
+        query = URLDecoder.decode(query, "UTF-8"); // decode query
+        query = query.replaceAll("[\\p{Punct}]+", ""); // remove punctuation
+        query = query.trim().replaceAll("[\\s]+", " "); // remove extra spaces
+        query = (query.length() > 80) ? query.substring(0, 80) : query;
+      }
+      return query;
+    } catch (UnsupportedEncodingException ignore) { // should never happen
+    }
+    return null;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+   */
+  @Override
+  public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+      List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+      funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new 
Schema.FieldSchema(null, DataType.CHARARRAY))));
+      return funcList;
+  }
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      Schema s = new Schema();
+      s.add(new Schema.FieldSchema("query", DataType.CHARARRAY));
+      return s;
+    } catch (Exception e) {
+      return null;
+    }
+  }
+}

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java?rev=770445&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
 Thu Apr 30 21:09:08 2009
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * TopN UDF accepts a bag of tuples and returns top-n tuples depending upon the
+ * tuple field value of type long. Both n and field number needs to be provided
+ * to the UDF. The UDF iterates through the input bag and just retains top-n
+ * tuples by storing them in a priority queue of size n+1 where priority is the
+ * long field. This is efficient as priority queue provides constant time - 
O(1)
+ * removal of the least element and O(log n) time for heap restructuring. The
+ * UDF is especially helpful for turning the nested grouping operation inside
+ * out and retaining top-n in a nested group. 
+ * 
+ * Sample usage: 
+ * A = LOAD 'test.tsv' as (first: chararray, second: chararray); 
+ * B = GROUP A BY (first, second);
+ * C = FOREACH B generate FLATTEN(group), COUNT(*) as count;
+ * D = GROUP C BY first; // again group by first 
+ * topResults = FOREACH D { 
+ *          result = Top(10, 2, C); // and retain top 10 occurrences of 
'second' in first 
+ *          GENERATE FLATTEN(result); 
+ *  }
+ */
+public class Top extends EvalFunc<DataBag> {
+
+  BagFactory mBagFactory = BagFactory.getInstance();
+
+  static class TupleComparator implements Comparator<Tuple> {
+    private int fieldNum;
+
+    public TupleComparator(int fieldNum) {
+      this.fieldNum = fieldNum;
+    }
+
+    @Override
+    public int compare(Tuple o1, Tuple o2) {
+      if (o1 == null)
+        return -1;
+      if (o2 == null)
+        return 1;
+      int retValue = 1;
+      try {
+        long count1 = (Long) o1.get(fieldNum);
+        long count2 = (Long) o2.get(fieldNum);
+        retValue = (count1 > count2) ? 1 : ((count1 == count2) ? 0 : -1);
+      } catch (ExecException e) {
+        throw new RuntimeException("Error while comparing o1:" + o1
+            + " and o2:" + o2, e);
+      }
+      return retValue;
+    }
+  }
+
+  @Override
+  public DataBag exec(Tuple tuple) throws IOException {
+    if (tuple == null || tuple.size() < 3) {
+      return null;
+    }
+    try {
+      int n = (Integer) tuple.get(0);
+      int fieldNum = (Integer) tuple.get(1);
+      DataBag inputBag = (DataBag) tuple.get(2);
+      PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+          new TupleComparator(fieldNum));
+      Iterator<Tuple> itr = inputBag.iterator();
+      while (itr.hasNext()) {
+        Tuple t = itr.next();
+        store.add(t);
+        if (store.size() > n)
+          store.poll();
+      }
+      DataBag outputBag = mBagFactory.newDefaultBag();
+      for (Tuple t : store) {
+        outputBag.add(t);
+      }
+      return outputBag;
+    } catch (ExecException e) {
+      throw new RuntimeException("ExecException executing function: ", e);
+    } catch (Exception e) {
+      throw new RuntimeException("General Exception executing function: " + e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+   */
+  @Override
+  public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+    List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+        new Schema.FieldSchema(null, DataType.INTEGER))));
+    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+        new Schema.FieldSchema(null, DataType.INTEGER))));
+    funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+        new Schema.FieldSchema(null, DataType.BAG))));
+    return funcList;
+  }
+
+  @Override
+  public Schema outputSchema(Schema input) {
+    try {
+      if (input.size() < 3) {
+        return null;
+      }
+      Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_input_tuples",
+          input.getField(2).schema, DataType.BAG);
+      return new Schema(bagFs);
+
+    } catch (Exception e) {
+      return null;
+    }
+  }
+}

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java?rev=770445&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
 Thu Apr 30 21:09:08 2009
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.test.evaluation.util;
+
+import org.apache.pig.piggybank.evaluation.util.SearchQuery;
+
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestSearchQuery extends TestCase {
+
+  @Test
+  public void testSearchQuery() throws Exception {
+    String[] searchUrls = {
+        
"http://www.google.co.in/advanced_search?q=google+Advanced+Query&hl=en";,
+        
"http://www.google.co.in/search?hl=en&as_q=google+simple+Query&as_epq=&as_oq=&as_eq=&num=10&lr=&as_filetype=&ft=i&as_sitesearch=&as_qdr=all&as_rights=&as_occt=any&cr=&as_nlo=&as_nhi=&safe=images";,
+        "http://search.live.com/results.aspx?q=live+Query&go=&form=QBRE";,
+        "http://search.aol.com/aol/search?s_it=searchbox.webhome&q=aol+query";,
+        
"http://search.yahoo.com/search;_ylt=A0geu8zce8NJQxYBmgal87UF?p=Yahoo+query&fr=sfp&fr2=&iscqry=";
 };
+    String[] queries = { "google advanced query", "google simple query",
+        "live query", "aol query", "yahoo query" };
+    SearchQuery sq = new SearchQuery();
+    Tuple tuple = DefaultTupleFactory.getInstance().newTuple(1);
+    for (int i = 0; i < searchUrls.length; i++) {
+      tuple.set(0, searchUrls[i]);
+      super.assertEquals(sq.exec(tuple), queries[i]);
+    }
+  }
+}

Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java?rev=770445&view=auto
==============================================================================
--- 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
 (added)
+++ 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
 Thu Apr 30 21:09:08 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.test.evaluation.util;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.evaluation.util.Top;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestTop extends TestCase {
+
+  @Test
+  public void testTop() throws Exception {
+    Top top = new Top();
+    Tuple inputTuple = DefaultTupleFactory.getInstance().newTuple(3);
+    // set N = 10 i.e retain top 10 tuples
+    inputTuple.set(0, 10);
+    // compare tuples by field number 1
+    inputTuple.set(1, 1);
+    // set the data bag containing the tuples
+    DataBag dBag = DefaultBagFactory.getInstance().newDefaultBag();
+    inputTuple.set(2, dBag);
+    // generate tuples of the form (group-1, 1), (group-2, 2) ...
+    for (long i = 0; i < 100; i++) {
+      Tuple nestedTuple = DefaultTupleFactory.getInstance().newTuple(2);
+      nestedTuple.set(0, "group-" + i);
+      nestedTuple.set(1, i);
+      dBag.add(nestedTuple);
+    }
+
+    DataBag outBag = top.exec(inputTuple);
+    super.assertEquals(outBag.size(), 10L);
+    Iterator<Tuple> itr = outBag.iterator();
+    while (itr.hasNext()) {
+      Tuple next = itr.next();
+      Long value = (Long) next.get(1);
+      super.assertTrue("Value " + value + " exceeded the expected limit",
+          value > 89);
+    }
+  }
+}


Reply via email to