Thanks Aaron,
I'll see if I can get started over the weekend. I take your point about
forcing use of a List.
Our solution didn't require anything particularly fancy to solve the problem,
and since it implements a version of List it also maintains the deterministic
ordering:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mrunit.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.util.ReflectionUtils;

public class MendeleyReduceDriver<KeyIn, ValueIn extends Writable, KeyOut, ValueOut>
        extends ReduceDriver<KeyIn, ValueIn, KeyOut, ValueOut> {

    /**
     * A version of {@link ArrayList} that behaves like the {@link Iterable}
     * in a Hadoop reducer, i.e. the iterator always returns the same
     * {@link Writable}, but the Writable's contents change to reflect the
     * current value of the iterator. This could probably be achieved more
     * efficiently but will do for now.
     *
     * @author james
     *
     * @param <T>
     */
    private static class HadoopStyleIterable<T extends Writable> extends ArrayList<T> {

        private static final long serialVersionUID = 1L;
        private T currentValueHolder;
        private List<T> list;

        @SuppressWarnings("unchecked")
        public HadoopStyleIterable(List<T> list) {
            super(list);
            this.list = list;
            if (!list.isEmpty()) {
                T first = list.get(0);
                Class<T> klass = (Class<T>) first.getClass();
                this.currentValueHolder =
                        ReflectionUtils.newInstance(klass, new Configuration());
            }
        }

        @Override
        public Iterator<T> iterator() {
            return new HadoopStyleIterator<T>(list.iterator(), currentValueHolder);
        }

        private static class HadoopStyleIterator<T extends Writable>
                implements Iterator<T> {

            private T currentValue;
            private Iterator<T> iterator;

            public HadoopStyleIterator(Iterator<T> normalIterator, T instance) {
                iterator = normalIterator;
                currentValue = instance;
            }

            @Override
            public boolean hasNext() {
                return iterator.hasNext();
            }

            @Override
            public T next() {
                // This method always returns the same object, modifying it to
                // hold the contents of the next value.
                // Throws a NullPointerException if iterator.next() returns null.
                // Throws an IOError if serialisation or deserialisation of
                // iterator.next() goes wrong.
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                T next = iterator.next();
                try {
                    next.write(new DataOutputStream(baos));
                    currentValue.readFields(new DataInputStream(
                            new ByteArrayInputStream(baos.toByteArray())));
                    return currentValue;
                } catch (IOException e) {
                    throw new IOError(e);
                }
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        }
    }

    public MendeleyReduceDriver(Reducer<KeyIn, ValueIn, KeyOut, ValueOut> reducer) {
        super(reducer);
    }

    @Override
    public List<Pair<KeyOut, ValueOut>> run() throws IOException {
        HadoopStyleIterable<ValueIn> hsi =
                new HadoopStyleIterable<ValueIn>(inputValues);
        inputValues = hsi;
        return super.run();
    }
}
Note that the Iterable and Iterator are hidden from the outside world.
I'll have a look at MockOutputCollector to see if there are better ways
of doing the deserialisation.
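For anyone who wants to see the failure mode this driver is meant to surface, here's a minimal, Hadoop-free sketch of the pattern. The `Holder` class and `reusing` factory below are stand-ins I've made up for this illustration, not the real Writable API; the point is just that buffering references from a reusing iterator leaves you with N aliases of one mutated object:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ReuseDemo {

    /** A mutable holder standing in for a Hadoop Writable. */
    static class Holder {
        int value;
    }

    /** An Iterable whose iterator, like Hadoop's reducer iterator, reuses one object. */
    static Iterable<Holder> reusing(final List<Integer> values) {
        return new Iterable<Holder>() {
            public Iterator<Holder> iterator() {
                final Iterator<Integer> it = values.iterator();
                final Holder holder = new Holder();
                return new Iterator<Holder>() {
                    public boolean hasNext() { return it.hasNext(); }
                    public Holder next() {
                        holder.value = it.next(); // overwrite in place, no new object
                        return holder;
                    }
                    public void remove() { throw new UnsupportedOperationException(); }
                };
            }
        };
    }

    /** Buggy: buffers references, so every entry ends up holding the last value. */
    static List<Integer> buggyBuffer(Iterable<Holder> in) {
        List<Holder> buffered = new ArrayList<Holder>();
        for (Holder h : in) {
            buffered.add(h); // every entry aliases the same Holder
        }
        List<Integer> out = new ArrayList<Integer>();
        for (Holder h : buffered) {
            out.add(h.value);
        }
        return out;
    }

    /** Correct: copies each value out of the holder before buffering. */
    static List<Integer> copyingBuffer(Iterable<Holder> in) {
        List<Integer> out = new ArrayList<Integer>();
        for (Holder h : in) {
            out.add(h.value); // copy the contents, not the reference
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        System.out.println(buggyBuffer(reusing(values)));   // [3, 3, 3]
        System.out.println(copyingBuffer(reusing(values))); // [1, 2, 3]
    }
}
```

A stock List-backed ReduceDriver would make buggyBuffer appear to pass, which is exactly why the HadoopStyleIterable above is worth having in tests.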
Regards,
James
On Fri, Dec 10, 2010 at 1:25 AM, Aaron Kimball <[email protected]> wrote:
> Hi James,
>
> The ReduceDriver is configured to receive a list of inputs because
> lists have ordering guarantees whereas other Iterables/Collections do
> not; for determinism's sake, it is best to guarantee that you're
> calling reduce() with an ordered set of values when testing.
>
> It would be stellar if you could improve the ReduceDriver to reuse a
> writable instance between calls. You'll need to infer the appropriate
> container class type from the first instance you see in the reducer's
> output, and use the serialization API to make a copy. If you look at
> o.a.h.mrunit.mock.MockOutputCollector, this will show a pattern you
> can work on.
>
> Cheers,
> - Aaron
>
> On Thu, Dec 9, 2010 at 2:21 AM, James Hammerton
> <[email protected]> wrote:
> > Hi,
> >
> > This relates to a bug we had a while back.
> >
> > When running a reducer, if you want to buffer the values, you normally
> > need to take a copy of each value as you iterate through them. This is
> > because the iterator always returns the same object, but the contents of
> > the object get filled with each value as the iterator steps through.
> >
> > However this behaviour is not reproduced by the reducer drivers in
> > MRUnit. Even if you give the reduce driver a List (why do we have to
> > give a List when the reducer specifies merely an Iterable?) designed to
> > behave this way, MRUnit copies the values into a normal List before
> > presenting them to the reducer. At least this is the case with the
> > 0.20.1 install we have.
> >
> > Anyway, in order to test our bug fix we extended the ReduceDriver class
> > to actually copy the values into an iterable that does reproduce the
> > behaviour, so that we can test for bugs caused by failing to copy the
> > values. In more recent versions of Hadoop (we use 0.20.1), is the
> > behaviour of the reduce drivers altered to match that of actual running
> > reducers in this respect? Are there any plans to do this? Alternatively,
> > I'd be willing to fix this in the Hadoop codebase myself if necessary.
> >
> > Regards,
> >
> > James
> >
> > --
> > James Hammerton | Senior Data Mining Engineer
> > www.mendeley.com/profiles/james-hammerton
> >
> > Mendeley Limited | London, UK | www.mendeley.com
> > Registered in England and Wales | Company Number 6419015
> >
--
James Hammerton | Senior Data Mining Engineer
www.mendeley.com/profiles/james-hammerton
Mendeley Limited | London, UK | www.mendeley.com
Registered in England and Wales | Company Number 6419015