Author: gates Date: Mon Nov 16 22:18:46 2009 New Revision: 881008 URL: http://svn.apache.org/viewvc?rev=881008&view=rev Log: PIG-1085 checking in files I missed in the last checkin.
Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=881008&view=auto ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (added) +++ hadoop/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Mon Nov 16 22:18:46 2009 @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.impl.util; + +import java.io.IOException; +//import java.io.Serializable; +import java.util.HashMap; +import java.util.Properties; + +import org.apache.hadoop.mapred.JobConf; + +import org.apache.pig.impl.util.ObjectSerializer; + +public class UDFContext { + + @SuppressWarnings("deprecation") + private JobConf jconf = null; + private HashMap<Integer, Properties> udfConfs; + + private static UDFContext self = null; + + private UDFContext() { + udfConfs = new HashMap<Integer, Properties>(); + } + + public static UDFContext getUDFContext() { + if (self == null) { + self = new UDFContext(); + } + return self; + } + + /** + * Adds the JobConf to this singleton. Will be + * called on the backend by the Map and Reduce + * functions so that UDFs can obtain the JobConf + * on the backend. + */ + @SuppressWarnings("deprecation") + public void addJobConf(JobConf conf) { + jconf = conf; + } + + /** + * Get the JobConf. This should only be called on + * the backend. It will return null on the frontend. + * @return JobConf for this job. This is a copy of the + * JobConf. Nothing written here will be kept by the system. + * getUDFConf should be used for recording UDF specific + * information. + */ + @SuppressWarnings("deprecation") + public JobConf getJobConf() { + if (jconf != null) return new JobConf(jconf); + else return null; + } + + /** + * Get a properties object that is specific to this UDF. + * Note that if a given UDF is called multiple times in a script, + * and each instance passes different arguments, then each will + * be provided with different configuration object. + * This can be used by loaders to pass their input object path + * or URI and separate themselves from other instances of the + * same loader. Constructor arguments could also be used, + * as they are available on both the front and back end. 
+ * + * Note that this can only be used to share information + * across instantiations of the same function in the front end + * and between front end and back end. It cannot be used to + * share information between instantiations (that is, between + * map and/or reduce instances) on the back end at runtime. + * @param c Class of the UDF obtaining the properties object. + * @param args String arguments that make this instance of + * the UDF unique. + * @return A reference to the properties object specific to + * the calling UDF. This is a reference, not a copy. + * Any changes to this object will automatically be + * propagated to other instances of the UDF calling this + * function. + */ + + @SuppressWarnings("unchecked") + public Properties getUDFProperties(Class c, String[] args) { + Integer k = generateKey(c, args); + Properties p = udfConfs.get(k); + if (p == null) { + p = new Properties(); + udfConfs.put(k, p); + } + return p; + } + + /** + * Get a properties object that is specific to this UDF. + * Note that if a given UDF is called multiple times in a script, + * they will all be provided the same configuration object. It + * is up to the UDF to make sure the multiple instances do not + * stomp on each other. + * + * It is guaranteed that this properties object will be separate + * from that provided to any other UDF. + * + * Note that this can only be used to share information + * across instantiations of the same function in the front end + * and between front end and back end. It cannot be used to + * share information between instantiations (that is, between + * map and/or reduce instances) on the back end at runtime. + * @param c Class of the UDF obtaining the properties object. + * @return A reference to the properties object specific to + * the calling UDF. This is a reference, not a copy. + * Any changes to this object will automatically be + * propagated to other instances of the UDF calling this + * function. 
+ */ + @SuppressWarnings("unchecked") + public Properties getUDFProperties(Class c) { + Integer k = generateKey(c); + Properties p = udfConfs.get(k); + if (p == null) { + p = new Properties(); + udfConfs.put(k, p); + } + return p; + } + + /** + * Serialize the UDF specific information into an instance + * of JobConf. This function is intended to be called on + * the front end in preparation for sending the data to the + * backend. + * @param conf JobConf to serialize into + * @throws IOException if underlying serialization throws it + */ + public void serialize(JobConf conf) throws IOException { + conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs)); + } + + /** + * Populate the udfConfs field. This function is intended to + * be called by Map.configure or Reduce.configure on the backend. + * It assumes that addJobConf has already been called. + * @throws IOException if underlying deserialization throws it + */ + @SuppressWarnings("unchecked") + public void deserialize() throws IOException { + udfConfs = (HashMap<Integer, Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext")); + } + + @SuppressWarnings("unchecked") + private int generateKey(Class c) { + return c.getName().hashCode(); + } + + @SuppressWarnings("unchecked") + private int generateKey(Class c, String[] args) { + int hc = c.getName().hashCode(); + for (int i = 0; i < args.length; i++) { + hc <<= 1; + hc ^= args[i].hashCode(); + } + return hc; + } + +} \ No newline at end of file Added: hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java?rev=881008&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestUDFContext.java Mon Nov 16 22:18:46 2009 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.pig.EvalFunc; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.UDFContext; +import org.junit.Test; + +import junit.framework.TestCase; + + +public class TestUDFContext extends TestCase { + + static MiniCluster cluster = null; + + @Override + protected void setUp() throws Exception { + cluster = MiniCluster.buildCluster(); + } + + + @Test + public void testUDFContext() throws Exception { + Util.createInputFile(cluster, "a.txt", new String[] { "dumb" }); + Util.createInputFile(cluster, "b.txt", new String[] { "dumber" }); + FileLocalizer.deleteTempFiles(); + PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); + String[] statement = { "A = LOAD 'a.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');", + "B = LOAD 
'b.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('jane');", + "C = union A, B;", + "D = FOREACH C GENERATE $0, $1, org.apache.pig.test.utils.UDFContextTestEvalFunc($0), org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" }; + + File tmpFile = File.createTempFile("temp_jira_851", ".pig"); + FileWriter writer = new FileWriter(tmpFile); + for (String line : statement) { + writer.write(line + "\n"); + } + writer.close(); + + pig.registerScript(tmpFile.getAbsolutePath()); + Iterator<Tuple> iterator = pig.openIterator("D"); + while (iterator.hasNext()) { + Tuple tuple = iterator.next(); + if ("dumb".equals(tuple.get(0).toString())) { + assertEquals(tuple.get(1).toString(), "joe"); + } else if ("dumber".equals(tuple.get(0).toString())) { + assertEquals(tuple.get(1).toString(), "jane"); + } + assertEquals(Integer.valueOf(tuple.get(2).toString()), new Integer(5)); + assertEquals(tuple.get(3).toString(), "five"); + } + } +} Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java?rev=881008&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java Mon Nov 16 22:18:46 2009 @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.pig.EvalFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; + +public class UDFContextTestEvalFunc extends EvalFunc<Integer> { + public UDFContextTestEvalFunc() { + Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); + p.setProperty("key1", "5"); + } + + @Override + public Integer exec(Tuple input) throws IOException { + String s = (UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty("key1")); + return Integer.valueOf(s); + } + +} Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java?rev=881008&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java Mon Nov 16 22:18:46 2009 @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.pig.EvalFunc; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.UDFContext; + +public class UDFContextTestEvalFunc2 extends EvalFunc<String> { + + public UDFContextTestEvalFunc2() { + Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); + p.setProperty("key1", "five"); + } + + @Override + public String exec(Tuple input) throws IOException { + if (UDFContext.getUDFContext().getJobConf() == null) return "JobConf is null!"; + else return UDFContext.getUDFContext().getUDFProperties(this.getClass()).getProperty("key1"); + } + +} Added: hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java?rev=881008&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/utils/UDFContextTestLoader.java Mon Nov 16 22:18:46 2009 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test.utils; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.pig.builtin.PigStorage; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.BufferedPositionedInputStream; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.UDFContext; + +public class UDFContextTestLoader extends PigStorage { + + private String[] vals = new String[1]; + + public UDFContextTestLoader(String v1) { + vals[0] = v1; + Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals); + p.setProperty("key1", vals[0]); + } + + @Override + public Tuple getNext() throws IOException { + Tuple t = super.getNext(); + if (t != null) { + if (UDFContext.getUDFContext().getJobConf() == null) { + t.append("JobConf is null!"); + } else { + Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), vals); + t.append(p.getProperty("key1")); + } + } + return t; + } +}