Author: gates
Date: Mon Nov 16 22:18:46 2009
New Revision: 881008
URL: http://svn.apache.org/viewvc?rev=881008&view=rev
Log:
PIG-1085 checking in files I missed in the last checkin.
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java
Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon Nov 16
22:18:46 2009
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class UDFContext {
+
+ @SuppressWarnings("deprecation")
+ private JobConf jconf = null;
+ private HashMap<Integer, Properties> udfConfs;
+
+ private static UDFContext self = null;
+
+ private UDFContext() {
+ udfConfs = new HashMap<Integer, Properties>();
+ }
+
+ public static UDFContext getUDFContext() {
+ if (self == null) {
+ self = new UDFContext();
+ }
+ return self;
+ }
+
+ /**
+ * Adds the JobConf to this singleton. Will be
+ * called on the backend by the Map and Reduce
+ * functions so that UDFs can obtain the JobConf
+ * on the backend.
+ */
+ @SuppressWarnings("deprecation")
+ public void addJobConf(JobConf conf) {
+ jconf = conf;
+ }
+
+ /**
+ * Get the JobConf. This should only be called on
+ * the backend. It will return null on the frontend.
+ * @return JobConf for this job. This is a copy of the
+ * JobConf. Nothing written here will be kept by the system.
+ * getUDFConf should be used for recording UDF specific
+ * information.
+ */
+ @SuppressWarnings("deprecation")
+ public JobConf getJobConf() {
+ if (jconf != null) return new JobConf(jconf);
+ else return null;
+ }
+
+ /**
+ * Get a properties object that is specific to this UDF.
+ * Note that if a given UDF is called multiple times in a script,
+ * and each instance passes different arguments, then each will
+ * be provided with different configuration object.
+ * This can be used by loaders to pass their input object path
+ * or URI and separate themselves from other instances of the
+ * same loader. Constructor arguments could also be used,
+ * as they are available on both the front and back end.
+ *
+ * Note that this can only be used to share information
+ * across instantiations of the same function in the front end
+ * and between front end and back end. It cannot be used to
+ * share information between instantiations (that is, between
+ * map and/or reduce instances) on the back end at runtime.
+ * @param c of the UDF obtaining the properties object.
+ * @param args String arguments that make this instance of
+ * the UDF unique.
+ * @return A reference to the properties object specific to
+ * the calling UDF. This is a reference, not a copy.
+ * Any changes to this object will automatically be
+ * propogated to other instances of the UDF calling this
+ * function.
+ */
+
+ @SuppressWarnings("unchecked")
+ public Properties getUDFProperties(Class c, String[] args) {
+ Integer k = generateKey(c, args);
+ Properties p = udfConfs.get(k);
+ if (p == null) {
+ p = new Properties();
+ udfConfs.put(k, p);
+ }
+ return p;
+ }
+
+ /**
+ * Get a properties object that is specific to this UDF.
+ * Note that if a given UDF is called multiple times in a script,
+ * they will all be provided the same configuration object. It
+ * is up to the UDF to make sure the multiple instances do not
+ * stomp on each other.
+ *
+ * It is guaranteed that this properties object will be separate
+ * from that provided to any other UDF.
+ *
+ * Note that this can only be used to share information
+ * across instantiations of the same function in the front end
+ * and between front end and back end. It cannot be used to
+ * share information between instantiations (that is, between
+ * map and/or reduce instances) on the back end at runtime.
+ * @param c of the UDF obtaining the properties object.
+ * @return A reference to the properties object specific to
+ * the calling UDF. This is a reference, not a copy.
+ * Any changes to this object will automatically be
+ * propogated to other instances of the UDF calling this
+ * function.
+ */
+ @SuppressWarnings("unchecked")
+ public Properties getUDFProperties(Class c) {
+ Integer k = generateKey(c);
+ Properties p = udfConfs.get(k);
+ if (p == null) {
+ p = new Properties();
+ udfConfs.put(k, p);
+ }
+ return p;
+ }
+
+ /**
+ * Serialize the UDF specific information into an instance
+ * of JobConf. This function is intended to be called on
+ * the front end in preparation for sending the data to the
+ * backend.
+ * @param conf JobConf to serialize into
+ * @throws IOException if underlying serialization throws it
+ */
+ public void serialize(JobConf conf) throws IOException {
+ conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+ }
+
+ /**
+ * Populate the udfConfs field. This function is intended to
+ * be called by Map.configure or Reduce.configure on the backend.
+ * It assumes that addJobConf has already been called.
+ * @throws IOException if underlying deseralization throws it
+ */
+ @SuppressWarnings("unchecked")
+ public void deserialize() throws IOException {
+ udfConfs = (HashMap<Integer,
Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+ }
+
+ @SuppressWarnings("unchecked")
+ private int generateKey(Class c) {
+ return c.getName().hashCode();
+ }
+
+ @SuppressWarnings("unchecked")
+ private int generateKey(Class c, String[] args) {
+ int hc = c.getName().hashCode();
+ for (int i = 0; i < args.length; i++) {
+ hc <<= 1;
+ hc ^= args[i].hashCode();
+ }
+ return hc;
+ }
+
+}
\ No newline at end of file
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java Mon Nov 16
22:18:46 2009
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+
+public class TestUDFContext extends TestCase {
+
+ static MiniCluster cluster = null;
+
+ @Override
+ protected void setUp() throws Exception {
+ cluster = MiniCluster.buildCluster();
+ }
+
+
+ @Test
+ public void testUDFContext() throws Exception {
+ Util.createInputFile(cluster, "a.txt", new String[] { "dumb" });
+ Util.createInputFile(cluster, "b.txt", new String[] { "dumber" });
+ FileLocalizer.deleteTempFiles();
+ PigServer pig = new PigServer(ExecType.MAPREDUCE,
cluster.getProperties());
+ String[] statement = { "A = LOAD 'a.txt' USING
org.apache.pig.test.utils.UDFContextTestLoader('joe');",
+ "B = LOAD 'b.txt' USING
org.apache.pig.test.utils.UDFContextTestLoader('jane');",
+ "C = union A, B;",
+ "D = FOREACH C GENERATE $0, $1,
org.apache.pig.test.utils.UDFContextTestEvalFunc($0),
org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" };
+
+ File tmpFile = File.createTempFile("temp_jira_851", ".pig");
+ FileWriter writer = new FileWriter(tmpFile);
+ for (String line : statement) {
+ writer.write(line + "\n");
+ }
+ writer.close();
+
+ pig.registerScript(tmpFile.getAbsolutePath());
+ Iterator<Tuple> iterator = pig.openIterator("D");
+ while (iterator.hasNext()) {
+ Tuple tuple = iterator.next();
+ if ("dumb".equals(tuple.get(0).toString())) {
+ assertEquals(tuple.get(1).toString(), "joe");
+ } else if ("dumber".equals(tuple.get(0).toString())) {
+ assertEquals(tuple.get(1).toString(), "jane");
+ }
+ assertEquals(Integer.valueOf(tuple.get(2).toString()), new
Integer(5));
+ assertEquals(tuple.get(3).toString(), "five");
+ }
+ }
+}
Added:
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
Mon Nov 16 22:18:46 2009
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+public class UDFContextTestEvalFunc extends EvalFunc<Integer> {
+ public UDFContextTestEvalFunc() {
+ Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass());
+ p.setProperty("key1", "5");
+ }
+
+ @Override
+ public Integer exec(Tuple input) throws IOException {
+ String s =
(UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty("key1"));
+ return Integer.valueOf(s);
+ }
+
+}
Added:
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java?rev=881008&view=auto
==============================================================================
---
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
(added)
+++
hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
Mon Nov 16 22:18:46 2009
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+public class UDFContextTestEvalFunc2 extends EvalFunc<String> {
+
+ public UDFContextTestEvalFunc2() {
+ Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass());
+ p.setProperty("key1", "five");
+ }
+
+ @Override
+ public String exec(Tuple input) throws IOException {
+ if (UDFContext.getUDFContext().getJobConf() == null) return "JobConf
is null!";
+ else return
UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty("key1");
+ }
+
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java?rev=881008&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java
(added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java
Mon Nov 16 22:18:46 2009
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+public class UDFContextTestLoader extends PigStorage {
+
+ private String[] vals = new String[1];
+
+ public UDFContextTestLoader(String v1) {
+ vals[0] = v1;
+ Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals);
+ p.setProperty("key1", vals[0]);
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ Tuple t = super.getNext();
+ if (t != null) {
+ if (UDFContext.getUDFContext().getJobConf() == null) {
+ t.append("JobConf is null!");
+ } else {
+ Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals);
+ t.append(p.getProperty("key1"));
+ }
+ }
+ return t;
+ }
+}