Doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessarily the case (RDD.take, for example, does not read it fully).
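To see the problem concretely, here is a plain-Scala sketch (no Spark needed) of the "clean up when hasNext turns false" pattern used in the quoted code further down. A consumer that stops early, the way RDD.take can, never triggers the cleanup. The names `EarlyStopDemo`, `withCleanupOnExhaustion` and `cleanedUp` are made up for illustration.

```scala
object EarlyStopDemo {
  // Hypothetical flag standing in for "the resource was released".
  var cleanedUp = false

  // Wraps an iterator so that cleanup runs only once the last element
  // has been handed out, the same idea as checking !partition.hasNext.
  def withCleanupOnExhaustion[T](it: Iterator[T])(cleanup: => Unit): Iterator[T] =
    it.map { x =>
      if (!it.hasNext) cleanup
      x
    }

  def main(args: Array[String]): Unit = {
    cleanedUp = false
    // Only the first element is pulled, so the "last element" check never fires.
    val firstOnly = withCleanupOnExhaustion(Iterator(1, 2, 3)) { cleanedUp = true }.take(1).toList
    println(s"took $firstOnly, cleanedUp = $cleanedUp") // prints: took List(1), cleanedUp = false
  }
}
```

If the same wrapped iterator is consumed completely (for example with `.toList`), `cleanedUp` does become true; the cleanup depends entirely on how much the caller reads.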
Instead I would use mapPartitionsWithContext, in which case you can write a
function of the form:
f: (TaskContext, Iterator[T]) => Iterator[U]
Now you can register a cleanup with the task context, like this:
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener

context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = dosomething
})
After that, proceed with the iterator transformation as usual.
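The listener approach holds up because Spark calls completion listeners when the task finishes (on success, failure or cancellation), not when the iterator runs dry. The plain-Scala sketch below mimics that contract with a hypothetical `MiniTaskContext` and `runTask`; these are stand-ins written for illustration, not Spark classes. The cleanup fires even though the consumer takes only one element.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's TaskContext: it only keeps a list of
// completion callbacks and runs them when the "task" finishes.
class MiniTaskContext {
  private val listeners = ArrayBuffer.empty[MiniTaskContext => Unit]
  def addTaskCompletionListener(f: MiniTaskContext => Unit): Unit = listeners += f
  def markTaskCompleted(): Unit = listeners.foreach(listener => listener(this))
}

object ListenerDemo {
  var cleanedUp = false

  // Same shape as the function given to mapPartitionsWithContext,
  // (TaskContext, Iterator[T]) => Iterator[U], but using the stand-in context.
  def f(context: MiniTaskContext, partition: Iterator[Int]): Iterator[Int] = {
    // setup would go here
    context.addTaskCompletionListener { _ => cleanedUp = true } // cleanup
    partition.map(_ * 10)
  }

  // Hypothetical "executor": consume only what the caller asks for (like take),
  // then complete the task in a finally block, so listeners run either way.
  def runTask(partition: Iterator[Int], limit: Int): List[Int] = {
    val context = new MiniTaskContext
    try f(context, partition).take(limit).toList
    finally context.markTaskCompleted()
  }

  def main(args: Array[String]): Unit = {
    val out = runTask(Iterator(1, 2, 3), limit = 1)
    println(s"took $out, cleanedUp = $cleanedUp") // prints: took List(10), cleanedUp = true
  }
}
```

On Spark 1.2 and later, mapPartitionsWithContext is deprecated in favor of calling `TaskContext.get()` inside a plain `mapPartitions`, and the same listener registration applies there.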
On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen so...@cloudera.com wrote:
On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer t...@preferred.jp wrote:
rdd.mapPartitions { partition =>
  // Some setup code here
  val result = partition.map(yourfunction)
  // Some cleanup code here
  result
}
Yes, I realized that after I hit send. You definitely have to store
and return the result from the mapping!
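The catch with that first version is that `Iterator.map` is lazy: `partition.map(yourfunction)` only builds a new iterator, so the cleanup lines run before `yourfunction` has seen a single element. A small plain-Scala sketch that records the order of events (`LazyMapDemo` and the `events` buffer are hypothetical, just to make the ordering visible):

```scala
import scala.collection.mutable.ArrayBuffer

object LazyMapDemo {
  // Records what happens, in order, so the laziness is visible.
  val events = ArrayBuffer.empty[String]

  def yourfunction(x: Int): Int = { events += s"process $x"; x + 1 }

  // The first version from the thread, with setup/cleanup made observable.
  def firstVersion(partition: Iterator[Int]): Iterator[Int] = {
    events += "setup"
    val result = partition.map(yourfunction) // nothing is processed yet
    events += "cleanup"
    result
  }

  def main(args: Array[String]): Unit = {
    val out = firstVersion(Iterator(1, 2)).toList
    println(out)    // prints: List(2, 3)
    println(events) // prints: ArrayBuffer(setup, cleanup, process 1, process 2)
  }
}
```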
rdd.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    partition.map(item => {
      val output = yourfunction(item)
      if (!partition.hasNext) {
        // Some cleanup code here
      }
      output
    })
  } else {
    Iterator.empty // an empty Iterator of your return type
  }
}
Great point, yeah. If you knew the number of values was small, you
could collect them and process them locally, but this is the right general
way to do it.
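For the small-data case, one possible reading of "collect them" is to force the mapped partition into memory before cleaning up, so the cleanup really comes after every element has been processed. This is a sketch of that reading only, under the assumption that it happens inside `mapPartitions`; `toList` holds the whole partition's output in memory at once, which is why it only suits small partitions. `EagerDemo` and its members are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

object EagerDemo {
  // Records the order of setup, processing and cleanup.
  val events = ArrayBuffer.empty[String]

  def yourfunction(x: Int): Int = { events += s"process $x"; x + 1 }

  // Could serve as the body of rdd.mapPartitions { partition => ... }.
  def eagerVersion(partition: Iterator[Int]): Iterator[Int] = {
    events += "setup"
    // toList forces every element through yourfunction right now,
    // so the whole partition's output sits in memory at once.
    val result = partition.map(yourfunction).toList
    events += "cleanup"
    result.iterator
  }

  def main(args: Array[String]): Unit = {
    println(eagerVersion(Iterator(1, 2)).toList) // prints: List(2, 3)
    println(events) // prints: ArrayBuffer(setup, process 1, process 2, cleanup)
  }
}
```

Unlike the listener version, this one does not care how much of the returned iterator the caller reads, at the cost of buffering the partition.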