Julien,

On further analysis, I found that it was not a delay at reduce time but a
long-running fetch map task. When I have multiple fetch map tasks running
on a single segment, one of the map tasks runs for an excessively longer
period of time than the others. This seems to happen because of a
disproportionate distribution of URLs per map task: with a topN of
1,000,000 and 10 fetch map tasks, it appears there is no guarantee that
each fetch map task will get 100,000 URLs to fetch.
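To illustrate what I suspect is happening, here is a toy sketch (not Nutch code; it assumes the segment is partitioned by host, i.e. all URLs of one host land in the same fetch map task):

```python
# Toy model of host-based partitioning of a generated segment.
# Assumption (not from this thread): URLs are assigned to fetch map tasks
# by hashing the host, so every URL of a given host lands in one task.
from collections import Counter
from urllib.parse import urlparse

def partition_by_host(urls, num_tasks):
    """Count how many URLs each task receives under host-hash partitioning."""
    counts = Counter()
    for url in urls:
        counts[hash(urlparse(url).hostname) % num_tasks] += 1
    return counts

# 90,000 URLs from one big host plus 10,000 URLs spread over 10,000 hosts:
urls = ["http://bighost.example/page%d" % i for i in range(90000)]
urls += ["http://host%d.example/page" % i for i in range(10000)]

counts = partition_by_host(urls, 10)
# The task that gets bighost carries at least 90,000 URLs;
# the remaining tasks share roughly 10,000 between them.
print(sorted(counts.values(), reverse=True))
```

On paper each of the 10 tasks should get 10,000 of the 100,000 URLs, but the task holding the big host ends up with nine times that, which would explain one map task running far longer than the rest.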

Is it possible to set an upper limit on the number of URLs per fetch map
task, in addition to the collective topN for the whole fetch phase?
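The closest knobs I have found so far act at generate time rather than per map task; a hypothetical nutch-site.xml fragment (the property names generate.max.count and generate.count.mode exist in Nutch 1.x, but the values here are placeholders, not recommendations):

```xml
<!-- Sketch only: cap how many URLs of a single host (or domain, depending
     on generate.count.mode) can enter one generated segment. -->
<property>
  <name>generate.max.count</name>
  <value>50000</value>
  <description>Maximum number of URLs per host/domain in a segment;
  -1 (the default) means no limit.</description>
</property>
<property>
  <name>generate.count.mode</name>
  <value>byHost</value>
</property>
```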

Thanks,
Meraj.

On Sat, Oct 18, 2014 at 2:28 AM, Julien Nioche <
[email protected]> wrote:

> Hi Meraj,
>
> What do the logs for the map tasks tell you about the URLs being fetched?
>
> J.
>
> On 17 October 2014 19:08, Meraj A. Khan <[email protected]> wrote:
>
> > Julien,
> >
> > Thanks for your suggestion. I looked at the jstack thread dumps and
> > could see that the fetcher threads are in a waiting state; looking at
> > the JobClient console, the map phase is not yet complete.
> >
> > 14/10/15 12:09:48 INFO mapreduce.Job:  map 95% reduce 31%
> > 14/10/16 07:11:20 INFO mapreduce.Job:  map 96% reduce 31%
> > 14/10/17 01:20:56 INFO mapreduce.Job:  map 97% reduce 31%
> >
> > And the following is the kind of statement I see in the jstack thread
> > dump for the Hadoop child processes. Is it possible that these map tasks
> > are actually waiting on a particular host with an excessive crawl delay?
> > I already have fetcher.threads.per.queue set to 5, fetcher.server.delay
> > to 0, fetcher.max.crawl.delay to 10, and http.max.delays to 1000.
> >
> > Please see the jstack log for the child processes below.
> >
> > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.51-b03 mixed
> mode):
> >
> > "Attach Listener" daemon prio=10 tid=0x00007fecf8c58000 nid=0x32e5
> waiting
> > on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "IPC Client (638223659) connection to /170.75.153.162:40980 from
> > job_1413149941617_0059" daemon prio=10 tid=0x0000000001a5c000 nid=0xce8
> in
> > Object.wait() [0x00007fecdf80e000]
> >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f8bf48> (a
> > org.apache.hadoop.ipc.Client$Connection)
> >     at
> org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:899)
> >     - locked <0x0000000099f8bf48> (a
> > org.apache.hadoop.ipc.Client$Connection)
> >     at org.apache.hadoop.ipc.Client$Connection.run(Client.java:944)
> >
> > "fetcher#5" daemon prio=10 tid=0x00007fecf8c49000 nid=0xce7 in
> > Object.wait() [0x00007fecdf90f000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> >
> > "fetcher#4" daemon prio=10 tid=0x00007fecf8c47000 nid=0xce6 in
> > Object.wait() [0x00007fecdfa10000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> >
> > "fetcher#3" daemon prio=10 tid=0x00007fecf8c45800 nid=0xce5 in
> > Object.wait() [0x00007fecdfb11000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> >
> > "fetcher#2" daemon prio=10 tid=0x00007fecf8c31800 nid=0xce4 in
> > Object.wait() [0x00007fecdfc12000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> >
> > "fetcher#1" daemon prio=10 tid=0x00007fecf8c2f800 nid=0xce3 in
> > Object.wait() [0x00007fecdfd13000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> >
> > "EventFetcher for fetching Map Completion Events" daemon prio=10
> > tid=0x00007fecf8c2d800 nid=0xce2 waiting on condition
> [0x00007fecdfe14000]
> >    java.lang.Thread.State: TIMED_WAITING (sleeping)
> >     at java.lang.Thread.sleep(Native Method)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.EventFetcher.run(EventFetcher.java:73)
> >
> > "OnDiskMerger - Thread to merge on-disk map-outputs" daemon prio=10
> > tid=0x00007fecf8c26800 nid=0xce1 in Object.wait() [0x00007fecdff15000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f63918> (a java.util.LinkedList)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:87)
> >     - locked <0x0000000099f63918> (a java.util.LinkedList)
> >
> > "InMemoryMerger - Thread to merge in-memory shuffled map-outputs" daemon
> > prio=10 tid=0x00007fecf8c22800 nid=0xce0 in Object.wait()
> > [0x00007fece0016000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f61f90> (a java.util.LinkedList)
> >     at java.lang.Object.wait(Object.java:503)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:87)
> >     - locked <0x0000000099f61f90> (a java.util.LinkedList)
> >
> > "ShufflePenaltyReferee" daemon prio=10 tid=0x00007fecf8c1a000 nid=0xcdf
> > waiting on condition [0x00007fece0117000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x0000000099f63100> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >     at
> >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >     at java.util.concurrent.DelayQueue.take(DelayQueue.java:209)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl$Referee.run(ShuffleSchedulerImpl.java:488)
> >
> > "communication thread" daemon prio=10 tid=0x00007fecf8c0b000 nid=0xcd8 in
> > Object.wait() [0x00007fece022e000]
> >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f5ef30> (a java.lang.Object)
> >     at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:719)
> >     - locked <0x0000000099f5ef30> (a java.lang.Object)
> >     at java.lang.Thread.run(Thread.java:744)
> >
> > "Thread for syncLogs" daemon prio=10 tid=0x00007fecf8a93800 nid=0xcd2
> > waiting on condition [0x00007fece0430000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x0000000099b9c710> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> >     at
> >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> >     at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> >     at
> >
> >
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:744)
> >
> > "IPC Parameter Sending Thread #0" daemon prio=10 tid=0x00007fecf8a9f000
> > nid=0xcc7 waiting on condition [0x00007fece032f000]
> >    java.lang.Thread.State: TIMED_WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for  <0x0000000099b526c0> (a
> > java.util.concurrent.SynchronousQueue$TransferStack)
> >     at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> >     at
> >
> >
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
> >     at
> >
> >
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
> >     at
> > java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> >     at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >     at java.lang.Thread.run(Thread.java:744)
> >
> > "Timer for 'ReduceTask' metrics system" daemon prio=10
> > tid=0x00007fecf8958000 nid=0xcc1 in Object.wait() [0x00007fece0b98000]
> >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099b9cce0> (a java.util.TaskQueue)
> >     at java.util.TimerThread.mainLoop(Timer.java:552)
> >     - locked <0x0000000099b9cce0> (a java.util.TaskQueue)
> >     at java.util.TimerThread.run(Timer.java:505)
> >
> > "Service Thread" daemon prio=10 tid=0x00007fecf80a4800 nid=0xc6b runnable
> > [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "C2 CompilerThread1" daemon prio=10 tid=0x00007fecf80a2800 nid=0xc6a
> > waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "C2 CompilerThread0" daemon prio=10 tid=0x00007fecf809f800 nid=0xc69
> > waiting on condition [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "Signal Dispatcher" daemon prio=10 tid=0x00007fecf8095000 nid=0xc68
> > runnable [0x0000000000000000]
> >    java.lang.Thread.State: RUNNABLE
> >
> > "Finalizer" daemon prio=10 tid=0x00007fecf807e000 nid=0xc60 in
> > Object.wait() [0x00007fecec83c000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099a1e040> (a
> java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
> >     - locked <0x0000000099a1e040> (a java.lang.ref.ReferenceQueue$Lock)
> >     at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
> >     at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189)
> >
> > "Reference Handler" daemon prio=10 tid=0x00007fecf807a000 nid=0xc5f in
> > Object.wait() [0x00007fecec93d000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099aeb3e0> (a java.lang.ref.Reference$Lock)
> >     at java.lang.Object.wait(Object.java:503)
> >     at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
> >     - locked <0x0000000099aeb3e0> (a java.lang.ref.Reference$Lock)
> >
> > "main" prio=10 tid=0x00007fecf800f800 nid=0xc4f in Object.wait()
> > [0x00007fed00948000]
> >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> >     at java.lang.Object.wait(Native Method)
> >     - waiting on <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> >
> >
> org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.waitUntilDone(ShuffleSchedulerImpl.java:443)
> >     - locked <0x0000000099f62a68> (a
> > org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> >     at
> > org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:129)
> >     at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
> >     at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:415)
> >     at
> >
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> >     at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
> >
> > "VM Thread" prio=10 tid=0x00007fecf8077800 nid=0xc5e runnable
> >
> > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fecf8025800 nid=0xc54
> > runnable
> >
> > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fecf8027000 nid=0xc55
> > runnable
> >
> > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fecf8029000 nid=0xc56
> > runnable
> >
> > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fecf802b000 nid=0xc57
> > runnable
> >
> > "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007fecf802c800 nid=0xc58
> > runnable
> >
> > "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007fecf802e800 nid=0xc59
> > runnable
> >
> > "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007fecf8030800 nid=0xc5a
> > runnable
> >
> > "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007fecf8032800 nid=0xc5b
> > runnable
> >
> > "VM Periodic Task Thread" prio=10 tid=0x00007fecf80af800 nid=0xc6c
> waiting
> > on condition
> >
> > JNI global references: 255
> >
> >
> >
> > On Thu, Oct 16, 2014 at 5:20 AM, Julien Nioche <
> > [email protected]> wrote:
> >
> > > Hi Meraj
> > >
> > > You could call jstack on the Java process a couple of times to see
> > > what it is busy doing; that will be a simple way of checking that this
> > > is indeed the source of the problem.
> > > See https://issues.apache.org/jira/browse/NUTCH-1314 for a possible
> > > solution.
> > >
> > > J.
> > >
> > > On 16 October 2014 06:08, Meraj A. Khan <[email protected]> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am running into a situation where the reduce phase of the fetch
> > > > job, with parsing enabled at fetch time, is taking an excessively
> > > > long amount of time. I have seen recommendations to filter URLs
> > > > based on length to avoid normalization-related delays; I am not
> > > > filtering any URLs by length. Could that be an issue?
> > > >
> > > > Can anyone share whether they have faced this issue and what the
> > > > resolution was? I am running Nutch 1.7 on Hadoop YARN.
> > > >
> > > > The issue was discussed previously, inconclusively, here:
> > > >
> > > >
> > > >
> > >
> >
> http://markmail.org/message/p6dzvvycpfzbaugr#query:+page:1+mid:p6dzvvycpfzbaugr+state:results
> > > >
> > > > Thanks.
> > > >
> > >
> > >
> > >
> > > --
> > >
> > > Open Source Solutions for Text Engineering
> > >
> > > http://digitalpebble.blogspot.com/
> > > http://www.digitalpebble.com
> > > http://twitter.com/digitalpebble
> > >
> >
>
>
>
> --
>
> Open Source Solutions for Text Engineering
>
> http://digitalpebble.blogspot.com/
> http://www.digitalpebble.com
> http://twitter.com/digitalpebble
>
