Hi Meraj,

You can control the number of URLs per segment with:

<property>
  <name>generate.max.count</name>
  <value>-1</value>
  <description>The maximum number of urls in a single fetchlist.
  -1 if unlimited. The urls are counted according to the value of
  the parameter generate.count.mode.
  </description>
</property>

<property>
  <name>generate.count.mode</name>
  <value>host</value>
  <description>Determines how the URLs are counted for generate.max.count.
  Default value is 'host' but can be 'domain'. Note that we do not count
  per IP in the new version of the Generator.
  </description>
</property>
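For example, a minimal nutch-site.xml override along these lines would cap
every fetchlist at 50,000 URLs from any single host (the 50000 is purely an
illustrative value, not a recommendation):

<configuration>
  <property>
    <name>generate.max.count</name>
    <!-- illustrative cap: at most 50,000 URLs from any one host per fetchlist -->
    <value>50000</value>
  </property>
  <property>
    <name>generate.count.mode</name>
    <!-- count the cap per host; use 'domain' to group subdomains together -->
    <value>host</value>
  </property>
</configuration>

Counting per 'domain' rather than per 'host' is useful when a site spreads
its pages across many subdomains.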
With these settings the URLs are grouped into inputs for the map tasks
accordingly.

Julien

On 26 October 2014 19:08, Meraj A. Khan <[email protected]> wrote:

> Julien,
>
> On further analysis I found that it was not a delay at reduce time, but a
> long-running fetch map task. When I have multiple fetch map tasks running
> on a single segment, I see that one of the map tasks runs for an
> excessively longer period of time than the others. This seems to happen
> because of a disproportionate distribution of URLs per map task: if I have
> a topN of 1,000,000 and 10 fetch map tasks, it seems it is not guaranteed
> that each fetch map task will get 100,000 URLs to fetch.
>
> Is it possible to set an upper limit on the maximum number of URLs per
> fetch map task, along with the collective topN for the whole fetch phase?
>
> Thanks,
> Meraj.
>
> On Sat, Oct 18, 2014 at 2:28 AM, Julien Nioche
> <[email protected]> wrote:
>
> > Hi Meraj,
> >
> > What do the logs for the map tasks tell you about the URLs being
> > fetched?
> >
> > J.
> >
> > On 17 October 2014 19:08, Meraj A. Khan <[email protected]> wrote:
> >
> > > Julien,
> > >
> > > Thanks for your suggestion. I looked at the jstack thread dumps, and
> > > I could see that the fetcher threads are in a waiting state; looking
> > > at the JobClient console, the map phase is actually not yet complete.
> > >
> > > 14/10/15 12:09:48 INFO mapreduce.Job:  map 95% reduce 31%
> > > 14/10/16 07:11:20 INFO mapreduce.Job:  map 96% reduce 31%
> > > 14/10/17 01:20:56 INFO mapreduce.Job:  map 97% reduce 31%
> > >
> > > Is it possible that these map tasks are actually waiting on a
> > > particular host with some excessive crawl delay? I already have
> > > fetcher.threads.per.queue set to 5, fetcher.server.delay to 0,
> > > fetcher.max.crawl.delay to 10 and http.max.delays to 1000.
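> > >
> > > For reference, those four overrides correspond to a nutch-site.xml
> > > fragment like the following sketch (the values are exactly the ones
> > > quoted above):
> > >
> > > <configuration>
> > >   <property>
> > >     <name>fetcher.threads.per.queue</name>
> > >     <value>5</value>   <!-- 5 fetcher threads per host queue -->
> > >   </property>
> > >   <property>
> > >     <name>fetcher.server.delay</name>
> > >     <value>0</value>   <!-- no politeness delay between requests to a host -->
> > >   </property>
> > >   <property>
> > >     <name>fetcher.max.crawl.delay</name>
> > >     <value>10</value>  <!-- skip pages whose robots.txt Crawl-Delay exceeds 10s -->
> > >   </property>
> > >   <property>
> > >     <name>http.max.delays</name>
> > >     <value>1000</value> <!-- times a thread waits on a busy host before giving up -->
> > >   </property>
> > > </configuration>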
> > >
> > > Please see the jstack log info for the child processes below.
> > >
> > > Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.51-b03 mixed mode):
> > >
> > > "Attach Listener" daemon prio=10 tid=0x00007fecf8c58000 nid=0x32e5 waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "IPC Client (638223659) connection to /170.75.153.162:40980 from job_1413149941617_0059" daemon prio=10 tid=0x0000000001a5c000 nid=0xce8 in Object.wait() [0x00007fecdf80e000]
> > >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f8bf48> (a org.apache.hadoop.ipc.Client$Connection)
> > >      at org.apache.hadoop.ipc.Client$Connection.waitForWork(Client.java:899)
> > >      - locked <0x0000000099f8bf48> (a org.apache.hadoop.ipc.Client$Connection)
> > >      at org.apache.hadoop.ipc.Client$Connection.run(Client.java:944)
> > >
> > > "fetcher#5" daemon prio=10 tid=0x00007fecf8c49000 nid=0xce7 in Object.wait() [0x00007fecdf90f000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f62a68> (a org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> > >      at java.lang.Object.wait(Object.java:503)
> > >      at org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.getHost(ShuffleSchedulerImpl.java:368)
> > >      - locked <0x0000000099f62a68> (a org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> > >      at org.apache.hadoop.mapreduce.task.reduce.Fetcher.run(Fetcher.java:161)
> > >
> > > "fetcher#4" daemon prio=10 tid=0x00007fecf8c47000 nid=0xce6 in Object.wait() [0x00007fecdfa10000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >    [same stack as "fetcher#5", waiting on <0x0000000099f62a68>]
> > >
> > > "fetcher#3" daemon prio=10 tid=0x00007fecf8c45800 nid=0xce5 in Object.wait() [0x00007fecdfb11000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >    [same stack as "fetcher#5", waiting on <0x0000000099f62a68>]
> > >
> > > "fetcher#2" daemon prio=10 tid=0x00007fecf8c31800 nid=0xce4 in Object.wait() [0x00007fecdfc12000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >    [same stack as "fetcher#5", waiting on <0x0000000099f62a68>]
> > >
> > > "fetcher#1" daemon prio=10 tid=0x00007fecf8c2f800 nid=0xce3 in Object.wait() [0x00007fecdfd13000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >    [same stack as "fetcher#5", waiting on <0x0000000099f62a68>]
> > >
> > > "EventFetcher for fetching Map Completion Events" daemon prio=10 tid=0x00007fecf8c2d800 nid=0xce2 waiting on condition [0x00007fecdfe14000]
> > >    java.lang.Thread.State: TIMED_WAITING (sleeping)
> > >      at java.lang.Thread.sleep(Native Method)
> > >      at org.apache.hadoop.mapreduce.task.reduce.EventFetcher.run(EventFetcher.java:73)
> > >
> > > "OnDiskMerger - Thread to merge on-disk map-outputs" daemon prio=10 tid=0x00007fecf8c26800 nid=0xce1 in Object.wait() [0x00007fecdff15000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f63918> (a java.util.LinkedList)
> > >      at java.lang.Object.wait(Object.java:503)
> > >      at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:87)
> > >      - locked <0x0000000099f63918> (a java.util.LinkedList)
> > >
> > > "InMemoryMerger - Thread to merge in-memory shuffled map-outputs" daemon prio=10 tid=0x00007fecf8c22800 nid=0xce0 in Object.wait() [0x00007fece0016000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f61f90> (a java.util.LinkedList)
> > >      at java.lang.Object.wait(Object.java:503)
> > >      at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:87)
> > >      - locked <0x0000000099f61f90> (a java.util.LinkedList)
> > >
> > > "ShufflePenaltyReferee" daemon prio=10 tid=0x00007fecf8c1a000 nid=0xcdf waiting on condition [0x00007fece0117000]
> > >    java.lang.Thread.State: WAITING (parking)
> > >      at sun.misc.Unsafe.park(Native Method)
> > >      - parking to wait for <0x0000000099f63100> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > >      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > >      at java.util.concurrent.DelayQueue.take(DelayQueue.java:209)
> > >      at org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl$Referee.run(ShuffleSchedulerImpl.java:488)
> > >
> > > "communication thread" daemon prio=10 tid=0x00007fecf8c0b000 nid=0xcd8 in Object.wait() [0x00007fece022e000]
> > >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f5ef30> (a java.lang.Object)
> > >      at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:719)
> > >      - locked <0x0000000099f5ef30> (a java.lang.Object)
> > >      at java.lang.Thread.run(Thread.java:744)
> > >
> > > "Thread for syncLogs" daemon prio=10 tid=0x00007fecf8a93800 nid=0xcd2 waiting on condition [0x00007fece0430000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >      at sun.misc.Unsafe.park(Native Method)
> > >      - parking to wait for <0x0000000099b9c710> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > >      at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> > >      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> > >      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> > >      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >      at java.lang.Thread.run(Thread.java:744)
> > >
> > > "IPC Parameter Sending Thread #0" daemon prio=10 tid=0x00007fecf8a9f000 nid=0xcc7 waiting on condition [0x00007fece032f000]
> > >    java.lang.Thread.State: TIMED_WAITING (parking)
> > >      at sun.misc.Unsafe.park(Native Method)
> > >      - parking to wait for <0x0000000099b526c0> (a java.util.concurrent.SynchronousQueue$TransferStack)
> > >      at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> > >      at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
> > >      at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
> > >      at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
> > >      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> > >      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> > >      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> > >      at java.lang.Thread.run(Thread.java:744)
> > >
> > > "Timer for 'ReduceTask' metrics system" daemon prio=10 tid=0x00007fecf8958000 nid=0xcc1 in Object.wait() [0x00007fece0b98000]
> > >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099b9cce0> (a java.util.TaskQueue)
> > >      at java.util.TimerThread.mainLoop(Timer.java:552)
> > >      - locked <0x0000000099b9cce0> (a java.util.TaskQueue)
> > >      at java.util.TimerThread.run(Timer.java:505)
> > >
> > > "Service Thread" daemon prio=10 tid=0x00007fecf80a4800 nid=0xc6b runnable [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "C2 CompilerThread1" daemon prio=10 tid=0x00007fecf80a2800 nid=0xc6a waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "C2 CompilerThread0" daemon prio=10 tid=0x00007fecf809f800 nid=0xc69 waiting on condition [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "Signal Dispatcher" daemon prio=10 tid=0x00007fecf8095000 nid=0xc68 runnable [0x0000000000000000]
> > >    java.lang.Thread.State: RUNNABLE
> > >
> > > "Finalizer" daemon prio=10 tid=0x00007fecf807e000 nid=0xc60 in Object.wait() [0x00007fecec83c000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099a1e040> (a java.lang.ref.ReferenceQueue$Lock)
> > >      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
> > >      - locked <0x0000000099a1e040> (a java.lang.ref.ReferenceQueue$Lock)
> > >      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
> > >      at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:189)
> > >
> > > "Reference Handler" daemon prio=10 tid=0x00007fecf807a000 nid=0xc5f in Object.wait() [0x00007fecec93d000]
> > >    java.lang.Thread.State: WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099aeb3e0> (a java.lang.ref.Reference$Lock)
> > >      at java.lang.Object.wait(Object.java:503)
> > >      at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:133)
> > >      - locked <0x0000000099aeb3e0> (a java.lang.ref.Reference$Lock)
> > >
> > > "main" prio=10 tid=0x00007fecf800f800 nid=0xc4f in Object.wait() [0x00007fed00948000]
> > >    java.lang.Thread.State: TIMED_WAITING (on object monitor)
> > >      at java.lang.Object.wait(Native Method)
> > >      - waiting on <0x0000000099f62a68> (a org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> > >      at org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl.waitUntilDone(ShuffleSchedulerImpl.java:443)
> > >      - locked <0x0000000099f62a68> (a org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl)
> > >      at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:129)
> > >      at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
> > >      at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
> > >      at java.security.AccessController.doPrivileged(Native Method)
> > >      at javax.security.auth.Subject.doAs(Subject.java:415)
> > >      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
> > >      at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
> > >
> > > "VM Thread" prio=10 tid=0x00007fecf8077800 nid=0xc5e runnable
> > >
> > > "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fecf8025800 nid=0xc54 runnable
> > > "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fecf8027000 nid=0xc55 runnable
> > > "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fecf8029000 nid=0xc56 runnable
> > > "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fecf802b000 nid=0xc57 runnable
> > > "GC task thread#4 (ParallelGC)" prio=10 tid=0x00007fecf802c800 nid=0xc58 runnable
> > > "GC task thread#5 (ParallelGC)" prio=10 tid=0x00007fecf802e800 nid=0xc59 runnable
> > > "GC task thread#6 (ParallelGC)" prio=10 tid=0x00007fecf8030800 nid=0xc5a runnable
> > > "GC task thread#7 (ParallelGC)" prio=10 tid=0x00007fecf8032800 nid=0xc5b runnable
> > >
> > > "VM Periodic Task Thread" prio=10 tid=0x00007fecf80af800 nid=0xc6c waiting on condition
> > >
> > > JNI global references: 255
> > >
> > > On Thu, Oct 16, 2014 at 5:20 AM, Julien Nioche
> > > <[email protected]> wrote:
> > >
> > > > Hi Meraj,
> > > >
> > > > You could call jstack on the Java process a couple of times to see
> > > > what it is busy doing; that will be a simple way of checking that
> > > > this is indeed the source of the problem.
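> > > >
> > > > For instance, something along these lines (12345 is only a
> > > > placeholder for the actual PID of the child task JVM, which jps
> > > > will tell you):
> > > >
> > > >   jps -lm                       # locate the YarnChild task JVM
> > > >   jstack -l 12345 > dump1.txt   # first snapshot
> > > >   sleep 60
> > > >   jstack -l 12345 > dump2.txt   # second snapshot, to compare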
> > > > See https://issues.apache.org/jira/browse/NUTCH-1314 for a possible
> > > > solution.
> > > >
> > > > J.
> > > >
> > > > On 16 October 2014 06:08, Meraj A. Khan <[email protected]> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I am running into a situation where the reduce phase of the fetch
> > > > > job, with parsing enabled at fetch time, is taking an excessively
> > > > > long time. I have seen recommendations to filter URLs by length to
> > > > > avoid normalization-related delays, and I am not filtering any
> > > > > URLs by length; could that be an issue?
> > > > >
> > > > > Can anyone share whether they have faced this issue and what the
> > > > > resolution was? I am running Nutch 1.7 on Hadoop YARN.
> > > > >
> > > > > The issue was previously inconclusively discussed here:
> > > > >
> > > > > http://markmail.org/message/p6dzvvycpfzbaugr#query:+page:1+mid:p6dzvvycpfzbaugr+state:results
> > > > >
> > > > > Thanks.

--
Open Source Solutions for Text Engineering

http://digitalpebble.blogspot.com/
http://www.digitalpebble.com
http://twitter.com/digitalpebble

