The Mahout wrapper class is as follows:
@riffle.process.Process
public class VectorSequenceFileGenerator {

    private Path inputFilePath;
    private Path outputFilePath;
    private Configuration configuration;

    public VectorSequenceFileGenerator(String inputFileName, String outputFileName,
            Configuration configuration) {
        this(new Path(inputFileName), new Path(outputFileName), configuration);
    }

    public VectorSequenceFileGenerator(Path inputFilePath, Path outputFilePath,
            Configuration configuration) {
        this.inputFilePath = inputFilePath;
        this.outputFilePath = outputFilePath;
        this.configuration = configuration;
    }

    @DependencyOutgoing
    public Path getOutgoing() {
        return outputFilePath;
    }

    @DependencyIncoming
    public Path getIncoming() {
        return inputFilePath;
    }

    @ProcessStart
    public void start() {
        try {
            DictionaryVectorizer.createTermFrequencyVectors(
                inputFilePath,
                outputFilePath,
                configuration,
                1,
                2,
                0.5f,
                100,
                true,
                0,
                4000,
                true,
                true);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @ProcessComplete
    public void complete() {
        start();
    }
}
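To rule out the annotated getters themselves, a standalone check along these lines might help. It is only a sketch: the Incoming and Outgoing annotations and the DemoProcess class are hypothetical stand-ins I made up so the example compiles without Riffle, Cascading, or Hadoop on the classpath, and the lookup is plain Java reflection, not Riffle's actual ProcessWrapper logic.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class AnnotationLookupSketch {

    // Hypothetical stand-ins for the dependency annotations; NOT the real riffle.process types.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Incoming {}

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Outgoing {}

    // Hypothetical process object shaped like the wrapper above, using Strings instead of Paths.
    public static class DemoProcess {
        private final String in;
        private final String out;

        public DemoProcess(String in, String out) {
            this.in = in;
            this.out = out;
        }

        @Incoming
        public String getIncoming() {
            return in;
        }

        @Outgoing
        public String getOutgoing() {
            return out;
        }
    }

    // Finds the first public no-arg method carrying the annotation and returns its result.
    public static Object readAnnotated(Object target, Class<? extends Annotation> annotation) {
        for (Method m : target.getClass().getMethods()) {
            if (m.isAnnotationPresent(annotation) && m.getParameterTypes().length == 0) {
                try {
                    return m.invoke(target);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("could not invoke " + m.getName(), e);
                }
            }
        }
        throw new IllegalStateException("no method annotated with @" + annotation.getSimpleName());
    }

    public static void main(String[] args) {
        // A null output path is passed through unchanged, so it is worth printing both values.
        DemoProcess p = new DemoProcess("input/docs", null);
        System.out.println("incoming = " + readAnnotated(p, Incoming.class));
        System.out.println("outgoing = " + readAnnotated(p, Outgoing.class));
    }
}
```

Running the same kind of check against the real wrapper (with the real annotations) would at least show whether the outgoing path is null or simply points at a location that has not been written yet.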
Meanwhile, this is integrated into a Cascade in Groovy as follows:
def vectorizerFlow = new ProcessFlow("mahoutVectorizer",
    binding.variables.get("vectorSequenceFileGenerator"))
def connector = new CascadeConnector()
def cascade = connector.connect( firstFlow, vectorizerFlow )
cascade.complete();
The stack trace goes like this:
java.lang.RuntimeException: java.lang.NullPointerException
    at com.myapp.VectorSequenceFileGenerator.start(VectorSequenceFileGenerator.java:66)
    at com.myapp.VectorSequenceFileGenerator.complete(VectorSequenceFileGenerator.java:72)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at riffle.process.scheduler.ProcessWrapper.invokeMethod(ProcessWrapper.java:178)
    at riffle.process.scheduler.ProcessWrapper.findInvoke(ProcessWrapper.java:166)
    at riffle.process.scheduler.ProcessWrapper.complete(ProcessWrapper.java:147)
    at cascading.flow.ProcessFlow.complete(ProcessFlow.java:172)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:705)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:653)
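Because start() rethrows everything as a RuntimeException, the frames above only cover the wrapper; the frames for whatever actually threw the NullPointerException would sit in the nested cause. A minimal sketch of how the innermost cause could be dug out and printed is below. RootCauseSketch, rootCause, and failingStart are names invented for this illustration, and failingStart only simulates the wrap-and-rethrow pattern rather than calling Mahout.

```java
public class RootCauseSketch {

    // Walks the getCause() chain and returns the innermost throwable.
    public static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical stand-in for start(): wraps any failure in a RuntimeException.
    static void failingStart() {
        try {
            throw new NullPointerException("simulated failure inside the wrapped call");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try {
            failingStart();
        } catch (RuntimeException wrapped) {
            // Print the innermost exception with its own stack frames.
            Throwable root = rootCause(wrapped);
            System.out.println("root cause: " + root);
            root.printStackTrace();
        }
    }
}
```

Logging the root cause this way inside the catch block of start() should show which frame dereferenced the null value.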
Thanks.
On 12/19/11 3:10 PM, "Ken Krugler" <[email protected]> wrote:
>I haven't fooled around with Riffle, but I have done some extraction in
>the past of Mahout components to use with Cascading.
>
>And I'm interested in using Cascading 2.0 (APL vs. GPLv3 license) with
>Mahout, so if you can share more details I'd be happy to take a look.
>
>Regards,
>
>-- Ken
>
>On Dec 19, 2011, at 12:01pm, Neil Chaudhuri wrote:
>
>> Does anyone have any code to share about how to use Riffle (and
>> Cascading) with Mahout? I have a class wrapping a Mahout operation, but
>> I am getting a NullPointerException when I add this class to my Cascade.
>> I think the key line is this:
>>
>> 11/12/19 14:50:14 INFO flow.Flow: [mahoutVectorizer] atleast one sink does not exist
>>
>> This is despite having a method annotated as follows:
>>
>> @DependencyOutgoing
>> public Path getOutgoing() {
>> return outputFilePath;
>> }
>>
>> Any insight is appreciated.
>>
>> Thanks.
>>
>
>--------------------------
>Ken Krugler
>http://www.scaleunlimited.com
>custom big data solutions & training
>Hadoop, Cascading, Mahout & Solr