Author: olga Date: Thu Apr 30 21:09:08 2009 New Revision: 770445 URL: http://svn.apache.org/viewvc?rev=770445&view=rev Log: PIG-732: additional UDFs
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java Modified: hadoop/pig/trunk/contrib/CHANGES.txt Modified: hadoop/pig/trunk/contrib/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=770445&r1=770444&r2=770445&view=diff ============================================================================== --- hadoop/pig/trunk/contrib/CHANGES.txt (original) +++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Apr 30 21:09:08 2009 @@ -1,3 +1,4 @@ +PIG-732: addition of Top and SearchQuery UDFs (ankur via olgan) PIG-246: created UDF repository (olgan) PIG-245: UDF wrappers for Java Math functions (ajaygarg via olgan) PIG-277: UDF for computing correlation and covariance between data sets (ajaygarg via olgan) Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java?rev=770445&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java Thu Apr 30 21:09:08 2009 @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.piggybank.evaluation.util; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.pig.EvalFunc; +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +/** + * This small UDF takes a search engine URL (Google/Yahoo/AOL/Live) containing + * the search query and extracts it. The URL is assumed to be encoded. The query + * is normalized, converting it to lower-case, removing punctuations, removing + * extra spaces. 
+ */ +public class SearchQuery extends EvalFunc<String> { + private static Pattern queryPattern = Pattern + .compile("(?<=([\\&\\?](as_)?[pq]=)).*?(\\z|(?=[\\&\\\"]))"); + + @Override + public String exec(Tuple tuple) throws IOException { + if (tuple == null || tuple.size() < 1) { + return null; + } + try { + String refURL = (String) tuple.get(0); + return extractQuery(refURL); + } catch (ExecException ee) { + throw new IOException(ee); + } + } + + private String extractQuery(String url) { + try { + String refURL = url; + if (refURL == null || refURL.isEmpty()) + return refURL; + String query = null; + refURL = refURL.toLowerCase().trim(); + Matcher matcher = queryPattern.matcher(refURL); + if (matcher.find()) { + query = matcher.group(); + query = URLDecoder.decode(query, "UTF-8"); // decode query + query = query.replaceAll("[\\p{Punct}]+", ""); // remove punctuation + query = query.trim().replaceAll("[\\s]+", " "); // remove extra spaces + query = (query.length() > 80) ? query.substring(0, 80) : query; + } + return query; + } catch (UnsupportedEncodingException ignore) { // should never happen + } + return null; + } + + /* (non-Javadoc) + * @see org.apache.pig.EvalFunc#getArgToFuncMapping() + */ + @Override + public List<FuncSpec> getArgToFuncMapping() throws FrontendException { + List<FuncSpec> funcList = new ArrayList<FuncSpec>(); + funcList.add(new FuncSpec(this.getClass().getName(), new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)))); + return funcList; + } + @Override + public Schema outputSchema(Schema input) { + try { + Schema s = new Schema(); + s.add(new Schema.FieldSchema("query", DataType.CHARARRAY)); + return s; + } catch (Exception e) { + return null; + } + } +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java?rev=770445&view=auto 
============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java Thu Apr 30 21:09:08 2009 @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.piggybank.evaluation.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; +import org.apache.pig.EvalFunc; +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.FrontendException; +import org.apache.pig.impl.logicalLayer.schema.Schema; + +/** + * TopN UDF accepts a bag of tuples and returns top-n tuples depending upon the + * tuple field value of type long. Both n and field number needs to be provided + * to the UDF. 
The UDF iterates through the input bag and just retains top-n + * tuples by storing them in a priority queue of size n+1 where priority is the + * long field. This is efficient as priority queue provides constant time - O(1) + * removal of the least element and O(log n) time for heap restructuring. The + * UDF is especially helpful for turning the nested grouping operation inside + * out and retaining top-n in a nested group. + * + * Sample usage: + * A = LOAD 'test.tsv' as (first: chararray, second: chararray); + * B = GROUP A BY (first, second); + * C = FOREACH B generate FLATTEN(group), COUNT(*) as count; + * D = GROUP C BY first; // again group by first + * topResults = FOREACH D { + * result = Top(10, 2, C); // and retain top 10 occurrences of 'second' in first + * GENERATE FLATTEN(result); + * } + */ +public class Top extends EvalFunc<DataBag> { + + BagFactory mBagFactory = BagFactory.getInstance(); + + static class TupleComparator implements Comparator<Tuple> { + private int fieldNum; + + public TupleComparator(int fieldNum) { + this.fieldNum = fieldNum; + } + + @Override + public int compare(Tuple o1, Tuple o2) { + if (o1 == null) + return -1; + if (o2 == null) + return 1; + int retValue = 1; + try { + long count1 = (Long) o1.get(fieldNum); + long count2 = (Long) o2.get(fieldNum); + retValue = (count1 > count2) ? 1 : ((count1 == count2) ? 
0 : -1); + } catch (ExecException e) { + throw new RuntimeException("Error while comparing o1:" + o1 + + " and o2:" + o2, e); + } + return retValue; + } + } + + @Override + public DataBag exec(Tuple tuple) throws IOException { + if (tuple == null || tuple.size() < 3) { + return null; + } + try { + int n = (Integer) tuple.get(0); + int fieldNum = (Integer) tuple.get(1); + DataBag inputBag = (DataBag) tuple.get(2); + PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1, + new TupleComparator(fieldNum)); + Iterator<Tuple> itr = inputBag.iterator(); + while (itr.hasNext()) { + Tuple t = itr.next(); + store.add(t); + if (store.size() > n) + store.poll(); + } + DataBag outputBag = mBagFactory.newDefaultBag(); + for (Tuple t : store) { + outputBag.add(t); + } + return outputBag; + } catch (ExecException e) { + throw new RuntimeException("ExecException executing function: ", e); + } catch (Exception e) { + throw new RuntimeException("General Exception executing function: " + e); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.pig.EvalFunc#getArgToFuncMapping() + */ + @Override + public List<FuncSpec> getArgToFuncMapping() throws FrontendException { + List<FuncSpec> funcList = new ArrayList<FuncSpec>(); + funcList.add(new FuncSpec(this.getClass().getName(), new Schema( + new Schema.FieldSchema(null, DataType.INTEGER)))); + funcList.add(new FuncSpec(this.getClass().getName(), new Schema( + new Schema.FieldSchema(null, DataType.INTEGER)))); + funcList.add(new FuncSpec(this.getClass().getName(), new Schema( + new Schema.FieldSchema(null, DataType.BAG)))); + return funcList; + } + + @Override + public Schema outputSchema(Schema input) { + try { + if (input.size() < 3) { + return null; + } + Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_input_tuples", + input.getField(2).schema, DataType.BAG); + return new Schema(bagFs); + + } catch (Exception e) { + return null; + } + } +} Added: 
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java?rev=770445&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java Thu Apr 30 21:09:08 2009 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.piggybank.test.evaluation.util; + +import org.apache.pig.piggybank.evaluation.util.SearchQuery; + +import org.apache.pig.data.DefaultTupleFactory; +import org.apache.pig.data.Tuple; +import org.junit.Test; + +import junit.framework.TestCase; + +public class TestSearchQuery extends TestCase { + + @Test + public void testSearchQuery() throws Exception { + String[] searchUrls = { + "http://www.google.co.in/advanced_search?q=google+Advanced+Query&hl=en", + "http://www.google.co.in/search?hl=en&as_q=google+simple+Query&as_epq=&as_oq=&as_eq=&num=10&lr=&as_filetype=&ft=i&as_sitesearch=&as_qdr=all&as_rights=&as_occt=any&cr=&as_nlo=&as_nhi=&safe=images", + "http://search.live.com/results.aspx?q=live+Query&go=&form=QBRE", + "http://search.aol.com/aol/search?s_it=searchbox.webhome&q=aol+query", + "http://search.yahoo.com/search;_ylt=A0geu8zce8NJQxYBmgal87UF?p=Yahoo+query&fr=sfp&fr2=&iscqry=" }; + String[] queries = { "google advanced query", "google simple query", + "live query", "aol query", "yahoo query" }; + SearchQuery sq = new SearchQuery(); + Tuple tuple = DefaultTupleFactory.getInstance().newTuple(1); + for (int i = 0; i < searchUrls.length; i++) { + tuple.set(0, searchUrls[i]); + super.assertEquals(sq.exec(tuple), queries[i]); + } + } +} Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java?rev=770445&view=auto ============================================================================== --- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java (added) +++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java Thu Apr 30 21:09:08 2009 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.piggybank.test.evaluation.util; + +import java.util.Iterator; + +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DefaultBagFactory; +import org.apache.pig.data.DefaultTupleFactory; +import org.apache.pig.data.Tuple; +import org.apache.pig.piggybank.evaluation.util.Top; +import org.junit.Test; + +import junit.framework.TestCase; + +public class TestTop extends TestCase { + + @Test + public void testTop() throws Exception { + Top top = new Top(); + Tuple inputTuple = DefaultTupleFactory.getInstance().newTuple(3); + // set N = 10 i.e retain top 10 tuples + inputTuple.set(0, 10); + // compare tuples by field number 1 + inputTuple.set(1, 1); + // set the data bag containing the tuples + DataBag dBag = DefaultBagFactory.getInstance().newDefaultBag(); + inputTuple.set(2, dBag); + // generate tuples of the form (group-1, 1), (group-2, 2) ... 
+ for (long i = 0; i < 100; i++) { + Tuple nestedTuple = DefaultTupleFactory.getInstance().newTuple(2); + nestedTuple.set(0, "group-" + i); + nestedTuple.set(1, i); + dBag.add(nestedTuple); + } + + DataBag outBag = top.exec(inputTuple); + super.assertEquals(outBag.size(), 10L); + Iterator<Tuple> itr = outBag.iterator(); + while (itr.hasNext()) { + Tuple next = itr.next(); + Long value = (Long) next.get(1); + super.assertTrue("Value " + value + " exceeded the expected limit", + value > 89); + } + } +}