Github user nfergu commented on the pull request:
https://github.com/apache/spark/pull/2438#issuecomment-62641279
Hi @pwendell
I can't think of any obvious way to implement this in user-space without
wrapping each RDD operation. In fact, this is the approach that I'm taking at
the moment. I'm wrapping each RDD operation in something like the following:
```scala
def myMap[U: ClassTag](f: T => U): RDD[U] = {
  val registry = Metrics.Accumulable.value
  self.map((t: T) => {
    // Get the value that we obtained from thread-local storage in the
    // driver, and put it back into thread-local storage while executing
    // this function
    Metrics.Accumulable.withValue(registry) {
      f(t)
    }
  })
}
```
Here `Metrics.Accumulable` is a `DynamicVariable` (i.e. a thread-local).
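As a minimal sketch of why the value has to be captured and put back, the following uses a plain thread pool in place of Spark executors. The names `DynamicVariableDemo` and `registry` are hypothetical stand-ins, not part of the patch. A worker thread that already exists does not see a value set later with `withValue` on the submitting thread, unless the value is read first and reinstalled inside the task:

```scala
import java.util.concurrent.{Callable, Executors}
import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Hypothetical stand-in for Metrics.Accumulable
  val registry = new DynamicVariable[String]("default")

  def run(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      // Force the worker thread to be created before any value is set
      pool.submit(new Callable[Unit] { def call(): Unit = () }).get()

      registry.withValue("driver-value") {
        // Read the value on the submitting ("driver") thread
        val captured = registry.value

        // Without wrapping: the worker thread reads its own thread-local
        val unwrapped = pool.submit(new Callable[String] {
          def call(): String = registry.value
        }).get()

        // With wrapping: the captured value is reinstalled around the body
        val wrapped = pool.submit(new Callable[String] {
          def call(): String = registry.withValue(captured) { registry.value }
        }).get()

        (unwrapped, wrapped)
      }
    } finally {
      pool.shutdown()
    }
  }
}
```

Here `DynamicVariableDemo.run()` returns `("default", "driver-value")`: only the wrapped task observes the value that was set on the submitting thread.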
The main problem with this is, of course, that it involves wrapping every
RDD operation. Can you think of a nicer way to implement this in user-space?
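One way to reduce the repetition, sketched below under assumptions, is to factor the capture-and-restore step into a single higher-order helper, so that each wrapped operation becomes a one-liner such as `self.map(capturing(f))`. The helper name `capturing` and the `Option[String]` value type are hypothetical; the sketch uses a plain `Seq` rather than an RDD so that it is self-contained. In Spark the captured value would also need to be serializable, since it travels with the closure.

```scala
import scala.util.DynamicVariable

object CapturingSketch {
  // Hypothetical stand-in: the real type held by Metrics.Accumulable
  // is not shown in this comment
  object Metrics {
    val Accumulable = new DynamicVariable[Option[String]](None)
  }

  // Reads the value once, at the point where `capturing` is called,
  // and reinstalls it around every later invocation of `f`
  def capturing[T, U](f: T => U): T => U = {
    val registry = Metrics.Accumulable.value
    (t: T) => Metrics.Accumulable.withValue(registry) { f(t) }
  }

  def demo(): Seq[String] = {
    val g: Int => String = Metrics.Accumulable.withValue(Some("job-1")) {
      capturing((i: Int) => s"$i:${Metrics.Accumulable.value.getOrElse("none")}")
    }
    // g runs outside the withValue scope but still sees the captured value
    Seq(1, 2).map(g)
  }
}
```

This still requires one wrapper per RDD operation, so it only shortens the wrappers rather than removing them.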
There are a couple of other minor problems with this approach: (1) it
messes up the callsite, but that would be easily solved by a small modification
to `Utils.getCallSite`; (2) it doesn't work for operations like
`saveAsNewAPIHadoopFile`, where no function is passed in.
One option, if we want to go ahead with this patch, would be to add a flag
to Accumulables which determines whether they get broadcast. Having said that,
my need for this patch is not as critical as it was, now that I'm taking the
approach described above of wrapping each RDD operation.