> I haven't set topology.max.spout.pending; I'm using the default value, but
> I'm watching an internal Set to check the number of tuples being processed.
>
Found it in the snippet you posted earlier. This duplicates what topology.max.spout.pending does, so unless you have another use for that internal set, it would be a good idea to rely on Storm's built-in mechanism instead (note that topology.max.spout.pending is unset by default, i.e. there is no cap, so you would need to give it a value). 200000 is a very large value.

Do you have any idea of how diverse your URLs are in terms of hostname / domain / IP? Bear in mind that the FetcherBolts (both implementations) are polite and will hold back URLs until the minimum configured amount of time has elapsed since the previous call to the same server completed. Also, if you have more URLs in flight than fetching threads, they will sit and wait in the queues. Either way, these might trigger a timeout after a while (topology.message.timeout.secs, 30 secs by default => https://github.com/apache/storm/blob/master/conf/defaults.yaml#L228), which could explain what you are experiencing.

> I've logged the nextTuple, fail and ack methods. And in the log, sometimes,
> for about thirty seconds none of these methods are called... it seems that the
> Thread is busy doing other things
>

See above.

>
> Yeah I'm using your SDK, very great btw. I've just changed the outlinks
> indexer to store data on Redis :-)
>

Do you mean a StatusUpdaterBolt <https://github.com/DigitalPebble/storm-crawler/blob/master/core/src/main/java/com/digitalpebble/storm/crawler/persistence/AbstractStatusUpdaterBolt.java>? What does the rest of your topology look like?

This thread is quite specific to StormCrawler and might not be of interest to other Storm users; feel free to continue on <http://groups.google.com/group/digitalpebble> or use the tag stormcrawler <http://stackoverflow.com/questions/tagged/stormcrawler> on StackOverflow.

HTH

Julien

>
> 2016-05-09 22:27 GMT+02:00 Julien Nioche <[email protected]>:
>
>> Hi Adrien
>>
>> Did you set a value for max spout pending? Could it be that you have
>> reached the max number of tuples in process? Do you see acks or fails
>> happen during that period?
>>
>> Great to hear that you are using StormCrawler BTW
>>
>> Julien
>>
>>
>> On 9 May 2016 at 20:48, Adrien Carreira <[email protected]> wrote:
>>
>>> I think the problem is that while my topology is running, the thread calling
>>> nextTuple seems to be busy... Why isn't the method called?
>>>
>>> Can someone point me to some documentation, or to the code calling nextTuple,
>>> just to understand what is blocking?
>>>
>>> Thank you guys
>>>
>>> 2016-05-09 9:57 GMT+02:00 Adrien Carreira <[email protected]>:
>>>
>>>> Hi there,
>>>>
>>>> I'm using Storm to build a web crawler, using the StormCrawler SDK.
>>>>
>>>> I'm also using Redis to store newly discovered links.
>>>>
>>>> I have a Spout to consume those URLs. After a lot of debugging, I've built the
>>>> Spout like this:
>>>>
>>>> public class OutlinkSpoutRedis extends BaseRichSpout {
>>>>
>>>>     private static final Logger LOG = LoggerFactory
>>>>             .getLogger(OutlinkSpoutRedis.class);
>>>>     // URLs read from Redis, waiting to be emitted by nextTuple()
>>>>     private LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
>>>>     // message ids handed over by ack() / fail() to the background threads
>>>>     private LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
>>>>     private LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
>>>>
>>>>     // beingProcessed, activated, _collector, open() and
>>>>     // declareOutputFields() omitted for brevity
>>>>
>>>>     @Override
>>>>     public void nextTuple() {
>>>>         LOG.info(">>> Calling nextTuple");
>>>>
>>>>         // manual cap on in-flight tuples
>>>>         if (beingProcessed.size() >= 200000) {
>>>>             LOG.info("Too many tuples being processed");
>>>>             Utils.sleep(50);
>>>>             return;
>>>>         }
>>>>
>>>>         LOG.info("Polling from queue");
>>>>         Values ret = queue.poll(); // non-blocking, null if the queue is empty
>>>>
>>>>         if (ret == null) {
>>>>             LOG.info("Polling from queue = null");
>>>>             Utils.sleep(50);
>>>>             return;
>>>>         }
>>>>
>>>>         LOG.info("Emitting one url");
>>>>
>>>>         String url = ret.get(0).toString();
>>>>         beingProcessed.put(url, "");
>>>>
>>>>         // the URL doubles as the message id passed back to ack() / fail()
>>>>         this._collector.emit(ret, url);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void ack(Object msgId) {
>>>>         LOG.info("Acking");
>>>>         this.beingProcessed.remove(msgId);
>>>>         this.ackQueue.offer((String) msgId);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void fail(Object msgId) {
>>>>         LOG.error("Fail tuple {}", msgId);
>>>>         this.beingProcessed.remove(msgId);
>>>>         this.failQueue.offer((String) msgId);
>>>>     }
>>>>
>>>>     private class ProducerThread extends Thread {
>>>>         @Override
>>>>         public void run() {
>>>>             while (activated) {
>>>>                 try {
>>>>                     // the spout's queue; `this.queue` would not compile inside a Thread subclass
>>>>                     if (queue.size() <= 1000) {
>>>>                         this.populateQueue();
>>>>                     }
>>>>
>>>>                     Utils.sleep(100);
>>>>                 } catch (Exception e) {
>>>>                     LOG.error("Error reading queues from redis", e);
>>>>                 }
>>>>             }
>>>>         }
>>>>
>>>>         private void populateQueue() {
>>>>             // Calling Redis to populate Queue
>>>>             queue.offer(new Values(url, metadata));
>>>>         }
>>>>     }
>>>>
>>>>     private abstract class AckFailThread extends Thread {
>>>>         // ackQueue or failQueue, depending on the subclass
>>>>         private final LinkedBlockingQueue<String> source;
>>>>
>>>>         AckFailThread(LinkedBlockingQueue<String> source) {
>>>>             this.source = source;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void run() {
>>>>             while (activated) {
>>>>                 try {
>>>>                     // poll this thread's String queue, not the Values queue
>>>>                     String message = source.poll(1, TimeUnit.SECONDS);
>>>>                     if (message != null) {
>>>>                         this.handleMessage(message);
>>>>                     }
>>>>                 } catch (InterruptedException e) {
>>>>                     Thread.currentThread().interrupt();
>>>>                     return;
>>>>                 }
>>>>             }
>>>>         }
>>>>
>>>>         protected abstract void handleMessage(String message);
>>>>     }
>>>> }
>>>>
>>>> I've removed unnecessary code.
>>>> To understand: nextTuple polls from a queue (populated by another
>>>> thread), and ack and fail push to queues consumed by two other
>>>> threads. So those three methods are not blocking.
>>>>
>>>> My problem is that in the running state, my spout is sometimes not called
>>>> for about thirty seconds, even though there are still messages in the
>>>> nextTuple queue to be consumed. The spout is not acking or failing either,
>>>> so why is the Spout thread blocked?
>>>>
>>>> Thanks
>>
>>
>> --
>>
>> *Open Source Solutions for Text Engineering*
>>
>> http://www.digitalpebble.com
>> http://digitalpebble.blogspot.com/
>> #digitalpebble <http://twitter.com/digitalpebble>
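To put rough numbers on the politeness argument in the reply above, here is a back-of-the-envelope sketch, not StormCrawler's actual scheduling logic. It assumes (hypothetically) one fetch at a time per host, a fixed 1-second delay between calls to the same host, zero fetch time, and that every URL's timeout clock starts when it is emitted. The class and method names, host names and URL counts are made up; only the 30-second value comes from Storm's default topology.message.timeout.secs, and the model ignores the number of fetching threads.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical estimate of how many in-flight URLs a polite fetcher cannot
 * even start before the tuple timeout expires. Assumes one fetch at a time
 * per host, a fixed delay between fetches to the same host, zero fetch time,
 * and that all URLs were emitted (timeout clock started) at t = 0.
 */
public class PolitenessTimeoutEstimate {

    public static long urlsLikelyToTimeOut(Map<String, Integer> urlsPerHost,
                                           double delaySecs, double timeoutSecs) {
        if (delaySecs <= 0) {
            throw new IllegalArgumentException("delaySecs must be > 0");
        }
        // The k-th URL of a host (k = 0, 1, 2, ...) starts at about k * delaySecs,
        // so only indices 0 .. floor(timeoutSecs / delaySecs) start in time.
        long startInTime = (long) Math.floor(timeoutSecs / delaySecs) + 1;
        long late = 0;
        for (int count : urlsPerHost.values()) {
            late += Math.max(0L, count - startInTime);
        }
        return late;
    }

    public static void main(String[] args) {
        // Made-up crawl: 200,000 URLs in flight spread over just two hosts,
        // a 1-second politeness delay and a 30-second message timeout.
        Map<String, Integer> urlsPerHost = new LinkedHashMap<>();
        urlsPerHost.put("example.com", 150_000);
        urlsPerHost.put("example.org", 50_000);

        long late = urlsLikelyToTimeOut(urlsPerHost, 1.0, 30.0);
        System.out.println(late + " of 200000 URLs cannot start before the timeout");
    }
}
```

Under these made-up numbers only 31 URLs per host (62 in total) could even start fetching within 30 seconds, so nearly all of the 200,000 in-flight tuples would time out and come back through fail(). That is one reason a much smaller cap, set through topology.max.spout.pending, may help when the URLs are not diverse.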

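The ack/fail hand-off in the quoted spout can be exercised outside Storm. This is a minimal sketch under stated assumptions: the class and method names (AckFailHandOffSketch, QueueConsumer, runDemo) are hypothetical, and an in-memory list stands in for the Redis write. It shows the pattern the spout relies on: ack() and fail() only offer() to an unbounded LinkedBlockingQueue, while a consumer thread polls its own String queue with a bounded wait and stops cleanly on a flag or an interrupt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical, Storm-free version of the spout's ack/fail hand-off threads. */
public class AckFailHandOffSketch {

    static class QueueConsumer extends Thread {
        private final LinkedBlockingQueue<String> source;
        private final List<String> handled = new CopyOnWriteArrayList<>();
        // volatile so a stop request from another thread is seen here
        private volatile boolean activated = true;

        QueueConsumer(String name, LinkedBlockingQueue<String> source) {
            super(name);
            this.source = source;
            setDaemon(true); // do not keep the JVM alive on exit
        }

        @Override
        public void run() {
            while (activated) {
                try {
                    // bounded wait, so a cleared flag is noticed within a second
                    String message = source.poll(1, TimeUnit.SECONDS);
                    if (message != null) {
                        handleMessage(message);
                    }
                } catch (InterruptedException e) {
                    // restore the interrupt status and stop consuming
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        // hypothetical stand-in for the Redis update in the original code
        protected void handleMessage(String message) {
            handled.add(message);
        }

        void shutdown() {
            activated = false;
            interrupt();
        }
    }

    /** Offers ids as ack() would, then waits up to 5 s for them to be handled. */
    public static List<String> runDemo(List<String> ids) throws InterruptedException {
        LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
        QueueConsumer consumer = new QueueConsumer("ack-consumer", ackQueue);
        consumer.start();
        for (String id : ids) {
            ackQueue.offer(id); // unbounded queue: never waits for capacity
        }
        long deadline = System.currentTimeMillis() + 5_000;
        while (consumer.handled.size() < ids.size()
                && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
        }
        consumer.shutdown();
        consumer.join(2_000);
        return new ArrayList<>(consumer.handled);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> handled = runDemo(Arrays.asList(
                "http://example.com/a", "http://example.com/b"));
        System.out.println("handled: " + handled);
    }
}
```

Because offer() on an unbounded LinkedBlockingQueue does not wait for capacity, this hand-off by itself should not stall the thread calling ack() and fail(); the one-second poll() only ever blocks the consumer thread.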