Author: olga
Date: Thu Apr 30 21:09:08 2009
New Revision: 770445
URL: http://svn.apache.org/viewvc?rev=770445&view=rev
Log:
PIG-732: additional UDFs
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
Modified:
hadoop/pig/trunk/contrib/CHANGES.txt
Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=770445&r1=770444&r2=770445&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Apr 30 21:09:08 2009
@@ -1,3 +1,4 @@
+PIG-732: addition of Top and SearchQuery UDFs (ankur via olgan)
PIG-246: created UDF repository (olgan)
PIG-245: UDF wrappers for Java Math functions (ajaygarg via olgan)
PIG-277: UDF for computing correlation and covariance between data sets
(ajaygarg via olgan)
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java?rev=770445&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/SearchQuery.java
Thu Apr 30 21:09:08 2009
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation.util;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * This small UDF takes a search engine URL (Google/Yahoo/AOL/Live) containing
+ * the search query and extracts it. The URL is assumed to be URL-encoded. The
+ * query is normalized: it is URL-decoded, converted to lower-case, stripped of
+ * punctuation, has runs of whitespace collapsed to a single space, and is
+ * truncated to at most 80 characters.
+ */
+public class SearchQuery extends EvalFunc<String> {
+    /**
+     * Matches the value of a <code>q=</code>, <code>p=</code>,
+     * <code>as_q=</code> or <code>as_p=</code> parameter that follows a
+     * <code>?</code> or <code>&amp;</code>, up to the next <code>&amp;</code>,
+     * double quote, or the end of the input.
+     */
+    private static final Pattern QUERY_PATTERN = Pattern
+            .compile("(?<=([\\&\\?](as_)?[pq]=)).*?(\\z|(?=[\\&\\\"]))");
+
+    /** Maximum number of characters kept from a normalized query. */
+    private static final int MAX_QUERY_LENGTH = 80;
+
+    /**
+     * Extracts the normalized search query from the URL held in field 0.
+     *
+     * @param tuple input tuple whose first field is the URL string
+     * @return the normalized query; <code>null</code> when the tuple is null
+     *         or empty, the URL is null, no query parameter is found, or the
+     *         parameter value cannot be URL-decoded; the empty string when
+     *         the URL itself is empty
+     * @throws IOException if the field cannot be read from the tuple
+     */
+    @Override
+    public String exec(Tuple tuple) throws IOException {
+        if (tuple == null || tuple.size() < 1) {
+            return null;
+        }
+        try {
+            String refURL = (String) tuple.get(0);
+            return extractQuery(refURL);
+        } catch (ExecException ee) {
+            throw new IOException(ee);
+        }
+    }
+
+    /**
+     * Locates the query parameter in <code>url</code> and normalizes its value.
+     */
+    private String extractQuery(String url) {
+        if (url == null || url.isEmpty()) {
+            return url;
+        }
+        // Lower-case the raw URL so that parameter names such as "Q=" still
+        // match; Locale.ROOT keeps the result independent of the JVM locale.
+        String refURL = url.toLowerCase(java.util.Locale.ROOT).trim();
+        Matcher matcher = QUERY_PATTERN.matcher(refURL);
+        if (!matcher.find()) {
+            return null;
+        }
+        String query;
+        try {
+            query = URLDecoder.decode(matcher.group(), "UTF-8"); // decode query
+        } catch (UnsupportedEncodingException ignored) {
+            // UTF-8 is a charset every Java platform must support.
+            return null;
+        } catch (IllegalArgumentException malformedEscape) {
+            // A broken %-escape in one record should not abort the whole
+            // job; treat it as "no query could be extracted".
+            return null;
+        }
+        // Percent-escapes can decode to upper-case characters (e.g. %41 is
+        // 'A'), so the case folding has to be repeated after decoding.
+        query = query.toLowerCase(java.util.Locale.ROOT);
+        query = query.replaceAll("[\\p{Punct}]+", ""); // remove punctuation
+        query = query.trim().replaceAll("[\\s]+", " "); // remove extra spaces
+        return (query.length() > MAX_QUERY_LENGTH)
+                ? query.substring(0, MAX_QUERY_LENGTH) : query;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        // Single overload: one chararray argument (the URL).
+        funcList.add(new FuncSpec(this.getClass().getName(),
+                new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));
+        return funcList;
+    }
+
+    /**
+     * Declares the output as a single chararray field named "query".
+     *
+     * @param input schema of the input (not consulted)
+     * @return the output schema, or <code>null</code> if it cannot be built
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            Schema s = new Schema();
+            s.add(new Schema.FieldSchema("query", DataType.CHARARRAY));
+            return s;
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java?rev=770445&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
Thu Apr 30 21:09:08 2009
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * TopN UDF accepts a bag of tuples and returns top-n tuples depending upon the
+ * tuple field value of type long. Both n and field number needs to be provided
+ * to the UDF. The UDF iterates through the input bag and just retains top-n
+ * tuples by storing them in a priority queue of size n+1 where priority is the
+ * long field. This is efficient because the queue never holds more than n+1
+ * tuples: looking at the least element is O(1), and each insertion or removal
+ * of the least element costs O(log n) for heap restructuring. The
+ * UDF is especially helpful for turning the nested grouping operation inside
+ * out and retaining top-n in a nested group.
+ *
+ * The tuples in the returned bag are in the priority queue's iteration order,
+ * which is not sorted.
+ *
+ * Sample usage:
+ * A = LOAD 'test.tsv' as (first: chararray, second: chararray);
+ * B = GROUP A BY (first, second);
+ * C = FOREACH B generate FLATTEN(group), COUNT(*) as count;
+ * D = GROUP C BY first; // again group by first
+ * topResults = FOREACH D {
+ *     result = Top(10, 2, C); // and retain top 10 occurrences of 'second' in first
+ *     GENERATE FLATTEN(result);
+ * }
+ */
+public class Top extends EvalFunc<DataBag> {
+
+    BagFactory mBagFactory = BagFactory.getInstance();
+
+    /**
+     * Orders tuples by the Long value stored in one fixed field, smallest
+     * first. A null tuple sorts before any non-null tuple.
+     */
+    static class TupleComparator implements Comparator<Tuple> {
+        private int fieldNum;
+
+        public TupleComparator(int fieldNum) {
+            this.fieldNum = fieldNum;
+        }
+
+        @Override
+        public int compare(Tuple o1, Tuple o2) {
+            if (o1 == null) {
+                // Two nulls must compare equal to honour the Comparator contract.
+                return (o2 == null) ? 0 : -1;
+            }
+            if (o2 == null) {
+                return 1;
+            }
+            try {
+                long count1 = (Long) o1.get(fieldNum);
+                long count2 = (Long) o2.get(fieldNum);
+                return (count1 > count2) ? 1 : ((count1 == count2) ? 0 : -1);
+            } catch (ExecException e) {
+                throw new RuntimeException("Error while comparing o1:" + o1
+                        + " and o2:" + o2, e);
+            }
+        }
+    }
+
+    /**
+     * Returns the n tuples of the input bag with the largest value in the
+     * given field.
+     *
+     * @param tuple (n as Integer, field number as Integer, input bag)
+     * @return a bag holding at most n tuples, or <code>null</code> when the
+     *         tuple is null, has fewer than three fields, or any of the three
+     *         arguments is null
+     * @throws IOException declared by the overridden method; failures inside
+     *         this implementation surface as RuntimeException with the
+     *         original exception attached as the cause
+     */
+    @Override
+    public DataBag exec(Tuple tuple) throws IOException {
+        if (tuple == null || tuple.size() < 3) {
+            return null;
+        }
+        try {
+            Integer limit = (Integer) tuple.get(0);
+            Integer field = (Integer) tuple.get(1);
+            DataBag inputBag = (DataBag) tuple.get(2);
+            if (limit == null || field == null || inputBag == null) {
+                // Null arguments yield a null result instead of an opaque
+                // NullPointerException wrapped as a "General Exception".
+                return null;
+            }
+            int n = limit;
+            int fieldNum = field;
+            // Min-heap on the chosen field: once it holds more than n tuples
+            // the smallest one is evicted, so the n largest remain.
+            PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                    new TupleComparator(fieldNum));
+            Iterator<Tuple> itr = inputBag.iterator();
+            while (itr.hasNext()) {
+                Tuple t = itr.next();
+                store.add(t);
+                if (store.size() > n) {
+                    store.poll();
+                }
+            }
+            DataBag outputBag = mBagFactory.newDefaultBag();
+            for (Tuple t : store) {
+                outputBag.add(t);
+            }
+            return outputBag;
+        } catch (ExecException e) {
+            throw new RuntimeException("ExecException executing function: ", e);
+        } catch (Exception e) {
+            // Keep the original exception as the cause so the stack trace
+            // is not lost.
+            throw new RuntimeException("General Exception executing function: " + e, e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+     */
+    @Override
+    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+        // NOTE(review): this registers three separate specs, each with a
+        // one-field schema, rather than one spec with an (int, int, bag)
+        // schema. Confirm against the Pig function-matching rules that this
+        // is what is intended for a three-argument UDF.
+        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
+        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+                new Schema.FieldSchema(null, DataType.INTEGER))));
+        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+                new Schema.FieldSchema(null, DataType.INTEGER))));
+        funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
+                new Schema.FieldSchema(null, DataType.BAG))));
+        return funcList;
+    }
+
+    /**
+     * Declares the output as a bag named "bag_of_input_tuples" that reuses
+     * the schema of the third input field.
+     *
+     * @param input schema of the UDF arguments
+     * @return the output schema, or <code>null</code> when the input has
+     *         fewer than three fields or the schema cannot be built
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        try {
+            if (input.size() < 3) {
+                return null;
+            }
+            Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_input_tuples",
+                    input.getField(2).schema, DataType.BAG);
+            return new Schema(bagFs);
+
+        } catch (Exception e) {
+            return null;
+        }
+    }
+}
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java?rev=770445&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestSearchQuery.java
Thu Apr 30 21:09:08 2009
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.test.evaluation.util;
+
+import org.apache.pig.piggybank.evaluation.util.SearchQuery;
+
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestSearchQuery extends TestCase {
+
+    /**
+     * Feeds one sample URL per supported search engine through the UDF and
+     * checks the normalized query that comes back.
+     */
+    @Test
+    public void testSearchQuery() throws Exception {
+        String[] searchUrls = {
+            "http://www.google.co.in/advanced_search?q=google+Advanced+Query&hl=en",
+            "http://www.google.co.in/search?hl=en&as_q=google+simple+Query&as_epq=&as_oq=&as_eq=&num=10&lr=&as_filetype=&ft=i&as_sitesearch=&as_qdr=all&as_rights=&as_occt=any&cr=&as_nlo=&as_nhi=&safe=images",
+            "http://search.live.com/results.aspx?q=live+Query&go=&form=QBRE",
+            "http://search.aol.com/aol/search?s_it=searchbox.webhome&q=aol+query",
+            "http://search.yahoo.com/search;_ylt=A0geu8zce8NJQxYBmgal87UF?p=Yahoo+query&fr=sfp&fr2=&iscqry="
+        };
+        String[] queries = { "google advanced query", "google simple query",
+            "live query", "aol query", "yahoo query" };
+        SearchQuery sq = new SearchQuery();
+        Tuple tuple = DefaultTupleFactory.getInstance().newTuple(1);
+        for (int i = 0; i < searchUrls.length; i++) {
+            tuple.set(0, searchUrls[i]);
+            // Expected value goes first so that a failure report labels the
+            // two values correctly.
+            assertEquals(queries[i], sq.exec(tuple));
+        }
+    }
+}
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java?rev=770445&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
Thu Apr 30 21:09:08 2009
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.test.evaluation.util;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.piggybank.evaluation.util.Top;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestTop extends TestCase {
+
+    /**
+     * Builds a bag of 100 tuples with values 0..99 in field 1, asks for the
+     * top 10 by that field, and checks that exactly the values 90..99 survive.
+     */
+    @Test
+    public void testTop() throws Exception {
+        Top top = new Top();
+        Tuple inputTuple = DefaultTupleFactory.getInstance().newTuple(3);
+        // set N = 10 i.e retain top 10 tuples
+        inputTuple.set(0, 10);
+        // compare tuples by field number 1
+        inputTuple.set(1, 1);
+        // set the data bag containing the tuples
+        DataBag dBag = DefaultBagFactory.getInstance().newDefaultBag();
+        inputTuple.set(2, dBag);
+        // generate tuples of the form (group-0, 0), (group-1, 1) ...
+        for (long i = 0; i < 100; i++) {
+            Tuple nestedTuple = DefaultTupleFactory.getInstance().newTuple(2);
+            nestedTuple.set(0, "group-" + i);
+            nestedTuple.set(1, i);
+            dBag.add(nestedTuple);
+        }
+
+        DataBag outBag = top.exec(inputTuple);
+        // Expected value goes first so that a failure report labels the two
+        // values correctly.
+        assertEquals(10L, outBag.size());
+        Iterator<Tuple> itr = outBag.iterator();
+        while (itr.hasNext()) {
+            Tuple next = itr.next();
+            Long value = (Long) next.get(1);
+            assertTrue("Value " + value + " is not among the top 10 (expected > 89)",
+                    value > 89);
+        }
+    }
+}