Hi,
this is how I run the PageRank implementation (mine takes into account
dangling nodes and checks for convergence):
  Map<String,String> params = new HashMap<String,String>();
  params.put(GiraphJob.WORKER_CONTEXT_CLASS,
      "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
  params.put(GiraphJob.MASTER_COMPUTE_CLASS,
      "org.apache.jena.grande.giraph.pagerank.PageRankVertexMasterCompute");
  String[] data = getData ( filename );
  Iterable<String> results = InternalVertexRunner.run(
      PageRankVertex.class,
      PageRankVertexInputFormat.class,
      PageRankVertexOutputFormat.class,
      params,
      data
  );
This used to work, but back then I was registering the aggregators in
PageRankVertexWorkerContext (see below).
Now I am trying to do the same in PageRankVertexMasterCompute, which
extends DefaultMasterCompute and has only one method:
  @Override
  public void initialize() throws InstantiationException, IllegalAccessException {
    log.debug("initialize");
    registerPersistentAggregator("dangling-current", DoubleSumAggregator.class);
    registerPersistentAggregator("error-current", DoubleSumAggregator.class);
    registerPersistentAggregator("pagerank-sum", DoubleSumAggregator.class);
    registerPersistentAggregator("vertices-count", LongSumAggregator.class);
  }
I am not 100% sure whether I should use registerAggregator or
registerPersistentAggregator here.
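My current understanding (please correct me if I am wrong) is that a
regular aggregator is reset to its initial value at every superstep, while
a persistent one keeps accumulating for the whole application. Here is a toy
model of that difference in plain Java. It does not use Giraph at all:
ToySumAggregator, endSuperstep() and run() are names I made up for
illustration, and the real reset/visibility timing in Giraph is more subtle.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: not Giraph code. All names here are made up. */
public class AggregatorResetSketch {

    /** A sum aggregator that is either reset between supersteps or kept. */
    static final class ToySumAggregator {
        private final boolean persistent;
        private double value = 0d;

        ToySumAggregator(boolean persistent) {
            this.persistent = persistent;
        }

        void aggregate(double x) {
            value += x;
        }

        double get() {
            return value;
        }

        /** Called between supersteps: only regular aggregators go back to 0. */
        void endSuperstep() {
            if (!persistent) {
                value = 0d;
            }
        }
    }

    /** Adds 1.0 in every superstep and records the value seen at its end. */
    public static List<Double> run(boolean persistent, int supersteps) {
        ToySumAggregator agg = new ToySumAggregator(persistent);
        List<Double> seen = new ArrayList<>();
        for (int i = 0; i < supersteps; i++) {
            agg.aggregate(1.0);
            seen.add(agg.get());
            agg.endSuperstep();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("regular:    " + run(false, 3)); // [1.0, 1.0, 1.0]
        System.out.println("persistent: " + run(true, 3));  // [1.0, 2.0, 3.0]
    }
}
```

If that is right, per-iteration quantities such as "error-current" and
"dangling-current" probably want the regular kind, but I would like
someone to confirm.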
The initialize() method is now being called, I see this on the console:
09:34:46 DEBUG PageRankVertexMasterCompute :: initialize
In PageRankVertexWorkerContext, which extends WorkerContext, I override
the preApplication() method:
  @SuppressWarnings("unchecked")
  @Override
  public void preApplication() throws InstantiationException,
      IllegalAccessException {
    log.debug("preApplication()");
    System.out.println(((Aggregator<DoubleWritable>)getAggregatedValue("error-current")));
    ((Aggregator<DoubleWritable>)getAggregatedValue("error-current")).setAggregatedValue(
        new DoubleWritable( Double.MAX_VALUE ) );
  }
The getAggregatedValue("error-current") call above returns null, and I do
not understand why.
Just to make things even more clear, this is how I used to run the
PageRank implementation locally:
https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/RunPageRankVertexLocally.java
And this is the WorkerContext I used to have:
https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
As you can see, it used to call registerAggregator(...) in the
preApplication() method:
  @SuppressWarnings("unchecked")
  @Override
  public void preApplication() throws InstantiationException,
      IllegalAccessException {
    log.debug("preApplication()");
    registerAggregator("dangling-current", SumAggregator.class);
    registerAggregator("error-current", SumAggregator.class);
    registerAggregator("pagerank-sum", SumAggregator.class);
    registerAggregator("vertices-count", LongSumAggregator.class);
    ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
        new DoubleWritable( Double.MAX_VALUE ) );
  }
The registerAggregator() method in WorkerContext is gone and I am
trying to achieve the same via MasterCompute now.
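To make clear what I need the four aggregators for, here is a plain-Java
sketch of PageRank that redistributes the rank of dangling nodes and stops
when the summed per-vertex change falls below a tolerance. It uses no Giraph
and it is not my actual vertex code: the class and method names, the damping
factor and the tolerance are assumptions for illustration only. The local
variables play the role of the aggregators of the same name.

```java
import java.util.Arrays;

/** Plain-Java sketch, not Giraph code; names and constants are illustrative. */
public class DanglingPageRankSketch {

    /**
     * @param out       out[i] lists the vertices that vertex i links to
     * @param damping   damping factor (0.85 is a common choice)
     * @param tolerance stop when the summed absolute change drops below this
     */
    public static double[] pageRank(int[][] out, double damping,
                                    double tolerance, int maxIterations) {
        int n = out.length;                  // role of "vertices-count"
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        double error = Double.MAX_VALUE;     // role of "error-current": start as "not converged"

        for (int it = 0; it < maxIterations && error > tolerance; it++) {
            double[] next = new double[n];
            double dangling = 0d;            // role of "dangling-current"
            for (int v = 0; v < n; v++) {
                if (out[v].length == 0) {
                    dangling += rank[v];     // no out-links: collect the rank
                } else {
                    double share = rank[v] / out[v].length;
                    for (int w : out[v]) {
                        next[w] += share;
                    }
                }
            }
            error = 0d;
            for (int v = 0; v < n; v++) {
                // the collected dangling rank is spread uniformly over all vertices
                next[v] = (1 - damping) / n + damping * (next[v] + dangling / n);
                error += Math.abs(next[v] - rank[v]);
            }
            rank = next;
        }
        return rank;
    }

    /** Role of "pagerank-sum": a sanity check that should stay close to 1. */
    public static double sum(double[] rank) {
        double s = 0d;
        for (double r : rank) {
            s += r;
        }
        return s;
    }

    public static void main(String[] args) {
        // 0 -> 1, 0 -> 2, 1 -> 2; vertex 2 is dangling
        int[][] out = { {1, 2}, {2}, {} };
        double[] rank = pageRank(out, 0.85, 1e-9, 100);
        System.out.println(Arrays.toString(rank) + " sum=" + sum(rank));
    }
}
```

Starting the error at Double.MAX_VALUE is the same trick as in my
preApplication(): it guarantees the first convergence check fails.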
Regards,
Paolo
On 11 September 2012 00:20, Paolo Castagna <[email protected]> wrote:
> Hi Gianmarco,
> good, that was one problem... but I am not yet back to the green bar.
>
> Here is how I am running it locally now:
>
> Map<String,String> params = new HashMap<String,String>();
> params.put(GiraphJob.WORKER_CONTEXT_CLASS,
> "org.apache.jena.grande.giraph.pagerank.PageRankVertexWorkerContext");
> params.put(GiraphJob.MASTER_COMPUTE_CLASS,
> "org.apache.jena.grande.giraph.pagerank.SimplePageRankVertexMasterCompute");
>
> String[] data = getData ( filename );
> Iterable<String> results = InternalVertexRunner.run(
> PageRankVertex.class,
> PageRankVertexInputFormat.class,
> PageRankVertexOutputFormat.class,
> params,
> data
> );
>
> However, I need to learn more about the MasterCompute (and its relation
> with WorkerContext).
>
> Paolo
>
> On 10 September 2012 22:08, Gianmarco De Francisci Morales
> <[email protected]> wrote:
>> Hi Paolo,
>>
>> Are you setting the MasterCompute class?
>> You can do it with this option of bin/giraph
>> -mc,--masterCompute <arg> MasterCompute class
>>
>> Cheers,
>> --
>> Gianmarco
>>
>>
>>
>> On Mon, Sep 10, 2012 at 9:36 PM, Paolo Castagna <[email protected]>
>> wrote:
>>>
>>> Hi,
>>> first and foremost, thanks for all the work and improvements on Giraph.
>>> I went away from computers for a while (personal reasons) and changed
>>> job, now I am back and playing with Giraph when I can.
>>>
>>> I updated my little examples (overall, it was easy and quick; here are
>>> the changes [1], just in case others are in a similar situation and want
>>> to have a look).
>>>
>>> I am not sure I get the 'new' aggregators and, in particular, how I can
>>> 'register' them. My failing tests confirm my lack of understanding! And
>>> forgive me if I come here and ask such a simple question.
>>>
>>> Here is what I used to do [2]:
>>>
>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>
>>> private static final Logger log =
>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>
>>> public static double errorPrevious = Double.MAX_VALUE;
>>> public static double danglingPrevious = 0d;
>>>
>>> @SuppressWarnings("unchecked")
>>> @Override
>>> public void preApplication() throws InstantiationException,
>>> IllegalAccessException {
>>> log.debug("preApplication()");
>>> registerAggregator("dangling-current", SumAggregator.class);
>>> registerAggregator("error-current", SumAggregator.class);
>>> registerAggregator("pagerank-sum", SumAggregator.class);
>>> registerAggregator("vertices-count", LongSumAggregator.class);
>>>
>>>
>>> ((Aggregator<DoubleWritable>)getAggregator("error-current")).setAggregatedValue(
>>> new DoubleWritable( Double.MAX_VALUE ) );
>>> }
>>>
>>> [...]
>>>
>>>
>>> Here is what I am trying to do now [3]:
>>>
>>> public class PageRankVertexWorkerContext extends WorkerContext {
>>>
>>> private static final Logger log =
>>> LoggerFactory.getLogger(PageRankVertexWorkerContext.class);
>>>
>>> public static double errorPrevious = Double.MAX_VALUE;
>>> public static double danglingPrevious = 0d;
>>>
>>> // TODO: double check this... who is calling initialize()?
>>> public static class SimplePageRankVertexMasterCompute extends
>>> DefaultMasterCompute {
>>> @Override
>>> public void initialize() throws InstantiationException,
>>> IllegalAccessException {
>>> registerAggregator("dangling-current", DoubleSumAggregator.class);
>>> registerAggregator("error-current", DoubleSumAggregator.class);
>>> registerAggregator("pagerank-sum", DoubleSumAggregator.class);
>>> registerAggregator("vertices-count", LongSumAggregator.class);
>>> }
>>> }
>>>
>>> [...]
>>>
>>>
>>> I am not convinced someone is actually calling the initialize() method
>>> and there must be something I am missing (yesterday was late, after a
>>> long day at work).
>>>
>>> Anyway, is there a place/example where I can learn how to use
>>> Aggregators with the new Giraph?
>>>
>>> Thanks again and it's good to see Giraph mailing list and JIRA 'brewing'
>>> ;-)
>>>
>>> Paolo
>>>
>>>
>>> [1]
>>> https://github.com/castagna/jena-grande/commit/3edc0a7780f5e7c25d37956c158d878b590858b5#src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>> [2]
>>> https://github.com/castagna/jena-grande/blob/2fa8a1b879a464d8e3db84e78edd539c70274e7c/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>> [3]
>>> https://github.com/castagna/jena-grande/blob/3edc0a7780f5e7c25d37956c158d878b590858b5/src/main/java/org/apache/jena/grande/giraph/pagerank/PageRankVertexWorkerContext.java
>>
>>