Hi,

The implementation of the aggregation logic (Aggregator.scala) changed in
0.8.1.

It now uses AppendOnlyMap, whereas the 0.8.0 release used
java.util.HashMap.

Aggregator.scala
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] = {
    val combiners = new AppendOnlyMap[K, C]
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (iter.hasNext) {
      kv = iter.next()
      combiners.changeValue(kv._1, update)
    }
    combiners.iterator
  }

The problem I am facing is in the changeValue function of AppendOnlyMap.
It computes
val curKey = data(2 * pos)
and curKey comes back as null, which eventually results in a
NullPointerException.

AppendOnlyMap.scala
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
    val k = key.asInstanceOf[AnyRef]
    if (k.eq(null)) {
      if (!haveNullValue) {
        incrementSize()
      }
      nullValue = updateFunc(haveNullValue, nullValue)
      haveNullValue = true
      return nullValue
    }
    var pos = rehash(k.hashCode) & mask
    var i = 1
    while (true) {
      val curKey = data(2 * pos)
      if (k.eq(curKey) || k.equals(curKey)) {
        val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        return newValue
      } else if (curKey.eq(null)) {
        val newValue = updateFunc(false, null.asInstanceOf[V])
        data(2 * pos) = k
        data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
        incrementSize()
        return newValue
      } else {
        val delta = i
        pos = (pos + delta) & mask
        i += 1
      }
    }
    null.asInstanceOf[V] // Never reached but needed to keep compiler happy
  }
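
A null curKey is what an empty slot looks like, so the null by itself
should not throw. One possible source of the NPE (an assumption, since
there is no stack trace here) is a key class whose equals dereferences its
argument: changeValue calls k.equals(curKey) before it reaches the
curKey.eq(null) check. The sketch below reproduces only that ordering.
UnsafeKey, SafeKey and probeEmptySlot are hypothetical names for
illustration, not part of Spark or of the job described here.

```scala
// Hypothetical key class: equals assumes its argument is never null.
class UnsafeKey(val id: String) {
  override def hashCode: Int = id.hashCode
  override def equals(other: Any): Boolean =
    other.asInstanceOf[UnsafeKey].id == id // NPE when other is null
}

// Same key with a null-safe equals.
class SafeKey(val id: String) {
  override def hashCode: Int = id.hashCode
  override def equals(other: Any): Boolean = other match {
    case that: SafeKey => that.id == id
    case _             => false // a null argument falls through to here
  }
}

object ProbeDemo {
  // Mirrors the comparison order used in changeValue for a single slot:
  // the equality check runs first, the empty-slot check second.
  def probeEmptySlot(k: AnyRef): String = {
    val curKey: AnyRef = null // an empty slot in the data array
    try {
      if (k.eq(curKey) || k.equals(curKey)) "match"
      else if (curKey.eq(null)) "insert"
      else "probe next"
    } catch {
      case _: NullPointerException => "NPE"
    }
  }
}
```

With these definitions, probeEmptySlot(new UnsafeKey("a")) ends in the
catch branch, while probeEmptySlot(new SafeKey("a")) reaches the insert
branch. java.util.HashMap never passes null to equals for a non-null key,
which would be consistent with the same job working on 0.8.0.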


Other info:
1. My code works fine with 0.8.0.
2. I am using the groupByKey transformation.
3. I replaced Aggregator.scala with the older version (0.8.0), recompiled,
and restarted the Master and Worker. The job then ran successfully.

Thanks and Regards,
Archit Thakur.
