I was looking at QueueProducer:
// We register to the listener for monitoring the number of elements
// in the queue
framework.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(final CuratorFramework framework_,
CuratorEvent event) throws Exception {
if (event.getPath() != null &&
event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
// this also restores the notification
List<String> children = framework_.getChildren()
.watched().forPath(Utils.CRAWL_QUEUE_PATH);
if (children.size() <= QUEUE_SIZE/2) {
addQueueContent(QUEUE_SIZE - children.size());
}
}
}
});
On Nov 17, 2013, at 1:53 PM, Sznajder ForMailingList <[email protected]>
wrote:
> Hi Jordan,
>
> Thanks again for your quick answer.
>
> I would like to be sure I understand your hint.
>
> Do you mean that I should change the following method
>
> @Override
> public void consumeMessage(CrawlUrl url) throws Exception {
> try {
> LOG.info(dateFormat.format(new Date(System.currentTimeMillis()))
>     + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
> MyQueueConsumer.this.numberOfProcessedURL++;
> } catch (Exception e) {
> LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "] "
>     + e.getMessage() + " for url " + url.url);
> }
> }
>
>
>
> or the code in the QueueProducer?
>
> In the latter case (the code in QueueProducer), I do not think it will solve
> the problem: I can see in my logs that the consumers are idle even when the
> producer is not doing anything...
>
> Best regards
>
> Benjamin
>
>
>
>
> On Sun, Nov 17, 2013 at 11:33 PM, Jordan Zimmerman
> <[email protected]> wrote:
> I’ll look further at this, but the first thing that I notice is that you are
> doing “work” in your Curator Listener. Please read Curator Tech Note 1:
>
> https://cwiki.apache.org/confluence/display/CURATOR/TN1
>
> The quickest fix would be to do the getChildren() as a background operation.
> Alternatively, you can pass in a thread pool when registering the listener.
>
> -Jordan
>
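The thread-pool suggestion above can be illustrated with plain JDK classes. This is only a sketch of the pattern (hand events off to an executor so the event-dispatch thread never blocks), not Curator's actual listener plumbing; the `refills` counter stands in for the hypothetical refill work done in `QueueProducer`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ListenerOffload {
    public static void main(String[] args) throws Exception {
        // Pool that performs the potentially slow "work" (stands in for the
        // getChildren()/addQueueContent() calls done inside the listener).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger refills = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(3);

        // Simulated event-dispatch thread: it only submits work and returns
        // immediately, so slow handlers can never starve later events.
        Thread eventThread = new Thread(() -> {
            for (int event = 0; event < 3; event++) {
                pool.submit(() -> {
                    refills.incrementAndGet(); // stand-in for a queue refill
                    done.countDown();
                });
            }
        });
        eventThread.start();

        done.await();
        pool.shutdown();
        System.out.println("refills=" + refills.get()); // prints refills=3
    }
}
```

In Curator itself the two options map (if I read the API right) to calling `inBackground()` on the builder chain instead of blocking, or to the `addListener(listener, executor)` overload so listener callbacks run on your own pool, as Tech Note 1 describes.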
> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList
> <[email protected]> wrote:
>
>> First at all , thank you for your answer.
>>
>> Here is the simple code, I used:
>>
>> The producer and queue consumer are given in the classes below.
>>
>> Every 5 minutes, I am printing the number of processed items, and I see
>> some drastic differences between the different consumers:
>>
>>
>>
>> Producer:
>> =-=-=-=-=
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class QueueProducer implements Closeable {
>>
>> final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
>>
>> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>> protected static final String PATH = "/test_queue";
>>
>> protected static final String LOCK_PATH = "/test_lock_queue";
>>
>> private DistributedQueue<CrawlUrl> queue;
>>
>> private static final int QUEUE_SIZE = 100000;
>>
>> private int items;
>>
>> public QueueProducer(CuratorFramework framework) throws Exception {
>> LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>> System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is a QueueProducer");
>> this.queue = Utils.newDistributedQueue(framework,
>> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
>> this.queue.start();
>> addQueueContent(QUEUE_SIZE);
>> System.out.println("Done with the initial init");
>>
>>
>> // We register to the listener for monitoring the number of elements
>> // in the queue
>> framework.getCuratorListenable().addListener(new CuratorListener() {
>> @Override
>> public void eventReceived(final CuratorFramework framework_,
>> CuratorEvent event) throws Exception {
>> if (event.getPath() != null &&
>> event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>> // this also restores the notification
>> List<String> children = framework_.getChildren()
>> .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>> if (children.size() <= QUEUE_SIZE/2) {
>> addQueueContent(QUEUE_SIZE - children.size());
>> }
>> }
>> }
>> });
>>
>>
>> while (true) {
>> List<String> children =
>> framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>> if (children.size() <= QUEUE_SIZE/2) {
>> LOG.info(dateFormat.format(new Date())
>>     + " - In the while(true) - We call for size " + children.size());
>> addQueueContent(QUEUE_SIZE - children.size());
>> }
>>
>> Thread.sleep(5000);
>>
>> }
>> }
>>
>> void addQueueContent(int numberOfItems) {
>> LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
>> numberOfItems);
>> for (int i = 0; i < numberOfItems; i++) {
>> try {
>> CrawlUrl url = new CrawlUrl(""+this.items++);
>> this.queue.put(url);
>> } catch (Exception e) {
>> LOG.error("Caught an error when adding the item " + i
>>     + " in the initQueueContent()");
>> }
>> }
>> }
>>
>> public static void main(String[] args) {
>> CrawlerPropertyFile props;
>> try {
>> props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>> @SuppressWarnings("unused")
>> QueueProducer producer = new QueueProducer(framework);
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>>
>> }
>>
>> @Override
>> public void close() throws IOException {
>> this.queue.close();
>> }
>>
>>
>>
>> }
>>
>>
>>
>>
>> Consumer
>> =-=-=-=-=-
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>>
>> public class MyQueueConsumer implements Closeable{
>>
>> private DistributedQueue<CrawlUrl> queue;
>>
>> String name;
>>
>> String id;
>>
>> FileWriter timeCounter;
>>
>> final static Logger LOG = LoggerFactory.getLogger(MyQueueConsumer.class);
>>
>> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>> int numberOfProcessedURL;
>>
>> private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>
>> private class FileWriterThread extends Thread {
>>
>> public FileWriterThread() {
>> // empty ctor
>> }
>>
>> @Override
>> public void run() {
>> // We write the stats:
>>
>> try {
>> while (true) {
>>
>> MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "
>>     + "[numberOfProcessed=" + MyQueueConsumer.this.numberOfProcessedURL + "]\n");
>> MyQueueConsumer.this.timeCounter.flush();
>>
>> // Sleeps 5 minutes
>> Thread.sleep(300000);
>> }
>> } catch (Exception e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> }
>> }
>>
>>
>> public MyQueueConsumer(CuratorFramework framework, final String id)
>> throws Exception {
>> this.id = id;
>> this.name = java.net.InetAddress.getLocalHost().getHostName();
>> this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
>> this.name + "_" +id + "_timeCounter.txt"));
>>
>> // this.timeCounterThread = new FileWriterThread();
>> // this.timeCounterThread.start();
>> this.queue = Utils.newDistributedQueue(framework,
>> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
>> QueueConsumer<CrawlUrl>() {
>>
>> @Override
>> public void stateChanged(CuratorFramework client,
>> ConnectionState newState) {
>> System.out.println(String.format("[%s] connection state changed to %s", id, newState));
>> }
>>
>> @Override
>> public void consumeMessage(CrawlUrl url) throws Exception {
>> try {
>> LOG.info(dateFormat.format(new Date(System.currentTimeMillis()))
>>     + "[" + id + "-" + MyQueueConsumer.this.name + "] processed " + url.url);
>> MyQueueConsumer.this.numberOfProcessedURL++;
>> } catch (Exception e) {
>> LOG.error("[" + id + "-" + MyQueueConsumer.this.name + "] "
>>     + e.getMessage() + " for url " + url.url);
>> }
>> }
>>
>> });
>> try {
>> this.queue.start();
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>>
>> }
>>
>> public static void main(String[] args) {
>> try {
>> CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>> final MyQueueConsumer[] queueConsumers = new
>> MyQueueConsumer[props.getNumberOfWorkers()];
>>
>> for (int i = 0; i < queueConsumers.length; i++) {
>> queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
>> }
>>
>> Runtime.getRuntime().addShutdownHook(new Thread() {
>> @Override
>> public void run() {
>> // close workers
>> Throwable t = null;
>> LOG.info("We close the workers");
>> for (MyQueueConsumer queueConsumer : queueConsumers) {
>> try {
>> queueConsumer.close();
>> } catch (Throwable th) {
>> if (t == null) {
>> t = th;
>> }
>> }
>> }
>> // throw first exception that we encountered
>> if (t != null) {
>> throw new RuntimeException("some workers failed to close", t);
>> }
>> }
>> });
>>
>> }catch (Exception e ){
>> e.printStackTrace();
>> }
>> }
>>
>> @Override
>> public void close() throws IOException {
>> this.queue.close();
>> }
>> }
>>
>>
>>
>>
>> Main
>> -=-=-
>>
>> package com.zk;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>>
>>
>>
>> public class QueueTestMain {
>>
>> /**
>> * @param args
>> */
>> public static void main(String[] args) {
>> CrawlerPropertyFile props;
>> try {
>> props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>>
>> if (args[1] != null && args[1].equalsIgnoreCase("true")) {
>> @SuppressWarnings("unused")
>> QueueProducer producer = new QueueProducer(framework);
>> } else {
>>
>> final MyQueueConsumer[] queueConsumers = new
>> MyQueueConsumer[props.getNumberOfWorkers()];
>>
>> for (int i = 0; i < queueConsumers.length; i++) {
>> queueConsumers[i] = new MyQueueConsumer(framework,
>> "id_"+i);
>> }
>>
>> Runtime.getRuntime().addShutdownHook(new Thread() {
>> @Override
>> public void run() {
>> // close workers
>> Throwable t = null;
>> for (MyQueueConsumer queueConsumer : queueConsumers)
>> {
>> try {
>> queueConsumer.close();
>> } catch (Throwable th) {
>> if (t == null) {
>> t = th;
>> }
>> }
>> }
>> // throw first exception that we encountered
>> if (t != null) {
>> throw new RuntimeException("some workers failed to close", t);
>> }
>> }
>> });
>>
>>
>> }
>> }catch (Exception e ){
>> e.printStackTrace();
>> }
>>
>> }
>>
>> }
>>
>>
>>
>>
>> Example of output:
>>
>>
>>
>>
>>
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman
>> <[email protected]> wrote:
>> Can you produce a test that shows this? Anything else interesting in the
>> log? Of course, there could be a bug.
>>
>> -Jordan
>>
>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList
>> <[email protected]> wrote:
>>
>> > Hi
>> >
>> > I made a short test as following:
>> >
>> > - I have a quorum of 3 ZooKeeper nodes.
>> > - I wrote a class (QueueProducer) using a Curator queue that produces items
>> > (random integers) all the time: when the queue drops to 10% full, it creates
>> > new items.
>> > - I wrote a simple class using Curator's QueueConsumer which simply prints
>> > "consumed item i" to the log.
>> >
>> > I tested several different combinations:
>> > - running the consumers on one, two or three nodes.
>> > - running one or more consumers in parallel on a given node.
>> >
>> >
>> > But, and here is my question: I see some very strange behavior when I have
>> > several consumers in parallel on a node. For example, running 5 consumers
>> > per node on 3 nodes, I see a **very** slow throughput. When looking at my
>> > log, I see that most of the consumers are idle most of the time....
>> >
>> > Am I making a mistake somewhere?
>> > I was expecting to improve the throughput by increasing the number of
>> > consumers; I am surprised to see the opposite....
>> >
>> > Thanks a lot
>> >
>> > Benjamin
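The refill policy described above (top the queue back up whenever it falls to a low-water mark) can be sketched with a plain in-memory queue; `CAPACITY`, `LOW_WATER`, and `refill` are illustrative names, not from the posted code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Random;

public class RefillPolicy {
    static final int CAPACITY = 100;            // illustrative queue capacity
    static final int LOW_WATER = CAPACITY / 10; // refill when <= 10% full

    // Top the queue back up to CAPACITY once it drains to the low-water mark.
    static void refill(Deque<Integer> queue, Random random) {
        if (queue.size() <= LOW_WATER) {
            int missing = CAPACITY - queue.size();
            for (int i = 0; i < missing; i++) {
                queue.add(random.nextInt()); // produce a random-integer item
            }
        }
    }

    public static void main(String[] args) {
        Deque<Integer> queue = new ArrayDeque<>();
        Random random = new Random(42);

        refill(queue, random);                 // empty queue -> filled to CAPACITY
        System.out.println("after first refill: " + queue.size());

        for (int i = 0; i < 95; i++) {         // consumers drain below 10%
            queue.poll();
        }
        refill(queue, random);                 // 5 <= LOW_WATER, so top up again
        System.out.println("after second refill: " + queue.size());
    }
}
```

The posted QueueProducer actually refills at 50% (`QUEUE_SIZE/2`) rather than 10%; only the threshold constant differs, the shape of the check is the same.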
>>
>>
>
>