Author: gates
Date: Thu Nov 19 22:34:46 2009
New Revision: 882339
URL: http://svn.apache.org/viewvc?rev=882339&view=rev
Log:
Pass JobConf and UDF specific configuration information to UDFs.
Added:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java
Modified:
hadoop/pig/branches/branch-0.6/CHANGES.txt
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=882339&r1=882338&r2=882339&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Thu Nov 19 22:34:46 2009
@@ -24,6 +24,9 @@
IMPROVEMENTS
+PIG-1085: Pass JobConf and UDF specific configuration information to UDFs
+ (gates)
+
PIG-1089: Pig 0.6.0 Documentation (chandec via olgan)
PIG-958: Splitting output data on key field (ankur via pradeepkth)
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=882339&r1=882338&r2=882339&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Thu Nov 19 22:34:46 2009
@@ -81,6 +81,7 @@
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.UDFContext;
/**
* This is compiler class that takes an MROperPlan and converts
@@ -596,6 +597,9 @@
jobConf.setOutputCommitter(PigOutputCommitter.class);
Job job = new Job(jobConf);
jobStoreMap.put(job,new Pair<List<POStore>, Path>(storeLocations,
tmpLocation));
+
+ // Serialize the UDF specific context info.
+ UDFContext.getUDFContext().serialize(jobConf);
return job;
} catch (JobCreationException jce) {
throw jce;
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=882339&r1=882338&r2=882339&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
Thu Nov 19 22:34:46 2009
@@ -49,6 +49,7 @@
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
public abstract class PigMapBase extends MapReduceBase{
private static final Tuple DUMMYTUPLE = null;
@@ -166,6 +167,12 @@
keyType =
((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
pigReporter = new ProgressableReporter();
+
+ // Get the UDF specific context
+ UDFContext udfc = UDFContext.getUDFContext();
+ udfc.addJobConf(job);
+ udfc.deserialize();
+
if(!(mp.isEmpty())) {
List<OperatorKey> targetOpKeys =
(ArrayList<OperatorKey>)ObjectSerializer.deserialize(job.get("map.target.ops"));
@@ -178,7 +185,6 @@
}
-
} catch (IOException ioe) {
String msg = "Problem while configuring map plan.";
throw new RuntimeException(msg, ioe);
Modified:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=882339&r1=882338&r2=882339&view=diff
==============================================================================
---
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
(original)
+++
hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
Thu Nov 19 22:34:46 2009
@@ -57,6 +57,7 @@
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.data.DataBag;
@@ -301,6 +302,12 @@
roots = rp.getRoots().toArray(new PhysicalOperator[1]);
leaf = rp.getLeaves().get(0);
}
+
+ // Get the UDF specific context
+ UDFContext udfc = UDFContext.getUDFContext();
+ udfc.addJobConf(jConf);
+ udfc.deserialize();
+
} catch (IOException ioe) {
String msg = "Problem while configuring reduce plan.";
throw new RuntimeException(msg, ioe);
Added:
hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java
(added)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/impl/util/UDFContext.java
Thu Nov 19 22:34:46 2009
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.mapred.JobConf;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+/**
+ * Singleton through which UDFs obtain the JobConf on the backend and
+ * per-UDF Properties that are carried from the front end to the
+ * backend inside the JobConf.
+ */
+@SuppressWarnings("deprecation")
+public class UDFContext {
+
+    private JobConf jconf = null;
+    // Keyed by the full UDF signature, not its hash code, so that two
+    // UDFs whose hash codes collide never share a Properties object.
+    private HashMap<String, Properties> udfConfs;
+
+    private static UDFContext self = null;
+
+    private UDFContext() {
+        udfConfs = new HashMap<String, Properties>();
+    }
+
+    /** @return the single UDFContext instance, created on first use. */
+    public static UDFContext getUDFContext() {
+        if (self == null) {
+            self = new UDFContext();
+        }
+        return self;
+    }
+
+    /**
+     * Adds the JobConf to this singleton. Will be
+     * called on the backend by the Map and Reduce
+     * functions so that UDFs can obtain the JobConf
+     * on the backend.
+     * @param conf JobConf to make available to UDFs.
+     */
+    public void addJobConf(JobConf conf) {
+        jconf = conf;
+    }
+
+    /**
+     * Get the JobConf. This should only be called on
+     * the backend. It will return null on the frontend.
+     * @return JobConf for this job. This is a copy of the
+     * JobConf. Nothing written here will be kept by the system.
+     * getUDFProperties should be used for recording UDF specific
+     * information.
+     */
+    public JobConf getJobConf() {
+        if (jconf != null) return new JobConf(jconf);
+        else return null;
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script,
+     * and each instance passes different arguments, then each will
+     * be provided with different configuration object.
+     * This can be used by loaders to pass their input object path
+     * or URI and separate themselves from other instances of the
+     * same loader. Constructor arguments could also be used,
+     * as they are available on both the front and back end.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end. It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c Class of the UDF obtaining the properties object.
+     * @param args String arguments that make this instance of
+     * the UDF unique. A null or empty array is treated the same
+     * as calling getUDFProperties(Class).
+     * @return A reference to the properties object specific to
+     * the calling UDF. This is a reference, not a copy.
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
+     * function.
+     */
+    public Properties getUDFProperties(Class<?> c, String[] args) {
+        return propertiesFor(generateKey(c, args));
+    }
+
+    /**
+     * Get a properties object that is specific to this UDF.
+     * Note that if a given UDF is called multiple times in a script,
+     * they will all be provided the same configuration object. It
+     * is up to the UDF to make sure the multiple instances do not
+     * stomp on each other.
+     *
+     * It is guaranteed that this properties object will be separate
+     * from that provided to any other UDF.
+     *
+     * Note that this can only be used to share information
+     * across instantiations of the same function in the front end
+     * and between front end and back end. It cannot be used to
+     * share information between instantiations (that is, between
+     * map and/or reduce instances) on the back end at runtime.
+     * @param c Class of the UDF obtaining the properties object.
+     * @return A reference to the properties object specific to
+     * the calling UDF. This is a reference, not a copy.
+     * Any changes to this object will automatically be
+     * propagated to other instances of the UDF calling this
+     * function.
+     */
+    public Properties getUDFProperties(Class<?> c) {
+        return propertiesFor(generateKey(c));
+    }
+
+    /**
+     * Serialize the UDF specific information into an instance
+     * of JobConf. This function is intended to be called on
+     * the front end in preparation for sending the data to the
+     * backend.
+     * @param conf JobConf to serialize into
+     * @throws IOException if underlying serialization throws it
+     */
+    public void serialize(JobConf conf) throws IOException {
+        conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+    }
+
+    /**
+     * Populate the udfConfs field. This function is intended to
+     * be called by Map.configure or Reduce.configure on the backend.
+     * addJobConf must have been called first.
+     * @throws IOException if addJobConf has not been called or if
+     * underlying deserialization throws it
+     */
+    @SuppressWarnings("unchecked")
+    public void deserialize() throws IOException {
+        if (jconf == null) {
+            throw new IOException(
+                "UDFContext.deserialize called before addJobConf");
+        }
+        HashMap<String, Properties> confs = (HashMap<String, Properties>)
+            ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+        // ObjectSerializer may hand back null (presumably when the
+        // property is absent -- confirm); keep the current map so that
+        // later getUDFProperties calls do not fail on a null map.
+        if (confs != null) {
+            udfConfs = confs;
+        }
+    }
+
+    /**
+     * Look up the properties stored under key, creating and
+     * registering an empty Properties object on first use.
+     */
+    private Properties propertiesFor(String key) {
+        Properties p = udfConfs.get(key);
+        if (p == null) {
+            p = new Properties();
+            udfConfs.put(key, p);
+        }
+        return p;
+    }
+
+    /** Key for a UDF with no distinguishing arguments: its class name. */
+    private String generateKey(Class<?> c) {
+        return c.getName();
+    }
+
+    /**
+     * Build a key from the class name followed by each argument,
+     * length-prefixed so that ("ab", "c") and ("a", "bc") differ.
+     */
+    private String generateKey(Class<?> c, String[] args) {
+        StringBuilder key = new StringBuilder(generateKey(c));
+        if (args != null) {
+            for (String arg : args) {
+                key.append('\n');
+                if (arg == null) {
+                    key.append('-');
+                } else {
+                    key.append(arg.length()).append(':').append(arg);
+                }
+            }
+        }
+        return key.toString();
+    }
+
+}
\ No newline at end of file
Added:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java?rev=882339&view=auto
==============================================================================
--- hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java
(added)
+++ hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/TestUDFContext.java
Thu Nov 19 22:34:46 2009
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+
+public class TestUDFContext extends TestCase {
+
+    static MiniCluster cluster = null;
+
+    @Override
+    protected void setUp() throws Exception {
+        cluster = MiniCluster.buildCluster();
+    }
+
+    /**
+     * Runs a script with two loader instances and two eval funcs and
+     * checks the values each UDF read back from its UDFContext properties.
+     */
+    @Test
+    public void testUDFContext() throws Exception {
+        Util.createInputFile(cluster, "a.txt", new String[] { "dumb" });
+        Util.createInputFile(cluster, "b.txt", new String[] { "dumber" });
+        FileLocalizer.deleteTempFiles();
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String[] statement = {
+            "A = LOAD 'a.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');",
+            "B = LOAD 'b.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('jane');",
+            "C = union A, B;",
+            "D = FOREACH C GENERATE $0, $1, org.apache.pig.test.utils.UDFContextTestEvalFunc($0), org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" };
+
+        File tmpFile = File.createTempFile("temp_jira_851", ".pig");
+        tmpFile.deleteOnExit();
+        FileWriter writer = new FileWriter(tmpFile);
+        // Close the writer even if a write fails.
+        try {
+            for (String line : statement) {
+                writer.write(line + "\n");
+            }
+        } finally {
+            writer.close();
+        }
+
+        pig.registerScript(tmpFile.getAbsolutePath());
+        Iterator<Tuple> iterator = pig.openIterator("D");
+        int rows = 0;
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            rows++;
+            String word = tuple.get(0).toString();
+            if ("dumb".equals(word)) {
+                assertEquals("joe", tuple.get(1).toString());
+            } else if ("dumber".equals(word)) {
+                assertEquals("jane", tuple.get(1).toString());
+            } else {
+                fail("Unexpected input value: " + word);
+            }
+            assertEquals(Integer.valueOf(5), Integer.valueOf(tuple.get(2).toString()));
+            assertEquals("five", tuple.get(3).toString());
+        }
+        // An empty result must not pass vacuously: one row per input file.
+        assertEquals("number of rows in D", 2, rows);
+    }
+}
Added:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java?rev=882339&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
(added)
+++
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc.java
Thu Nov 19 22:34:46 2009
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/** Test UDF: stores "5" under "key1"; exec returns that property as an Integer. */
+public class UDFContextTestEvalFunc extends EvalFunc<Integer> {
+    public UDFContextTestEvalFunc() {
+        UDFContext.getUDFContext().getUDFProperties(getClass())
+            .setProperty("key1", "5");
+    }
+
+    @Override
+    public Integer exec(Tuple input) throws IOException {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass());
+        return Integer.valueOf(props.getProperty("key1"));
+    }
+}
Added:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java?rev=882339&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
(added)
+++
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestEvalFunc2.java
Thu Nov 19 22:34:46 2009
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/** Test UDF: stores "five" under "key1"; exec returns it, or a marker if no JobConf. */
+public class UDFContextTestEvalFunc2 extends EvalFunc<String> {
+    public UDFContextTestEvalFunc2() {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass());
+        props.setProperty("key1", "five");
+    }
+
+    @Override
+    public String exec(Tuple input) throws IOException {
+        UDFContext context = UDFContext.getUDFContext();
+        if (context.getJobConf() == null) return "JobConf is null!";
+        return context.getUDFProperties(getClass()).getProperty("key1");
+    }
+}
Added:
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java?rev=882339&view=auto
==============================================================================
---
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java
(added)
+++
hadoop/pig/branches/branch-0.6/test/org/apache/pig/test/utils/UDFContextTestLoader.java
Thu Nov 19 22:34:46 2009
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test.utils;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Test loader: a PigStorage that records its constructor argument under
+ * "key1" and appends it (or a marker when no JobConf is set) to each tuple.
+ */
+public class UDFContextTestLoader extends PigStorage {
+
+    private String[] vals = new String[1];
+
+    public UDFContextTestLoader(String v1) {
+        vals[0] = v1;
+        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), vals);
+        props.setProperty("key1", v1);
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        Tuple next = super.getNext();
+        if (next == null) return null;
+        UDFContext context = UDFContext.getUDFContext();
+        if (context.getJobConf() == null) next.append("JobConf is null!");
+        else next.append(context.getUDFProperties(getClass(), vals).getProperty("key1"));
+        return next;
+    }
+}