More… The call to getChildren() just to find out how many children there are is very expensive. Instead, call checkExists() on the parent path and use the Stat object returned. The Stat object has a getNumChildren() method.
-Jordan On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <[email protected]> wrote: > First at all , thank you for your answer. > > Here is the simple code, I used: > > The producer and queueconsummer are given in the class > > Every 5 minutes, I am printing the the number of processed items, and I see > some drastic differences between the different consumers: > > > > Producer: > =-=-=-=-= > > package com.zk; > > import java.io.Closeable; > import java.io.IOException; > import java.text.DateFormat; > import java.text.SimpleDateFormat; > import java.util.Date; > import java.util.List; > > import org.apache.curator.framework.CuratorFramework; > import org.apache.curator.framework.api.CuratorEvent; > import org.apache.curator.framework.api.CuratorListener; > import org.apache.curator.framework.recipes.queue.DistributedQueue; > import org.apache.curator.test.TestingServer; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > > public class QueueProducer implements Closeable { > > final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class); > > final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); > > protected static final String PATH = "/test_queue"; > > protected static final String LOCK_PATH = "/test_lock_queue"; > > private DistributedQueue<CrawlUrl> queue; > > private static final int QUEUE_SIZE = 100000; > > private int items; > > public QueueProducer(CuratorFramework framework) throws Exception { > LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a > QueueProducer"); > System.out.println(java.net.InetAddress.getLocalHost().getHostName() > + " is a QueueProducer"); > this.queue = Utils.newDistributedQueue(framework, > Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null); > this.queue.start(); > addQueueContent(QUEUE_SIZE); > System.out.println("Done with the initial init"); > > > // We register to the listener for monitoring the number of elements > // in the queue > 
framework.getCuratorListenable().addListener(new CuratorListener() { > @Override > public void eventReceived(final CuratorFramework framework_, > CuratorEvent event) throws Exception { > if (event.getPath() != null && > event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) { > // this also restores the notification > List<String> children = framework_.getChildren() > .watched().forPath(Utils.CRAWL_QUEUE_PATH); > if (children.size() <= QUEUE_SIZE/2) { > addQueueContent(QUEUE_SIZE - children.size()); > } > } > } > }); > > > while (true) { > List<String> children = > framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH); > if (children.size() <= QUEUE_SIZE/2) { > LOG.info(dateFormat.format(new Date()) + " - In the > while(true) - We call for size " + children.size()); > addQueueContent(QUEUE_SIZE - children.size()); > } > > Thread.sleep(5000); > > } > } > > void addQueueContent(int numberOfItems) { > LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + > numberOfItems); > for (int i = 0; i < numberOfItems; i++) { > try { > CrawlUrl url = new CrawlUrl(""+this.items++); > this.queue.put(url); > } catch (Exception e) { > LOG.error ("Caught an error when adding the item " + i + " in > the initQueueContent()"); > } > } > } > > public static void main(String[] args) { > CrawlerPropertyFile props; > try { > props = new CrawlerPropertyFile(args[0]); > > final String connectString; > System.out.println("DEBUG = " + Utils.DEBUG); > if (props.useZkTestServer()) { > System.out.println("Will launch from zkTestServer"); > TestingServer server = new TestingServer(); > connectString = server.getConnectString(); > } else { > connectString = props.getZkServer(); > } > > final CuratorFramework framework = > Utils.newFramework(connectString); > framework.start(); > > @SuppressWarnings("unused") > QueueProducer producer = new QueueProducer(framework); > } catch (Exception e) { > e.printStackTrace(); > } > > } > > @Override > public void close() throws IOException { > 
this.queue.close(); > } > > > > } > > > > > Consumer > =-=-=-=-=- > > package com.zk; > > import java.io.Closeable; > import java.io.File; > import java.io.FileWriter; > import java.io.IOException; > import java.text.DateFormat; > import java.text.SimpleDateFormat; > import java.util.Date; > > import org.apache.curator.framework.CuratorFramework; > import org.apache.curator.framework.recipes.queue.DistributedQueue; > import org.apache.curator.framework.recipes.queue.QueueConsumer; > import org.apache.curator.framework.state.ConnectionState; > import org.apache.curator.test.TestingServer; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > > > public class MyQueueConsumer implements Closeable{ > > private DistributedQueue<CrawlUrl> queue; > > String name; > > String id; > > FileWriter timeCounter; > > final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class); > > final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); > > int numberOfProcessedURL; > > private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread; > > private class FileWriterThread extends Thread { > > public FileWriterThread() { > // empty ctor > } > > @Override > public void run() { > // We write the stats: > > try { > while (true) { > > MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+ > > "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL +"]\n") ; > MyQueueConsumer.this.timeCounter.flush(); > > // Sleeps 5 minutes > Thread.sleep(300000); > } > } catch (Exception e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > } > > > public MyQueueConsumer(CuratorFramework framework, final String id) > throws Exception { > this.id = id; > this.name = java.net.InetAddress.getLocalHost().getHostName(); > this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+ > this.name + "_" +id + "_timeCounter.txt")); > > // this.timeCounterThread = new FileWriterThread(); > // this.timeCounterThread.start(); 
> this.queue = Utils.newDistributedQueue(framework, > Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new > QueueConsumer<CrawlUrl>() { > > @Override > public void stateChanged(CuratorFramework client, ConnectionState > newState) { > System.out.println(String.format("[%s] connection state > changed to %s", id, newState)); > } > > @Override > public void consumeMessage(CrawlUrl url) throws Exception { > try { > LOG.info(dateFormat.format(new > Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+ > "] processed " + url.url); > MyQueueConsumer.this.numberOfProcessedURL++; > } catch (Exception e) { > LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]" + > e.getMessage() + " for url " + url.url ); > } > } > > }); > try { > this.queue.start(); > } catch (Exception e) { > e.printStackTrace(); > } > > } > > public static void main(String[] args) { > try { > CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]); > > final String connectString; > System.out.println("DEBUG = " + Utils.DEBUG); > if (props.useZkTestServer()) { > System.out.println("Will launch from zkTestServer"); > TestingServer server = new TestingServer(); > connectString = server.getConnectString(); > } else { > connectString = props.getZkServer(); > } > > final CuratorFramework framework = > Utils.newFramework(connectString); > framework.start(); > > final MyQueueConsumer[] queueConsumers = new > MyQueueConsumer[props.getNumberOfWorkers()]; > > for (int i = 0; i < queueConsumers.length; i++) { > queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i); > } > > Runtime.getRuntime().addShutdownHook(new Thread() { > @Override > public void run() { > // close workers > Throwable t = null; > LOG.info("We close the workers"); > for (MyQueueConsumer queueConsumer : queueConsumers) { > try { > queueConsumer.close(); > } catch (Throwable th) { > if (t == null) { > t = th; > } > } > } > // throw first exception that we encountered > if (t != null) { > throw new 
RuntimeException("some workers failed to > close", t); > } > } > }); > > }catch (Exception e ){ > e.printStackTrace(); > } > } > > @Override > public void close() throws IOException { > this.queue.close(); > } > } > > > > > Main > -=-=- > > package com.zk; > > import org.apache.curator.framework.CuratorFramework; > import org.apache.curator.test.TestingServer; > > > > public class QueueTestMain { > > /** > * @param args > */ > public static void main(String[] args) { > CrawlerPropertyFile props; > try { > props = new CrawlerPropertyFile(args[0]); > > final String connectString; > System.out.println("DEBUG = " + Utils.DEBUG); > if (props.useZkTestServer()) { > System.out.println("Will launch from zkTestServer"); > TestingServer server = new TestingServer(); > connectString = server.getConnectString(); > } else { > connectString = props.getZkServer(); > } > > final CuratorFramework framework = > Utils.newFramework(connectString); > framework.start(); > > > if (args[1] != null && args[1].equalsIgnoreCase("true")) { > @SuppressWarnings("unused") > QueueProducer producer = new QueueProducer(framework); > } else { > > final MyQueueConsumer[] queueConsumers = new > MyQueueConsumer[props.getNumberOfWorkers()]; > > for (int i = 0; i < queueConsumers.length; i++) { > queueConsumers[i] = new MyQueueConsumer(framework, > "id_"+i); > } > > Runtime.getRuntime().addShutdownHook(new Thread() { > @Override > public void run() { > // close workers > Throwable t = null; > for (MyQueueConsumer queueConsumer : queueConsumers) { > try { > queueConsumer.close(); > } catch (Throwable th) { > if (t == null) { > t = th; > } > } > } > // throw first exception that we encountered > if (t != null) { > throw new RuntimeException("some workers failed > to close", t); > } > } > }); > > > } > }catch (Exception e ){ > e.printStackTrace(); > } > > } > > } > > > > > Example of output: > > > > > > On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman > <[email protected]> wrote: > Can you produce a test 
that shows this? Anything else interesting in the log? > Of course, there could be a bug. > > -Jordan > > On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList > <[email protected]> wrote: > > > Hi > > > > I made a short test as follows: > > > > - I have a quorum of 3 nodes for ZooKeeper. > > - I wrote a class using Curator QueueProducer who produces all the time > > (when the queue is 10% full, it creates new items) , items (random integer) > > - I wrote a simple class using Curator Queue Consumer which simply prints > > to Log "consumed item i". > > > > I tested some different combinations : > > - running the consumers on one, two or three nodes. > > - running one or more consumers in parallel on a given node. > > > > > > But, and here is my question: I see some very strange behavior when I have > > several consumers in parallel on a node. For example, running 5 consumers > > per node on 3 nodes, I see a throughput **very** slow. When looking at my > > Log, I see that most of the consumers are most of the time on an idle > > state.... > > > > Am I making a mistake somewhere? > > I was expecting to enhance the throughput by augmenting the number of > > consumers, I am surprised to see the opposite.... > > > > Thanks a lot > > > > Benjamin > >
