Hi,
The implementation of the aggregation logic changed in 0.8.1
(Aggregator.scala). It now uses AppendOnlyMap, whereas the 0.8.0
release used java.util.HashMap.
Aggregator.scala
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
    val combiners = new AppendOnlyMap[K, C]
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (iter.hasNext) {
      kv = iter.next()
      combiners.changeValue(kv._1, update)
    }
    combiners.iterator
  }
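To make the combiner logic above easier to follow, here is a minimal standalone sketch of the same idea. It is not the Spark code: it uses a plain scala.collection.mutable.HashMap as a stand-in for AppendOnlyMap, and the object name CombineSketch, the explicit createCombiner/mergeValue parameters, and the Vector-based combiner are assumptions chosen for illustration.

```scala
import scala.collection.mutable

object CombineSketch {
  // Stand-in for combineValuesByKey: a plain mutable.HashMap replaces
  // AppendOnlyMap, and the two combiner functions are passed in explicitly.
  def combineValuesByKey[K, V, C](
      iter: Iterator[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Iterator[(K, C)] = {
    val combiners = mutable.HashMap.empty[K, C]
    for ((k, v) <- iter) {
      val updated = combiners.get(k) match {
        case Some(old) => mergeValue(old, v) // key seen before: merge into its combiner
        case None      => createCombiner(v)  // first value for this key
      }
      combiners(k) = updated
    }
    combiners.iterator
  }

  // groupByKey-style usage: collect all values per key (illustrative combiner).
  val grouped: Map[String, Vector[Int]] =
    combineValuesByKey[String, Int, Vector[Int]](
      Iterator("a" -> 1, "b" -> 2, "a" -> 3),
      v => Vector(v),
      (c, v) => c :+ v
    ).toMap
}
```

The hadValue flag in the real update closure plays the role of the Some/None match here.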
The problem I am facing is in the changeValue function of AppendOnlyMap.
It computes
  val curKey = data(2 * pos)
which comes back as null and eventually leads to an NPE.
AppendOnlyMap.scala
  def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }
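One thing that stands out in the code above: a null curKey is how an unused slot is represented (the else-if branch handles it), but k.equals(curKey) is evaluated before that null check. So a key class whose equals does not tolerate a null argument would throw at that point. The sketch below shows this in isolation. It is a possible explanation under that assumption, not a confirmed diagnosis; UnsafeKey, SafeKey, matchesSlot and NullSlotDemo are hypothetical names that do not come from Spark or from my job.

```scala
object NullSlotDemo {
  // Hypothetical key class: equals dereferences its argument without a null check.
  class UnsafeKey(val id: String) {
    override def hashCode: Int = id.hashCode
    override def equals(other: Any): Boolean =
      other.asInstanceOf[UnsafeKey].id == id // throws NPE when other is null
  }

  // Same shape, but the type pattern does not match null or foreign types.
  class SafeKey(val id: String) {
    override def hashCode: Int = id.hashCode
    override def equals(other: Any): Boolean = other match {
      case that: SafeKey => that.id == id
      case _             => false
    }
  }

  // Mirrors the comparison order in changeValue: the key is compared with the
  // slot content before the slot is tested for null.
  def matchesSlot(k: AnyRef, curKey: AnyRef): Boolean =
    k.eq(curKey) || k.equals(curKey)

  // What data(2 * pos) holds for a slot that has never been written.
  val emptySlot: AnyRef = null

  // Null-safe equals: the comparison simply reports "no match".
  val safeResult: Boolean = matchesSlot(new SafeKey("a"), emptySlot)

  // Null-unsafe equals: the comparison itself fails; Try captures the exception.
  val unsafeResult: scala.util.Try[Boolean] =
    scala.util.Try(matchesSlot(new UnsafeKey("a"), emptySlot))
}
```

If the keys passed to groupByKey are of a custom class, checking how its equals method handles a null argument would confirm or rule this out.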
Other info:
1. My code works fine with 0.8.0.
2. I used the groupByKey transformation.
3. I replaced Aggregator.scala with the older version (0.8.0), recompiled
it, and restarted the Master and Worker. The job then ran successfully.
Thanks and Regards,
Archit Thakur.