Matt Hayes of the DataFu project showed me this code, which works in both
local mode and Hadoop mode. This fallback should be folded into
getCacheFiles() itself, IMO:

public static final String MODEL_FILE = "MODEL_FILE";
private final String modelPath;
private TokenizerME tokenizer;

public Tokenize(String modelPath) {
  this.modelPath = modelPath;
}

@Override
public List<String> getCacheFiles() {
  List<String> list = new ArrayList<String>(1);
  list.add(this.modelPath + "#" + MODEL_FILE);
  return list;
}

public DataBag exec(Tuple input) throws IOException
{
  if (this.tokenizer == null) {
    initTokenizer();
  }

  // etc.
}

private void initTokenizer() throws IOException {
  String loadFile = getFilename();
  InputStream file = new FileInputStream(loadFile);
  InputStream buffer = new BufferedInputStream(file);
  TokenizerModel model = new TokenizerModel(buffer);
  this.tokenizer = new TokenizerME(model);
}

private String getFilename() throws IOException {
  // If the symlink exists, use it; if not, use the raw path if it exists.
  // Note: this is to help with testing, as it seems the distributed cache
  // doesn't work with PigUnit.
  String loadFile = MODEL_FILE;
  if (!new File(loadFile).exists()) {
    if (new File(this.modelPath).exists()) {
      loadFile = this.modelPath;
    } else {
      throw new IOException(String.format(
          "could not load model, neither symlink %s nor file %s exist",
          MODEL_FILE, this.modelPath));
    }
  }
  return loadFile;
}
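
For what it's worth, the lookup in getFilename() can be pulled out and
exercised without Pig at all. This is only a sketch: CacheFileResolver and
resolve() are names I made up for illustration, not anything in Pig or
DataFu, and it assumes the symlink (if any) is created in the task's working
directory.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper, not part of Pig or DataFu: isolates the
// "symlink first, raw path second" lookup so it can be tested on its own.
class CacheFileResolver {

  // Returns the name to open: the distributed-cache symlink if it exists,
  // otherwise the original path if that exists on the local filesystem.
  static String resolve(String symlinkName, String rawPath) throws IOException {
    if (new File(symlinkName).exists()) {
      return symlinkName;
    }
    if (new File(rawPath).exists()) {
      return rawPath;
    }
    throw new IOException(String.format(
        "could not load file, neither symlink %s nor file %s exist",
        symlinkName, rawPath));
  }
}
```

In the UDF above, getFilename() would then reduce to
CacheFileResolver.resolve(MODEL_FILE, this.modelPath).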



On Mon, Jan 6, 2014 at 12:39 PM, Russell Jurney <[email protected]>wrote:

> According to https://issues.apache.org/jira/browse/PIG-1752 :
>
> "One other note. I didn't include any unit tests with this patch. I don't
> know how to test it in the unit tests since the distributed cache isn't
> used in local mode. I've tested it on a cluster. Any thoughts on how to
> include tests for this in the unit tests are welcome."
>
> getCacheFiles() does not work in local mode. This is problematic. How do I
> write a UDF that works in both local mode and Hadoop mode?
>
>
> On Mon, Jan 6, 2014 at 12:08 PM, Russell Jurney 
> <[email protected]>wrote:
>
>> Question: in local mode, can the path given to getCacheFiles() be on the
>> local filesystem? Or does it have to be on HDFS?
>>
>>
>> On Mon, Jan 6, 2014 at 11:29 AM, Russell Jurney <[email protected]
>> > wrote:
>>
>>> 1. I've also given it an absolute local path. I don't know what you mean
>>> by an absolute cache path. How do I know what that is? The examples use
>>> ./link to access the cached file.
>>> 2. Because all examples do so. What paths should I use to access the
>>> distributed cache from inside exec?
>>>
>>> The exception does say that passwd is missing. But as I read the examples,
>>> it should be there.
>>>
>>> On Monday, January 6, 2014, Serega Sheypak wrote:
>>>
>>>> Yes, it works. The exception clearly says that ./passwd is missing.
>>>> 1. Try giving an absolute path to the file and see if it works. It should.
>>>> 2. Then give a relative path. It looks like you are providing the relative
>>>> path incorrectly. Why do you put "./" before the filename?
>>>>
>>>>
>>>> 2014/1/6 Russell Jurney <[email protected]>
>>>>
>>>> > I have implemented the class below to test the UDF cache, and it fails in
>>>> > local mode with the error below. The cache should work in local mode as
>>>> > well, right?
>>>> >
>>>> > ------------
>>>> >
>>>> > org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: datafu.pig.text.Udfcachetest [./passwd (No such file or directory)]
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:358)
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextString(POUserFunc.java:432)
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:315)
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
>>>> > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
>>>> > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
>>>> > at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
>>>> > at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>>>> > at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>>>> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>>>> > at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
>>>> > Caused by: java.io.FileNotFoundException: ./passwd (No such file or directory)
>>>> > at java.io.FileInputStream.open(Native Method)
>>>> > at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>>> > at java.io.FileInputStream.<init>(FileInputStream.java:101)
>>>> > at java.io.FileReader.<init>(FileReader.java:58)
>>>> > at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:22)
>>>> > at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:19)
>>>> > at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
>>>> > -----------------------
>>>> >
>>>> > package datafu.pig.text;
>>>> >
>>>> > import org.apache.pig.EvalFunc;
>>>> > import org.apache.pig.data.Tuple;
>>>> >
>>>> > import java.io.BufferedReader;
>>>> > import java.io.FileReader;
>>>> > import java.io.IOException;
>>>> > import java.util.ArrayList;
>>>> > import java.util.List;
>>>> >
>>>> > /**
>>>> >  * Created with IntelliJ IDEA.
>>>> >  * User: rjurney
>>>> >  * Date: 1/5/14
>>>> >  * Time: 8:32 PM
>>>> >  * To change this template use File | Settings | File Templates.
>>>> >  */
>>>> > public class Udfcachetest extends EvalFunc<String> {
>>>> >
>>>> >     public String exec(Tuple input) throws IOException {
>>>> >         FileReader fr = new FileReader("./passwd");
>>>> >         BufferedReader d = new BufferedReader(fr);
>>>> >         return d.readLine();
>>>> >     }
>>>> >
>>>> >     public List<String> getCacheFiles() {
>>>> >         List<String> list = new ArrayList<String>(1);
>>>> >         list.add("/etc/passwd");
>>>> >         return list;
>>>> >     }
>>>> > }
>>>> >
>>>> > --
>>>> > Russell Jurney twitter.com/rjurney [email protected]
>>>> > datasyndrome.com
>>>> >



-- 
Russell Jurney twitter.com/rjurney [email protected] datasyndrome.com
