Hi, did you solve the problem?
I assume you understand the idea of the distributed cache. It doesn't matter
whether you run in local or distributed mode: in both cases you access the
file through the local file system.
In prod it's better to use Oozie, which places files in the distributed cache
for you.

Here is an example:

    <action name="an-action-with-pig-script">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${output_path}" />
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
                <property>
                    <name>pig.exec.reducers.bytes.per.reducer</name>
                    <value>50000000</value>
                </property>
                <!-- more conf ... -->
            </configuration>

            <script>pig/my_script.pig</script>

            <!-- See file tag -->
            <param>urlPath=./source_url</param>

            <param>in_dir=${in_dir}</param>
            <param>output=${output_path}</param>

            <param>udf=my_jython_udf.py</param>
            <!-- put your udf to dist cache -->
            <file>pig/udf/my_jython_udf.py#my_jython_udf.py</file>

            <!-- put your file to dist cache see urlPath=./source_url -->
            <file>${source_url_in_dir}/part-r-00000.avro#source_url</file>

        </pig>
        <ok to="some-next-action"/>
        <error to="kill"/>
    </action>
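
On the UDF side, the part after "#" in the <file> entries above is the name the
task sees in its working directory, and the part before it is the original
path. Here is a minimal Java sketch of that lookup, with a fallback to the
original path for runs where no link was created (local mode, PigUnit).
CacheFileResolver and its methods are hypothetical names, not part of Pig or
Oozie, and treating an entry without "#" as linked under its plain file name
is an assumption of this sketch.

```java
import java.io.File;
import java.io.FileNotFoundException;

/** Hypothetical helper (not a Pig or Oozie class) for "path#link" cache entries. */
public final class CacheFileResolver {

    private CacheFileResolver() {}

    /** Part before '#': the original location of the file. */
    public static String sourcePath(String cacheEntry) {
        int hash = cacheEntry.lastIndexOf('#');
        return hash >= 0 ? cacheEntry.substring(0, hash) : cacheEntry;
    }

    /**
     * Part after '#': the name expected in the task working directory.
     * Assumption: without a '#', the last path segment is used.
     */
    public static String linkName(String cacheEntry) {
        int hash = cacheEntry.lastIndexOf('#');
        if (hash >= 0 && hash < cacheEntry.length() - 1) {
            return cacheEntry.substring(hash + 1);
        }
        String path = sourcePath(cacheEntry);
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /**
     * Prefer the link in workDir; fall back to the original path if it
     * exists on the local file system; otherwise fail with a clear message.
     */
    public static File resolve(String cacheEntry, File workDir)
            throws FileNotFoundException {
        File link = new File(workDir, linkName(cacheEntry));
        if (link.exists()) {
            return link;
        }
        File raw = new File(sourcePath(cacheEntry));
        if (raw.exists()) {
            return raw;
        }
        throw new FileNotFoundException(String.format(
                "neither link %s nor file %s exists", link, raw));
    }
}
```

Inside a UDF you would call something like
CacheFileResolver.resolve(modelPath + "#" + MODEL_FILE, new File("."))
and open the returned file, which is the same idea as getFilename() in the
quoted message below.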




2014/1/7 Russell Jurney <[email protected]>

> Matt Hayes of Datafu project showed me this code, which works in local mode
> and in hadoop mode. This should be folded into getCacheFiles(), imo:
>
> public static final String MODEL_FILE = "MODEL_FILE";
> private final String modelPath;
> private TokenizerME tokenizer;
>
> public Tokenize(String modelPath) {
>   this.modelPath = modelPath;
> }
>
> @Override
> public List<String> getCacheFiles() {
>   List<String> list = new ArrayList<String>(1);
>   list.add(this.modelPath + "#" + MODEL_FILE);
>   return list;
> }
>
> public DataBag exec(Tuple input) throws IOException
> {
>   if (this.tokenizer == null) {
>     initTokenizer();
>   }
>
>   // etc.
> }
>
> private void initTokenizer() throws IOException {
>   String loadFile = getFilename();
>   InputStream file = new FileInputStream(loadFile);
>   InputStream buffer = new BufferedInputStream(file);
>   TokenizerModel model = new TokenizerModel(buffer);
>   this.tokenizer = new TokenizerME(model);
> }
>
> private String getFilename() throws IOException {
>   // If the symlink exists, use it; if not, use the raw path if it exists.
>   // Note: this is to help with testing, as it seems the distributed cache
>   // doesn't work with PigUnit.
>   String loadFile = MODEL_FILE;
>   if (!new File(loadFile).exists()) {
>     if (new File(this.modelPath).exists()) {
>       loadFile = this.modelPath;
>     } else {
>       throw new IOException(String.format(
>           "could not load model, neither symlink %s nor file %s exist",
>           MODEL_FILE, this.modelPath));
>     }
>   }
>   return loadFile;
> }
>
>
>
> On Mon, Jan 6, 2014 at 12:39 PM, Russell Jurney <[email protected]> wrote:
>
> > According to https://issues.apache.org/jira/browse/PIG-1752 :
> >
> > "One other note. I didn't include any unit tests with this patch. I don't
> > know how to test it in the unit tests since the distributed cache isn't
> > used in local mode. I've tested it on a cluster. Any thoughts on how to
> > include tests for this in the unit tests are welcome."
> >
> > getCacheFiles() does not work in local mode. This is problematic. How do I
> > write a UDF that works in both local mode and hadoop mode?
> >
> >
> > On Mon, Jan 6, 2014 at 12:08 PM, Russell Jurney <[email protected]> wrote:
> >
> >> Question: in local mode, can the path given to getCacheFiles() be on the
> >> local filesystem? Or does it have to be on HDFS?
> >>
> >>
> >> On Mon, Jan 6, 2014 at 11:29 AM, Russell Jurney <[email protected]> wrote:
> >>
> >>> 1. I've also given it an absolute local path. I don't know what you mean
> >>> by an absolute cache path. How do I know what that is? The examples use
> >>> ./link to access the cached file.
> >>> 2. Because all examples do so. What paths should I use to access the
> >>> distributed cache from inside exec?
> >>>
> >>> The exception does say that passwd is missing. But as I read the examples,
> >>> it should be there.
> >>>
> >>> On Monday, January 6, 2014, Serega Sheypak wrote:
> >>>
> >>>> Yes, it works. The exception clearly says that ./passwd is missing.
> >>>> 1. Try giving an absolute path to the file and see if it works. It should.
> >>>> 2. Then give a relative path. It looks like you are providing the relative
> >>>> path incorrectly. Why do you put "./" before the filename?
> >>>>
> >>>>
> >>>> 2014/1/6 Russell Jurney <[email protected]>
> >>>>
> >>>> > I have implemented the class below to test the udf cache, and it fails in
> >>>> > local mode with the error below. The cache should work in local mode as
> >>>> > well, right?
> >>>> >
> >>>> > ------------
> >>>> >
> >>>> > org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: datafu.pig.text.Udfcachetest [./passwd (No such file or directory)]
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:358)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextString(POUserFunc.java:432)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:315)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
> >>>> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
> >>>> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
> >>>> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
> >>>> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
> >>>> > Caused by: java.io.FileNotFoundException: ./passwd (No such file or directory)
> >>>> >     at java.io.FileInputStream.open(Native Method)
> >>>> >     at java.io.FileInputStream.<init>(FileInputStream.java:146)
> >>>> >     at java.io.FileInputStream.<init>(FileInputStream.java:101)
> >>>> >     at java.io.FileReader.<init>(FileReader.java:58)
> >>>> >     at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:22)
> >>>> >     at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:19)
> >>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
> >>>> > -----------------------
> >>>> >
> >>>> > package datafu.pig.text;
> >>>> >
> >>>> > import org.apache.pig.EvalFunc;
> >>>> > import org.apache.pig.data.Tuple;
> >>>> >
> >>>> > import java.io.BufferedReader;
> >>>> > import java.io.FileReader;
> >>>> > import java.io.IOException;
> >>>> > import java.util.ArrayList;
> >>>> > import java.util.List;
> >>>> >
> >>>> > /**
> >>>> >  * Created with IntelliJ IDEA.
> >>>> >  * User: rjurney
> >>>> >  * Date: 1/5/14
> >>>> >  * Time: 8:32 PM
> >>>> >  * To change this template use File | Settings | File Templates.
> >>>> >  */
> >>>> > public class Udfcachetest extends EvalFunc<String> {
> >>>> >
> >>>> >     public String exec(Tuple input) throws IOException {
> >>>> >         FileReader fr = new FileReader("./passwd");
> >>>> >         BufferedReader d = new BufferedReader(fr);
> >>>> >         return d.readLine();
> >>>> >     }
> >>>> >
> >>>> >     public List<String> getCacheFiles() {
> >>>> >         List<String> list = new ArrayList<String>(1);
> >>>> >         list.add("/etc/passwd");
> >>>> >         return list;
> >>>> >     }
> >>>> > }
> >>>> >
> >>>> > --
> >>>> > Russell Jurney twitter.com/rjurney [email protected]
> >>>> > datasyndrome.com
