---------- Forwarded message ----------
From: Jyoti Yadav <[email protected]>
Date: Fri, Jan 10, 2014 at 11:57 AM
Subject: Writing my own aggregator..
To: [email protected]
Hi Ameya..
I am badly stuck while implementing my custom aggregator..
In my program i want to send each vertex id to master.
For that i took an arraylist, in which each vertex is adding its own
id.while running the program,each vertex calls aggregate() function..As per
my observation it is working fine in vertex compute method.But while
retrieving back in master compute function.arraylist is not reflected back
to master compute function.
I am attaching two files below..You are requested to please check it once..
*1.MyArrayWritable.java*
package org.apache.giraph.examples.utils;
import java.io.*;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparator;
import java.util.Arrays;
import java.util.*;
public class MyArrayWritable implements Writable {
private long item;
private ArrayList<Long> arraylist=new ArrayList<Long>(5);
public MyArrayWritable()
{
item=0;
//arraylist=new ArrayList<Long>(5);
arraylist.add(item);
}
public MyArrayWritable(long item1)
{
item=item1;
//arraylist=new ArrayList<Long>(5);
arraylist.add(item);
}
public ArrayList<Long> get_arraylist() { return arraylist; }
public void set_arraylist(ArrayList al)
{
//this.arraylist=new ArrayList<Long>(al);
this.arraylist=al;
}
public long get_item(){return item;}
@Override
public void readFields(DataInput in) throws IOException {
item=in.readLong();
int size=arraylist.size();
size=in.readInt();
arraylist=new ArrayList<Long>(5);
for(int i=0;i<size;i++)
{
arraylist.add(in.readLong());
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(item);
out.writeInt(arraylist.size());
for(int i=0;i<arraylist.size();i++)
{
out.writeLong(arraylist.get(i));
}
}
@Override
public String toString()
{
return "output is "+ Long.toString(item) + "\n";
}
}
2.MyArrayAggregator.java
package org.apache.giraph.examples.utils;
import org.apache.giraph.aggregators.BasicAggregator;
import java.util.*;
public class MyArrayAggregator extends BasicAggregator<MyArrayWritable> {
@Override
public void aggregate(MyArrayWritable value) {
ArrayList<Long> al=new ArrayList<Long>();
(getAggregatedValue().get_arraylist()).add(value.get_item());
al=getAggregatedValue().get_arraylist();
getAggregatedValue().set_arraylist(al);
}
@Override
public MyArrayWritable createInitialValue() {
return new MyArrayWritable();
}
}
Thanks in advance ...
Jyoti