Hi Jordan,
Thanks again for your quick answer.
I would like to make sure I understand your hint.
Do you mean that I should change the following method
@Override
public void consumeMessage(CrawlUrl url) throws Exception {
try {
LOG.info(dateFormat.format(new
Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+
"] processed " + url.url);
MyQueueConsumer.this.numberOfProcessedURL++;
} catch (Exception e) {
LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]"
+ e.getMessage() + " for url " + url.url );
}
}
or the code in the QueueProducer?
In the latter case (code in the QueueProducer), I think it will not solve
the problem: I can see in my logs that the consumers are idle even when the
producer is not doing anything...
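
To check that I read the hint correctly, here is a minimal sketch of what I understand by "not doing work in the listener". It uses only java.util.concurrent and no Curator classes: the names ListenerOffloadSketch, Listener and workOnEventThread are made up for illustration, and the sleep stands in for the blocking getChildren() plus addQueueContent() calls in my eventReceived(). I assume that in the real code this corresponds to passing an Executor as the second argument of addListener(...), or to calling inBackground() on the getChildren() builder.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch: no Curator classes are used here.
final class ListenerOffloadSketch {

    /** Stand-in for a listener callback (not the Curator interface). */
    interface Listener {
        void eventReceived(String path) throws Exception;
    }

    /**
     * Delivers `events` notifications from a single "event thread" and
     * returns how many pieces of slow work ran on that thread.
     */
    static int workOnEventThread(boolean offload, int events) throws Exception {
        ExecutorService eventThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-thread"));
        ExecutorService workers = Executors.newFixedThreadPool(2);
        AtomicInteger onEventThread = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        // Stand-in for the blocking getChildren() + addQueueContent() calls.
        Runnable slowWork = () -> {
            if ("event-thread".equals(Thread.currentThread().getName())) {
                onEventThread.incrementAndGet();
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        };

        // Either do the work inside the callback, or hand it to the pool.
        Listener listener = offload
                ? path -> workers.execute(slowWork)
                : path -> slowWork.run();

        try {
            // The single event thread delivers every notification in turn.
            eventThread.submit(() -> {
                for (int i = 0; i < events; i++) {
                    listener.eventReceived("/test_queue");
                }
                return null;
            }).get();
        } finally {
            eventThread.shutdown();
            workers.shutdown();
            workers.awaitTermination(10, TimeUnit.SECONDS);
        }
        if (completed.get() != events) {
            throw new IllegalStateException("not all work items completed");
        }
        return onEventThread.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking listener, work on event thread: "
                + workOnEventThread(false, 3));
        System.out.println("offloading listener, work on event thread: "
                + workOnEventThread(true, 3));
    }
}
```

With the blocking variant, every piece of slow work runs on the event thread itself, so nothing else can be delivered while it runs. With the offloading variant, the event thread only schedules the work and stays free. If this is what you meant, then the change belongs in the QueueProducer listener and not in consumeMessage().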
Best regards
Benjamin
On Sun, Nov 17, 2013 at 11:33 PM, Jordan Zimmerman <
[email protected]> wrote:
> I’ll look further at this, but the first thing that I notice is that you
> are doing “work” in your Curator Listener. Please read Curator Tech Note 1:
>
> https://cwiki.apache.org/confluence/display/CURATOR/TN1
>
> The quickest fix would be to do the getChildren() as a background
> operation. Alternatively, you can pass in a thread pool when registering
> the listener.
>
> -Jordan
>
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <
> [email protected]> wrote:
>
> First of all, thank you for your answer.
>
> Here is the simple code I used:
>
> The producer and queue consumer are given in the classes below.
>
> Every 5 minutes, I print the number of processed items, and I
> see some drastic differences between the different consumers:
>
>
>
> Producer:
> =-=-=-=-=
>
> package com.zk;
>
> import java.io.Closeable;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.util.List;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.api.CuratorEvent;
> import org.apache.curator.framework.api.CuratorListener;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>
> public class QueueProducer implements Closeable {
>
> final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>
> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>
> protected static final String PATH = "/test_queue";
>
> protected static final String LOCK_PATH = "/test_lock_queue";
>
> private DistributedQueue<CrawlUrl> queue;
>
> private static final int QUEUE_SIZE = 100000;
>
> private int items;
>
> public QueueProducer(CuratorFramework framework) throws Exception {
> LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>
> System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
> this.queue = Utils.newDistributedQueue(framework,
> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
> this.queue.start();
> addQueueContent(QUEUE_SIZE);
> System.out.println("Done with the initial init");
>
>
> // We register a listener to monitor the number of elements
> // in the queue
> framework.getCuratorListenable().addListener(new CuratorListener()
> {
> @Override
> public void eventReceived(final CuratorFramework framework_,
> CuratorEvent event) throws Exception {
> if (event.getPath() != null &&
> event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
> // this also restores the notification
> List<String> children = framework_.getChildren()
> .watched().forPath(Utils.CRAWL_QUEUE_PATH);
> if (children.size() <= QUEUE_SIZE/2) {
> addQueueContent(QUEUE_SIZE - children.size());
> }
> }
> }
> });
>
>
> while (true) {
> List<String> children =
> framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
> if (children.size() <= QUEUE_SIZE/2) {
> LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
> addQueueContent(QUEUE_SIZE - children.size());
> }
>
> Thread.sleep(5000);
>
> }
> }
>
> void addQueueContent(int numberOfItems) {
> LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
> numberOfItems);
> for (int i = 0; i < numberOfItems; i++) {
> try {
> CrawlUrl url = new CrawlUrl(""+this.items++);
> this.queue.put(url);
> } catch (Exception e) {
> LOG.error("Caught an error when adding the item " + i + " in addQueueContent()", e);
> }
> }
> }
>
> public static void main(String[] args) {
> CrawlerPropertyFile props;
> try {
> props = new CrawlerPropertyFile(args[0]);
>
> final String connectString;
> System.out.println("DEBUG = " + Utils.DEBUG);
> if (props.useZkTestServer()) {
> System.out.println("Will launch from zkTestServer");
> TestingServer server = new TestingServer();
> connectString = server.getConnectString();
> } else {
> connectString = props.getZkServer();
> }
>
> final CuratorFramework framework =
> Utils.newFramework(connectString);
> framework.start();
>
> @SuppressWarnings("unused")
> QueueProducer producer = new QueueProducer(framework);
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
>
> @Override
> public void close() throws IOException {
> this.queue.close();
> }
>
>
>
> }
>
>
>
>
> Consumer
> =-=-=-=-=-
>
> package com.zk;
>
> import java.io.Closeable;
> import java.io.File;
> import java.io.FileWriter;
> import java.io.IOException;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.framework.recipes.queue.DistributedQueue;
> import org.apache.curator.framework.recipes.queue.QueueConsumer;
> import org.apache.curator.framework.state.ConnectionState;
> import org.apache.curator.test.TestingServer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>
>
> public class MyQueueConsumer implements Closeable{
>
> private DistributedQueue<CrawlUrl> queue;
>
> String name;
>
> String id;
>
> FileWriter timeCounter;
>
> final static Logger LOG =
> LoggerFactory.getLogger(MyQueueConsumer.class);
>
> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>
> int numberOfProcessedURL;
>
> private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>
> private class FileWriterThread extends Thread {
>
> public FileWriterThread() {
> // empty ctor
> }
>
> @Override
> public void run() {
> // We write the stats:
>
> try {
> while (true) {
>
> MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>
> "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL +"]\n") ;
> MyQueueConsumer.this.timeCounter.flush();
>
> // Sleeps 5 minutes
> Thread.sleep(300000);
> }
> } catch (Exception e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }
>
>
> public MyQueueConsumer(CuratorFramework framework, final String id)
> throws Exception {
> this.id = id;
> this.name = java.net.InetAddress.getLocalHost().getHostName();
> this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
> this.name + "_" +id + "_timeCounter.txt"));
>
> // this.timeCounterThread = new FileWriterThread();
> // this.timeCounterThread.start();
> this.queue = Utils.newDistributedQueue(framework,
> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
> QueueConsumer<CrawlUrl>() {
>
> @Override
> public void stateChanged(CuratorFramework client,
> ConnectionState newState) {
> System.out.println(String.format("[%s] connection state changed to %s", id, newState));
> }
>
> @Override
> public void consumeMessage(CrawlUrl url) throws Exception {
> try {
> LOG.info(dateFormat.format(new
> Date(System.currentTimeMillis())) + "["+id+ "-" +
> MyQueueConsumer.this.name + "] processed " + url.url);
> MyQueueConsumer.this.numberOfProcessedURL++;
> } catch (Exception e) {
> LOG.error( "["+id+ "-" +
> MyQueueConsumer.this.name +
> "]" + e.getMessage() + " for url " + url.url );
> }
> }
>
> });
> try {
> this.queue.start();
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
>
> public static void main(String[] args) {
> try {
> CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>
> final String connectString;
> System.out.println("DEBUG = " + Utils.DEBUG);
> if (props.useZkTestServer()) {
> System.out.println("Will launch from zkTestServer");
> TestingServer server = new TestingServer();
> connectString = server.getConnectString();
> } else {
> connectString = props.getZkServer();
> }
>
> final CuratorFramework framework =
> Utils.newFramework(connectString);
> framework.start();
>
> final MyQueueConsumer[] queueConsumers = new
> MyQueueConsumer[props.getNumberOfWorkers()];
>
> for (int i = 0; i < queueConsumers.length; i++) {
> queueConsumers[i] = new MyQueueConsumer(framework,
> "id_"+i);
> }
>
> Runtime.getRuntime().addShutdownHook(new Thread() {
> @Override
> public void run() {
> // close workers
> Throwable t = null;
> LOG.info("We close the workers");
> for (MyQueueConsumer queueConsumer : queueConsumers) {
> try {
> queueConsumer.close();
> } catch (Throwable th) {
> if (t == null) {
> t = th;
> }
> }
> }
> // throw first exception that we encountered
> if (t != null) {
> throw new RuntimeException("some workers failed to close", t);
> }
> }
> });
>
> }catch (Exception e ){
> e.printStackTrace();
> }
> }
>
> @Override
> public void close() throws IOException {
> this.queue.close();
> }
> }
>
>
>
>
> Main
> -=-=-
>
> package com.zk;
>
> import org.apache.curator.framework.CuratorFramework;
> import org.apache.curator.test.TestingServer;
>
>
>
> public class QueueTestMain {
>
> /**
> * @param args
> */
> public static void main(String[] args) {
> CrawlerPropertyFile props;
> try {
> props = new CrawlerPropertyFile(args[0]);
>
> final String connectString;
> System.out.println("DEBUG = " + Utils.DEBUG);
> if (props.useZkTestServer()) {
> System.out.println("Will launch from zkTestServer");
> TestingServer server = new TestingServer();
> connectString = server.getConnectString();
> } else {
> connectString = props.getZkServer();
> }
>
> final CuratorFramework framework =
> Utils.newFramework(connectString);
> framework.start();
>
>
> if (args[1] != null && args[1].equalsIgnoreCase("true")) {
> @SuppressWarnings("unused")
> QueueProducer producer = new QueueProducer(framework);
> } else {
>
> final MyQueueConsumer[] queueConsumers = new
> MyQueueConsumer[props.getNumberOfWorkers()];
>
> for (int i = 0; i < queueConsumers.length; i++) {
> queueConsumers[i] = new MyQueueConsumer(framework,
> "id_"+i);
> }
>
> Runtime.getRuntime().addShutdownHook(new Thread() {
> @Override
> public void run() {
> // close workers
> Throwable t = null;
> for (MyQueueConsumer queueConsumer :
> queueConsumers) {
> try {
> queueConsumer.close();
> } catch (Throwable th) {
> if (t == null) {
> t = th;
> }
> }
> }
> // throw first exception that we encountered
> if (t != null) {
> throw new RuntimeException("some workers failed to close", t);
> }
> }
> });
>
>
> }
> }catch (Exception e ){
> e.printStackTrace();
> }
>
> }
>
> }
>
>
>
>
> Example of output:
>
>
>
>
>
> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
> [email protected]> wrote:
>
>> Can you produce a test that shows this? Anything else interesting in the
>> log? Of course, there could be a bug.
>>
>> -Jordan
>>
>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
>> [email protected]> wrote:
>>
>> > Hi
>> >
>> > I ran a short test as follows:
>> >
>> > - I have a quorum of 3 nodes for ZooKeeper.
>> > - I wrote a QueueProducer class using Curator that produces items
>> (random integers) all the time: when the queue is 10% full, it creates new items.
>> > - I wrote a simple class using the Curator QueueConsumer which simply
>> prints "consumed item i" to the log.
>> >
>> > I tested some different combinations :
>> > - running the consumers on one, two or three nodes.
>> > - running one or more consumers in parallel on a given node.
>> >
>> >
>> > But, and here is my question: I see some very strange behavior when I
>> have several consumers in parallel on a node. For example, running 5
>> consumers per node on 3 nodes, I see a **very** low throughput. When
>> looking at my log, I see that most of the consumers are idle most of
>> the time....
>> >
>> > Am I making a mistake somewhere?
>> > I was expecting to improve the throughput by increasing the number of
>> consumers, and I am surprised to see the opposite....
>> >
>> > Thanks a lot
>> >
>> > Benjamin
>>
>>
>
>