Ah, ok, that's interesting then. The train() method just ends up calling
observe() on one or more of the cluster models and this just adds values
to their counter vectors (usually s0, s1 and s2 in AbstractCluster), so
if you can synchronize this activity you should not affect the
correctness of the algorithms. I will be interested in what you
discover. You can learn more about the way delegation is used between
the various clustering policies, clusters, ClusterClassifier, etc. by
single-stepping through a run of the ClusterIterator.iterate() method.
It employs all of the same delegation strategies and will speed your
learning. There is a unit test that does exactly that.
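Here is a minimal plain-Java sketch of the s0/s1/s2 idea. It assumes a cluster that keeps only three things: a total weight (s0), a per-dimension weighted sum (s1), and a per-dimension weighted sum of squares (s2). SketchCluster and its methods are hypothetical and are not Mahout's AbstractCluster API. The point is that one lock around all three counter updates keeps them consistent when several threads call observe().

```java
/**
 * Hypothetical stand-in for a cluster that accumulates s0 (total weight),
 * s1 (weighted sum) and s2 (weighted sum of squares). Not Mahout's
 * AbstractCluster; illustrative only.
 */
class SketchCluster {
  private double s0;            // total observed weight
  private final double[] s1;    // per-dimension weighted sum
  private final double[] s2;    // per-dimension weighted sum of squares

  SketchCluster(int dimension) {
    s1 = new double[dimension];
    s2 = new double[dimension];
  }

  // The instance monitor guards all three counters together, so no thread
  // can see s0 updated while s1/s2 are only half-updated.
  synchronized void observe(double[] x, double weight) {
    s0 += weight;
    for (int i = 0; i < x.length; i++) {
      s1[i] += weight * x[i];
      s2[i] += weight * x[i] * x[i];
    }
  }

  synchronized double getS0() {
    return s0;
  }

  // Mean = s1 / s0, read under the same lock as the updates.
  synchronized double[] mean() {
    double[] m = new double[s1.length];
    for (int i = 0; i < m.length; i++) {
      m[i] = s1[i] / s0;
    }
    return m;
  }
}
```

The updates are plain additions, so the order in which threads call observe() does not change the totals. With real-valued data, floating-point rounding may still differ slightly from run to run.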
Jeff
On 12/21/12 7:50 PM, yunming zhang wrote:
Thanks a lot Jeff!
I think you are right, the train() method is probably going to be my focus
for now. Initially I wasn't sure if the changes to make it thread safe
would affect the correctness of the algorithm as I am not a machine
learning expert.
Actually, I am not trying to get parallelism out of the body of the map()
function, which, as you described, is not beneficial since you are
processing only one input vector. However, there is a run() method in the
Hadoop mapper that you can override to customize how all of the input
vectors to the mapper are processed. The default implementation processes
one input vector at a time, but you can customize it to process multiple
input vectors simultaneously, thus exploiting more parallelism within a
mapper.
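For reference, in the new Hadoop API the default Mapper.run(Context) calls setup(), then calls map() once per record while context.nextKeyValue() returns true, then calls cleanup(). Hadoop also ships MultithreadedMapper, which does something along these lines. The sketch below shows only the shape of a run loop that hands records to a thread pool. It is plain Java with no Hadoop dependency: the Iterator stands in for the Context and the Consumer stands in for map(), both assumptions for illustration. Each record is copied before it is submitted, because Hadoop record readers typically reuse the same key/value objects between calls.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/**
 * Hypothetical multi-record run loop. The Iterator plays the role of the
 * Hadoop Context and mapFn plays the role of map(); illustrative only.
 */
class ParallelRunLoop {
  static void run(Iterator<double[]> records, Consumer<double[]> mapFn, int threads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> pending = new ArrayList<>();
      while (records.hasNext()) {
        // Defensive copy: a reused input object must not change under a worker.
        double[] copy = records.next().clone();
        pending.add(pool.submit(() -> mapFn.accept(copy)));
      }
      for (Future<?> f : pending) {
        f.get(); // waits for completion; rethrows a failure from mapFn (wrapped)
      }
    } finally {
      pool.shutdown();
    }
  }
}
```

Whatever mapFn touches (for clustering, the cluster models updated by train()) still has to be thread safe. That is the question the rest of this thread is about.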
Thanks again everyone!
Yunming
On Sat, Dec 22, 2012 at 12:48 AM, Jeff Eastman
<[email protected]> wrote:
Hi Yunming,
The problem I see with what you are proposing is that Hadoop only gives
you a single input vector per call of CIMapper.map(). Using multiple
threads to perform the body of that method would not be of benefit. If you
want to experiment with thread-based concurrent execution of that
clustering implementation, I'd suggest you look at the serial,
all-in-memory implementation (ClusterIterator.iterate()) first. I think you
might find that the classifier.classify() and policy.select() calls are
read-only and the classifier.train() method would be the one to
synchronize. This is just an educated guess, however. You are really on
your own in this endeavor.
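Below is a toy model of the read-only/mutating split described above, assuming a nearest-center classifier. classify() only reads cluster centers that stay fixed during a pass, so it runs concurrently without locks. train() mutates shared counts and is synchronized. ToyClassifier and its methods are hypothetical and much simpler than Mahout's ClusterClassifier and clustering policies. There is no policy.select() step here; each point simply trains its nearest cluster.

```java
import java.util.stream.IntStream;

/**
 * Hypothetical toy: lock-free reads in classify(), synchronized writes
 * in train(). Not Mahout's API.
 */
class ToyClassifier {
  private final double[][] centers; // read-only during a pass
  private final long[] counts;      // mutated only by train()

  ToyClassifier(double[][] centers) {
    this.centers = centers;
    this.counts = new long[centers.length];
  }

  // Read-only: index of the nearest center by squared Euclidean distance.
  int classify(double[] x) {
    int best = 0;
    double bestDist = Double.MAX_VALUE;
    for (int c = 0; c < centers.length; c++) {
      double d = 0;
      for (int i = 0; i < x.length; i++) {
        double diff = x[i] - centers[c][i];
        d += diff * diff;
      }
      if (d < bestDist) {
        bestDist = d;
        best = c;
      }
    }
    return best;
  }

  // Mutating: guarded by the instance lock.
  synchronized void train(int cluster) {
    counts[cluster]++;
  }

  synchronized long count(int cluster) {
    return counts[cluster];
  }

  // One pass: classification runs in parallel; training is serialized by the lock.
  void process(double[][] points) {
    IntStream.range(0, points.length).parallel()
        .forEach(i -> train(classify(points[i])));
  }
}
```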
But good luck if you decide to experiment,
Jeff
On 12/21/12 10:00 AM, Yunming Zhang wrote:
Hi,
I am trying to compare the performance of parallelism from using more
mappers (the way you suggested, by reducing the max input split size)
with possible parallelism within the Mapper; there can be advantages to
using fewer mappers.
Does anyone have any idea where to start to make the CIMapper thread
safe? Do I have to make changes to every application, or could I just
change some implementation in the general classes used by all applications?
It would be really helpful if someone could point me in the right direction.
Thanks
Yunming
On Dec 20, 2012, at 10:54 PM, Marty Kube <martykube@beavercreekconsulting.com> wrote:
Writing thread-safe code is hard. Don't do it unless you have to.
On Dec 20, 2012, at 4:28 AM, Sean Owen <[email protected]> wrote:
... but making the implementation thread-safe won't make it be used by
multiple threads. If you want more parallelism, suggest to Hadoop to
use more mappers by reducing the max input split size. But this is
still not going to require your mappers to be thread-safe.
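To show why a smaller max split size means more mappers, here is a rough split-count estimate for one splittable file. It follows our understanding of Hadoop's FileInputFormat: the split size is max(minSize, min(maxSize, blockSize)), and a trailing chunk of up to 1.1 times the split size is not split further. It is an approximation written for illustration, not Hadoop's code. In the new API the max size can be set with FileInputFormat.setMaxInputSplitSize(job, bytes). The underlying property name differs between Hadoop versions.

```java
/**
 * Approximate number of input splits (roughly, map tasks) for one
 * splittable file. Illustrative approximation of FileInputFormat's logic.
 */
class SplitEstimate {
  // A last chunk up to 10% larger than splitSize stays as a single split.
  private static final double SPLIT_SLOP = 1.1;

  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  static int numSplits(long fileLength, long blockSize, long minSize, long maxSize) {
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long remaining = fileLength;
    int splits = 0;
    while (((double) remaining) / splitSize > SPLIT_SLOP) {
      splits++;
      remaining -= splitSize;
    }
    if (remaining != 0) {
      splits++; // whatever is left becomes the final split
    }
    return splits;
  }
}
```

For example, a 256 MB file with 64 MB blocks yields 4 splits with an unbounded max size and 16 splits with a 16 MB max size. A 70 MB file with 64 MB blocks stays a single split because of the slop factor.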
If you mean you are making your own parallelism in miniature by
writing a multi-threaded mapper, I wouldn't bother. Just use more
parallelism via Hadoop.
On Thu, Dec 20, 2012 at 3:31 AM, Yunming Zhang
<[email protected]> wrote:
Thanks Marty, Sean,
Yeah, I took a look at the source code yesterday and also realized that
it is not thread safe.
I am working on a high-performance mapper that requires the mapper to be
thread safe, so that I can exploit the data parallelism that comes with
processing multiple input <key, val> pairs in a single mapper.
I am currently researching whether there is an easy way to make the
CIMapper implementation thread safe, maybe by making a few key data
structures, like OpenIntDoubleHashMap, thread safe. Hopefully this won't
screw up the correctness of the algorithm itself.
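If you go the route of swapping in thread-safe data structures, here is a minimal sketch of a concurrent sparse accumulator. It uses java.util.concurrent.ConcurrentHashMap instead of OpenIntDoubleHashMap; this is a swapped-in structure, not a drop-in replacement for Mahout's vectors, and its boxed Integer/Double entries are likely much slower than a primitive map. ConcurrentHashMap.merge() is atomic per key, so concurrent adds to the same index are not lost. It does not make a whole-vector update atomic, though. The separate counters of one cluster could still be observed out of step with each other, so locking around the whole observe()/train() call may be the simpler fix. ConcurrentSparseSum is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical thread-safe sparse accumulator. Each single-index update
 * is atomic; multi-index updates (addAll) are not atomic as a whole.
 */
class ConcurrentSparseSum {
  private final ConcurrentHashMap<Integer, Double> entries = new ConcurrentHashMap<>();

  // Atomic read-modify-write for one index: no lost updates.
  void add(int index, double value) {
    entries.merge(index, value, Double::sum);
  }

  // Adds a sparse vector entry by entry; other threads may see it half-applied.
  void addAll(Map<Integer, Double> vector) {
    vector.forEach((i, v) -> add(i, v));
  }

  double get(int index) {
    return entries.getOrDefault(index, 0.0);
  }
}
```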
Yunming
On Dec 20, 2012, at 9:07 AM, Marty Kube <martykube@beavercreekconsulting.com> wrote:
Sean is right, most MR code is not and does not need to be thread
safe.
Why are you writing a multi-threaded mapper?
On 12/19/2012 07:50 PM, Sean Owen wrote:
Hadoop will only use one thread with one Mapper or Reducer instance.
Unless you are somehow spawning threads on your own, concurrency should
not be an issue. I don't know if this behavior is guaranteed, but it
seems to be how it always works.
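Sean's point also suggests an alternative to locking: thread confinement. The sketch below uses a swapped-in technique, not anything taken from Mahout or Hadoop. Each worker task gets its own private partial sum, much as Hadoop gives each Mapper instance to a single thread. The partials are combined only after all tasks finish, similar in spirit to a combiner or reducer. ConfinedSum is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical thread-confined summation: no shared mutable state while
 * workers run, one merge step at the end.
 */
class ConfinedSum {
  static double[] sum(double[][] points, int threads) throws Exception {
    int dim = points[0].length;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<double[]>> partials = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        final int start = t;
        partials.add(pool.submit(() -> {
          double[] part = new double[dim]; // private to this task, never shared
          for (int i = start; i < points.length; i += threads) {
            for (int j = 0; j < dim; j++) {
              part[j] += points[i][j];
            }
          }
          return part;
        }));
      }
      // Merge step: Future.get() makes each task's results visible to this thread.
      double[] total = new double[dim];
      for (Future<double[]> f : partials) {
        double[] part = f.get();
        for (int j = 0; j < dim; j++) {
          total[j] += part[j];
        }
      }
      return total;
    } finally {
      pool.shutdown();
    }
  }
}
```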
On Dec 19, 2012 4:03 PM, "Yunming Zhang" <[email protected]>
wrote:
Hi,
I am developing a custom mapper that is somewhat similar to the
multithreaded mapper that comes with Hadoop, and I am getting weird errors
when running multiple threads that process multiple input key/value pairs
simultaneously. Here is the stack trace. I looked into
OpenIntDoubleHashMap, and the errors seem to stem from null values stored
in the tables:
attempt_201212190955_0004_m_000000_0: java.lang.ArrayIndexOutOfBoundsException: 24
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.math.map.OpenIntDoubleHashMap.indexOfKey(OpenIntDoubleHashMap.java:278)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.math.map.OpenIntDoubleHashMap.get(OpenIntDoubleHashMap.java:198)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.math.RandomAccessSparseVector.getQuick(RandomAccessSparseVector.java:130)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.math.AbstractVector.assign(AbstractVector.java:738)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:263)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:234)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:229)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:37)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.classify.ClusterClassifier.train(ClusterClassifier.java:158)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.iterator.CIMapper.map(CIMapper.java:46)
attempt_201212190955_0004_m_000000_0:   at org.apache.mahout.clustering.iterator.CIMapper.map(CIMapper.java:18)
Does anyone know whether it is inherently thread safe to process multiple
input key/value pairs in the mapper simultaneously?
Thanks
Yunming