Matt Hayes of the DataFu project showed me this code, which works in both
local mode and Hadoop mode. This fallback should be folded into
getCacheFiles() itself, imo:
public static final String MODEL_FILE = "MODEL_FILE";
private final String modelPath;
private TokenizerME tokenizer;

public Tokenize(String modelPath) {
    this.modelPath = modelPath;
}

@Override
public List<String> getCacheFiles() {
    // "path#name" requests a symlink called MODEL_FILE for the cached file
    List<String> list = new ArrayList<String>(1);
    list.add(this.modelPath + "#" + MODEL_FILE);
    return list;
}

public DataBag exec(Tuple input) throws IOException
{
    // load the model lazily, on the first call
    if (this.tokenizer == null) {
        initTokenizer();
    }
    // etc.
}

private void initTokenizer() throws IOException {
    String loadFile = getFilename();
    InputStream file = new FileInputStream(loadFile);
    InputStream buffer = new BufferedInputStream(file);
    try {
        TokenizerModel model = new TokenizerModel(buffer);
        this.tokenizer = new TokenizerME(model);
    } finally {
        buffer.close();
    }
}

private String getFilename() throws IOException {
    // if the symlink exists, use it, if not, use the raw name if it exists
    // note: this is to help with testing, as it seems distributed cache
    // doesn't work with PigUnit
    String loadFile = MODEL_FILE;
    if (!new File(loadFile).exists()) {
        if (new File(this.modelPath).exists()) {
            loadFile = this.modelPath;
        } else {
            throw new IOException(String.format(
                "could not load model, neither symlink %s nor file %s exist",
                MODEL_FILE, this.modelPath));
        }
    }
    return loadFile;
}
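
To exercise the fallback in getFilename() without Pig or a cluster, the same
check can be pulled out into a plain helper. This is only a minimal sketch:
the class name ModelFileResolver and its two methods are hypothetical and not
part of Pig or DataFu, and it assumes the symlink, when present, sits in the
current working directory.

```java
import java.io.File;
import java.io.FileNotFoundException;

/** Hypothetical helper for illustration; not part of Pig or DataFu. */
final class ModelFileResolver {
    private ModelFileResolver() {}

    /** Builds a "path#symlink" entry of the kind returned from getCacheFiles(). */
    static String cacheEntry(String path, String symlinkName) {
        return path + "#" + symlinkName;
    }

    /**
     * Prefers the symlink name if a file with that name exists; otherwise
     * falls back to the raw path; otherwise fails with both names in the message.
     */
    static String resolve(String symlinkName, String rawPath) throws FileNotFoundException {
        if (new File(symlinkName).exists()) {
            return symlinkName;
        }
        if (new File(rawPath).exists()) {
            return rawPath;
        }
        throw new FileNotFoundException(String.format(
            "could not load model, neither symlink %s nor file %s exist",
            symlinkName, rawPath));
    }

    public static void main(String[] args) throws Exception {
        // A temp file stands in for the model; the symlink name is assumed absent here.
        File model = File.createTempFile("model", ".bin");
        model.deleteOnExit();
        System.out.println(cacheEntry(model.getPath(), "MODEL_FILE"));
        System.out.println(resolve("MODEL_FILE_DEMO", model.getPath()));
    }
}
```

With the check isolated like this, a plain unit test can cover the local-mode
path (symlink missing, raw file present) without touching the distributed
cache at all.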
On Mon, Jan 6, 2014 at 12:39 PM, Russell Jurney <[email protected]> wrote:
> According to https://issues.apache.org/jira/browse/PIG-1752 :
>
> "One other note. I didn't include any unit tests with this patch. I don't
> know how to test it in the unit tests since the distributed cache isn't
> used in local mode. I've tested it on a cluster. Any thoughts on how to
> include tests for this in the unit tests are welcome."
>
> getCacheFiles() does not work in local mode. This is problematic. How do I
> write a UDF that works in both local mode and Hadoop mode?
>
>
> On Mon, Jan 6, 2014 at 12:08 PM, Russell Jurney <[email protected]> wrote:
>
>> Question: in local mode, can the path given to getCacheFiles() be on the
>> local filesystem? Or does it have to be on HDFS?
>>
>>
>> On Mon, Jan 6, 2014 at 11:29 AM, Russell Jurney <[email protected]> wrote:
>>
>>> 1. I've also given it an absolute local path. I don't know what you mean
>>> by an absolute cache path. How do I know what that is? The examples use
>>> ./link to access the cached file.
>>> 2. Because all examples do so. What paths should I use to access the
>>> distributed cache from inside exec?
>>>
>>> The exception does say that passwd is missing. But as I read the examples,
>>> it should be there.
>>>
>>> On Monday, January 6, 2014, Serega Sheypak wrote:
>>>
>>>> Yes, it works. The exception clearly says that ./passwd is missing.
>>>> 1. Try giving an absolute path to the file and see if it works. It should.
>>>> 2. Then give a relative path. It looks like you are providing the relative
>>>> path incorrectly. Why do you put "./" before the filename?
>>>>
>>>>
>>>> 2014/1/6 Russell Jurney <[email protected]>
>>>>
>>>> > I have implemented the class below to test the UDF cache, and it fails in
>>>> > local mode with the error below. The cache should work in local mode as
>>>> > well, right?
>>>> >
>>>> > ------------
>>>> >
>>>> > org.apache.pig.backend.executionengine.ExecException: ERROR 2078: Caught error from UDF: datafu.pig.text.Udfcachetest [./passwd (No such file or directory)]
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:358)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextString(POUserFunc.java:432)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:315)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.runPipeline(PigGenericMapBase.java:282)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:277)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
>>>> >     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
>>>> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
>>>> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
>>>> >     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
>>>> > Caused by: java.io.FileNotFoundException: ./passwd (No such file or directory)
>>>> >     at java.io.FileInputStream.open(Native Method)
>>>> >     at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>>> >     at java.io.FileInputStream.<init>(FileInputStream.java:101)
>>>> >     at java.io.FileReader.<init>(FileReader.java:58)
>>>> >     at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:22)
>>>> >     at datafu.pig.text.Udfcachetest.exec(Udfcachetest.java:19)
>>>> >     at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
>>>> > -----------------------
>>>> >
>>>> > package datafu.pig.text;
>>>> >
>>>> > import org.apache.pig.EvalFunc;
>>>> > import org.apache.pig.data.Tuple;
>>>> >
>>>> > import java.io.BufferedReader;
>>>> > import java.io.FileReader;
>>>> > import java.io.IOException;
>>>> > import java.util.ArrayList;
>>>> > import java.util.List;
>>>> >
>>>> > /**
>>>> > * Created with IntelliJ IDEA.
>>>> > * User: rjurney
>>>> > * Date: 1/5/14
>>>> > * Time: 8:32 PM
>>>> > * To change this template use File | Settings | File Templates.
>>>> > */
>>>> > public class Udfcachetest extends EvalFunc<String> {
>>>> >
>>>> > public String exec(Tuple input) throws IOException {
>>>> > FileReader fr = new FileReader("./passwd");
>>>> > BufferedReader d = new BufferedReader(fr);
>>>> > return d.readLine();
>>>> > }
>>>> >
>>>> > public List<String> getCacheFiles() {
>>>> > List<String> list = new ArrayList<String>(1);
>>>> > list.add("/etc/passwd");
>>>> > return list;
>>>> > }
>>>> > }
>>>> >
>>>> > --
>>>> > Russell Jurney twitter.com/rjurney [email protected]
>>>> > datasyndrome.com
>>>> >
>>>>
--
Russell Jurney twitter.com/rjurney [email protected] datasyndrome.com