The example output is missing. Please provide that too.

-Jordan
On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList <[email protected]> wrote:

First of all, thank you for your answer.

Here is the simple code I used. The producer and queue consumer are given in the classes below.

Every 5 minutes, I print the number of processed items, and I see some drastic differences between the different consumers:


Producer:
=-=-=-=-=

package com.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    protected static final String PATH = "/test_queue";

    protected static final String LOCK_PATH = "/test_lock_queue";

    private DistributedQueue<CrawlUrl> queue;

    private static final int QUEUE_SIZE = 100000;

    private int items;

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");

        // We register a listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_, CuratorEvent event) throws Exception {
                if (event.getPath() != null && event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
                    // this also restores the notification
                    List<String> children = framework_.getChildren()
                            .watched().forPath(Utils.CRAWL_QUEUE_PATH);
                    if (children.size() <= QUEUE_SIZE / 2) {
                        addQueueContent(QUEUE_SIZE - children.size());
                    }
                }
            }
        });

        while (true) {
            List<String> children = framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
            if (children.size() <= QUEUE_SIZE / 2) {
                LOG.info(dateFormat.format(new Date()) + " - In the while(true) - We call for size " + children.size());
                addQueueContent(QUEUE_SIZE - children.size());
            }

            Thread.sleep(5000);
        }
    }

    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent " + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl("" + this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                LOG.error("Caught an error when adding the item " + i + " in addQueueContent()");
            }
        }
    }

    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();

            @SuppressWarnings("unused")
            QueueProducer producer = new QueueProducer(framework);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}


Consumer
=-=-=-=-=-

package com.zk;

import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class MyQueueConsumer implements Closeable {

    private DistributedQueue<CrawlUrl> queue;

    String name;

    String id;

    FileWriter timeCounter;

    final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);

    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    int numberOfProcessedURL;

    private FileWriterThread timeCounterThread;

    private class FileWriterThread extends Thread {

        public FileWriterThread() {
            // empty ctor
        }

        @Override
        public void run() {
            // We write the stats:
            try {
                while (true) {
                    MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "
                            + "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
                    MyQueueConsumer.this.timeCounter.flush();

                    // Sleeps 5 minutes
                    Thread.sleep(300000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    public MyQueueConsumer(CuratorFramework framework, final String id) throws Exception {
        this.id = id;
        this.name = java.net.InetAddress.getLocalHost().getHostName();
        this.timeCounter = new FileWriter(new File("MyQueueConsumer_" + this.name + "_" + id + "_timeCounter.txt"));

        // this.timeCounterThread = new FileWriterThread();
        // this.timeCounterThread.start();

        this.queue = Utils.newDistributedQueue(framework, Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH,
                new QueueConsumer<CrawlUrl>() {

                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        System.out.println(String.format("[%s] connection state changed to %s", id, newState));
                    }

                    @Override
                    public void consumeMessage(CrawlUrl url) throws Exception {
                        try {
                            LOG.info(dateFormat.format(new Date(System.currentTimeMillis())) + "[" + id + "-"
                                    + MyQueueConsumer.this.name + "] processed " + url.url);
                            MyQueueConsumer.this.numberOfProcessedURL++;
                        } catch (Exception e) {
                            LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "]" + e.getMessage()
                                    + " for url " + url.url);
                        }
                    }
                });
        try {
            this.queue.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        try {
            CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();

            final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

            for (int i = 0; i < queueConsumers.length; i++) {
                queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
            }

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    // close workers
                    Throwable t = null;
                    LOG.info("We close the workers");
                    for (MyQueueConsumer queueConsumer : queueConsumers) {
                        try {
                            queueConsumer.close();
                        } catch (Throwable th) {
                            if (t == null) {
                                t = th;
                            }
                        }
                    }
                    // throw first exception that we encountered
                    if (t != null) {
                        throw new RuntimeException("some workers failed to close", t);
                    }
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}


Main
-=-=-

package com.zk;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;


public class QueueTestMain {

    /**
     * @param args
     */
    public static void main(String[] args) {
        CrawlerPropertyFile props;
        try {
            props = new CrawlerPropertyFile(args[0]);

            final String connectString;
            System.out.println("DEBUG = " + Utils.DEBUG);
            if (props.useZkTestServer()) {
                System.out.println("Will launch from zkTestServer");
                TestingServer server = new TestingServer();
                connectString = server.getConnectString();
            } else {
                connectString = props.getZkServer();
            }

            final CuratorFramework framework = Utils.newFramework(connectString);
            framework.start();

            if (args[1] != null && args[1].equalsIgnoreCase("true")) {
                @SuppressWarnings("unused")
                QueueProducer producer = new QueueProducer(framework);
            } else {

                final MyQueueConsumer[] queueConsumers = new MyQueueConsumer[props.getNumberOfWorkers()];

                for (int i = 0; i < queueConsumers.length; i++) {
                    queueConsumers[i] = new MyQueueConsumer(framework, "id_" + i);
                }

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        // close workers
                        Throwable t = null;
                        for (MyQueueConsumer queueConsumer : queueConsumers) {
                            try {
                                queueConsumer.close();
                            } catch (Throwable th) {
                                if (t == null) {
                                    t = th;
                                }
                            }
                        }
                        // throw first exception that we encountered
                        if (t != null) {
                            throw new RuntimeException("some workers failed to close", t);
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


Example of output:


On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <[email protected]> wrote:

Can you produce a test that shows this? Anything else interesting in the log? Of course, there could be a bug.

-Jordan

On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <[email protected]> wrote:

Hi

I made a short test as follows:

- I have a quorum of 3 nodes for ZooKeeper.
- I wrote a producer class using Curator's queue that produces items (random integers) all the time: when the queue is 10% full, it creates new items.
- I wrote a simple consumer class using a Curator queue consumer which simply logs "consumed item i".

I tested several combinations:

- running the consumers on one, two or three nodes.
- running one or more consumers in parallel on a given node.

But, and here is my question: I see some very strange behavior when I have several consumers in parallel on a node. For example, running 5 consumers per node on 3 nodes, I see a **very** slow throughput. When looking at my log, I see that most of the consumers are, most of the time, in an idle state...

Am I making a mistake somewhere? I was expecting to increase the throughput by adding more consumers; I am surprised to see the opposite...

Thanks a lot

Benjamin
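One documented property of Curator's DistributedQueue may be relevant here: when a lock path is supplied (as Utils.CRAWL_QUEUE_LOCK_PATH is in the code above), a message is held and processed under a lock so that it is recoverable if the consumer dies, and the Curator docs note that this safety costs performance. The following is a self-contained sketch (plain java.util.concurrent, not Curator; the class and method names are illustrative, not from the thread) of why a shared per-message lock caps effective parallelism at one active consumer:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockContentionSketch {

    // Simulates N consumers that must each hold one shared lock while
    // "processing" an item, the way a lock-protected queue recipe would.
    // Returns the maximum number of consumers ever observed processing
    // at the same time.
    static int maxConcurrentProcessing(int consumers, int itemsPerConsumer) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                for (int i = 0; i < itemsPerConsumer; i++) {
                    lock.lock();            // only one consumer at a time
                    try {
                        int now = inFlight.incrementAndGet();
                        maxSeen.accumulateAndGet(now, Math::max);
                        inFlight.decrementAndGet();
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(maxConcurrentProcessing(5, 1000)); // prints 1
    }
}
```

If something like this is the cause, the extra consumers would look mostly idle because they spend their time waiting on the lock rather than consuming, which matches the behavior described above.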

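For comparing consumers, the 5-minute `[numberOfProcessed=...]` samples written by FileWriterThread above can be reduced to a per-consumer rate. A small sketch (the ThroughputCalc name and the sample lines are made up; only the line format matches the consumer code):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ThroughputCalc {

    private static final Pattern COUNT = Pattern.compile("\\[numberOfProcessed=(\\d+)\\]");

    // Extracts the counter value from one line of a timeCounter file.
    static long parseCount(String line) {
        Matcher m = COUNT.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("no counter in: " + line);
        }
        return Long.parseLong(m.group(1));
    }

    // Items per second between two consecutive 5-minute (300 s) samples.
    static double itemsPerSecond(String earlier, String later) {
        return (parseCount(later) - parseCount(earlier)) / 300.0;
    }

    public static void main(String[] args) {
        String a = "14:05:00 [numberOfProcessed=1200]";
        String b = "14:10:00 [numberOfProcessed=4200]";
        System.out.println(itemsPerSecond(a, b)); // prints 10.0
    }
}
```

Rates computed this way for each consumer's file would make the "drastic differences between the different consumers" concrete numbers that could be attached to the thread.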