Hi Jordan,
Following your advice, I moved the processing of the event out of the
listener.
However, the listener no longer receives any events.
What did I get wrong?
Thanks a lot!
package com..hrl.zk;

import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com...crawler.CrawlUrl;
import com...crawler.Utils;
import com...main.CrawlerPropertyFile;

public class QueueProducer implements Closeable {

    final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
    final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

    DistributedQueue<CrawlUrl> queue;
    private static final int QUEUE_SIZE = 100;
    int items;
    int cnt;

    // Adds `size` new items to the queue on its own thread, so that the
    // listener callback does not block while the queue is refilled.
    private class QueueAdder extends Thread {
        private int size;

        public QueueAdder(int size) {
            System.out.println("QueueAdder " + size);
            this.size = size;
        }

        @Override
        public void run() {
            LOG.info("QueueAdder ! ");
            for (int i = 0; i < this.size; i++) {
                try {
                    QueueProducer.this.queue.put(
                            new CrawlUrl("" + QueueProducer.this.items++));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            LOG.info("DONE!");
        }
    }

    public QueueProducer(CuratorFramework framework) throws Exception {
        LOG.info(java.net.InetAddress.getLocalHost().getHostName()
                + " is a QueueProducer");
        System.out.println(java.net.InetAddress.getLocalHost().getHostName()
                + " is a QueueProducer");
        this.queue = Utils.newDistributedQueue(framework,
                Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
        this.queue.start();
        addQueueContent(QUEUE_SIZE);
        System.out.println("Done with the initial init");

        // We register to the listener for monitoring the number of elements
        // in the queue
        framework.getCuratorListenable().addListener(new CuratorListener() {
            @Override
            public void eventReceived(final CuratorFramework framework_,
                    CuratorEvent event) throws Exception {
                LOG.info("CNT " + QueueProducer.this.cnt);
                if (QueueProducer.this.cnt++ <= QUEUE_SIZE / 2) {
                    QueueAdder queueAdder = new QueueAdder(
                            QUEUE_SIZE - QueueProducer.this.cnt);
                    queueAdder.start();
                }
            }
        });
    }

    void addQueueContent(int numberOfItems) {
        LOG.info(dateFormat.format(new Date()) + " - addQueueContent "
                + numberOfItems);
        for (int i = 0; i < numberOfItems; i++) {
            try {
                CrawlUrl url = new CrawlUrl("" + this.items++);
                this.queue.put(url);
            } catch (Exception e) {
                // Log the cause as well, not only the index of the item
                LOG.error("Caught an error when adding the item " + i
                        + " in addQueueContent()", e);
            }
        }
    }

    @Override
    public void close() throws IOException {
        this.queue.close();
    }
}
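
One caveat about the class above, separate from the missing events: `items` is a plain int that every QueueAdder thread increments, and several of those threads may run at the same time, so increments can be lost or ids duplicated. Below is a minimal sketch, assuming an AtomicInteger replaces the int field. The class SafeCounters and its methods are hypothetical names for illustration, not part of Curator and not code from my project:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Curator): a thread-safe replacement
// for the plain int field `items`.
class SafeCounters {
    private final AtomicInteger items = new AtomicInteger();

    // Returns the next item id; safe to call from several threads at once.
    int nextItem() {
        return items.getAndIncrement();
    }

    // Number of ids handed out so far.
    int itemsSoFar() {
        return items.get();
    }

    // Starts `threads` workers that each take `perThread` ids, waits for
    // them to finish, and returns the total number of ids handed out.
    static int demo(int threads, int perThread) {
        SafeCounters counters = new SafeCounters();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    counters.nextItem();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return counters.itemsSoFar();
    }

    public static void main(String[] args) {
        // 4 threads x 10000 ids each: no increment is lost.
        System.out.println(demo(4, 10000)); // prints 40000
    }
}
```

In QueueProducer this would presumably mean calling something like `nextItem()` wherever `items++` appears today; I have not tried that change against the real queue.
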
On Mon, Nov 18, 2013 at 12:59 AM, Jordan Zimmerman wrote:
> I don’t think there’s any reason to assume that each consumer will process
> an equal number of messages.
>
> -JZ
>
> On Nov 17, 2013, at 2:51 PM, Sznajder ForMailingList wrote:
>
> Hi Jordan..
>
> Regarding the output:
>
> As you can see, the log prints the name of the consumer, then "processed",
> then the item.
>
> I am running the program with 4 servers in my quorum:
> ir-hadoop1 is the producer
> ir-hadoop2 to ir-hadoop4 are consumers.
>
> After 14 minutes, I simply count the number of processed items on each
> consumer (a simple grep on the log file) and I get the following:
>
> ir-hadoop2 : 3042 processed items
> ir-hadoop3 : 1276 processed items
> ir-hadoop4 : 830 processed items...
>
> If I look at the processing times, I can see that ir-hadoop4 is idle
> most of the time. As an example, I attach the log corresponding to
> ir-hadoop4.
>
> Benjamin
>
>
> On Mon, Nov 18, 2013 at 12:03 AM, Jordan Zimmerman wrote:
>
>> The example output is missing. Please provide that too.
>>
>> -Jordan
>>
>> On Nov 17, 2013, at 1:20 PM, Sznajder ForMailingList wrote:
>>
>> First of all, thank you for your answer.
>>
>> Here is the simple code I used:
>>
>> The producer and the queue consumer are given in the classes below.
>>
>> Every 5 minutes, I print the number of processed items, and I
>> see some drastic differences between the different consumers:
>>
>>
>>
>> Producer:
>> =-=-=-=-=
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>> import java.util.List;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.api.CuratorEvent;
>> import org.apache.curator.framework.api.CuratorListener;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>> public class QueueProducer implements Closeable {
>>
>> final static Logger LOG =
>> LoggerFactory.getLogger(QueueProducer.class);
>>
>> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>> protected static final String PATH = "/test_queue";
>>
>> protected static final String LOCK_PATH = "/test_lock_queue";
>>
>> private DistributedQueue<CrawlUrl> queue;
>>
>> private static final int QUEUE_SIZE = 100000;
>>
>> private int items;
>>
>> public QueueProducer(CuratorFramework framework) throws Exception {
>> LOG.info(java.net.InetAddress.getLocalHost().getHostName()
>> + " is a QueueProducer");
>>
>> System.out.println(java.net.InetAddress.getLocalHost().getHostName()
>> + " is a QueueProducer");
>> this.queue = Utils.newDistributedQueue(framework,
>> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH,
>> null);
>> this.queue.start();
>> addQueueContent(QUEUE_SIZE);
>> System.out.println("Done with the initial init");
>>
>>
>> // We register to the listener for monitoring the number of
>> // elements in the queue
>> framework.getCuratorListenable().addListener(new
>> CuratorListener() {
>> @Override
>> public void eventReceived(final CuratorFramework framework_,
>> CuratorEvent event) throws Exception {
>> if (event.getPath() != null &&
>> event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
>> // this also restores the notification
>> List<String> children = framework_.getChildren()
>> .watched().forPath(Utils.CRAWL_QUEUE_PATH);
>> if (children.size() <= QUEUE_SIZE/2) {
>> addQueueContent(QUEUE_SIZE - children.size());
>> }
>> }
>> }
>> });
>>
>>
>> while (true) {
>> List<String> children =
>> framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
>> if (children.size() <= QUEUE_SIZE/2) {
>> LOG.info(dateFormat.format(new Date())
>> + " - In the while(true) - We call for size " + children.size());
>> addQueueContent(QUEUE_SIZE - children.size());
>> }
>>
>> Thread.sleep(5000);
>>
>> }
>> }
>>
>> void addQueueContent(int numberOfItems) {
>> LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
>> numberOfItems);
>> for (int i = 0; i < numberOfItems; i++) {
>> try {
>> CrawlUrl url = new CrawlUrl(""+this.items++);
>> this.queue.put(url);
>> } catch (Exception e) {
>> LOG.error ("Caught an error when adding the item " + i +
>> " in the initQueueContent()");
>> }
>> }
>> }
>>
>> public static void main(String[] args) {
>> CrawlerPropertyFile props;
>> try {
>> props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>> @SuppressWarnings("unused")
>> QueueProducer producer = new QueueProducer(framework);
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>>
>> }
>>
>> @Override
>> public void close() throws IOException {
>> this.queue.close();
>> }
>>
>>
>>
>> }
>>
>>
>>
>>
>> Consumer
>> =-=-=-=-=-
>>
>> package com.zk;
>>
>> import java.io.Closeable;
>> import java.io.File;
>> import java.io.FileWriter;
>> import java.io.IOException;
>> import java.text.DateFormat;
>> import java.text.SimpleDateFormat;
>> import java.util.Date;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.framework.recipes.queue.DistributedQueue;
>> import org.apache.curator.framework.recipes.queue.QueueConsumer;
>> import org.apache.curator.framework.state.ConnectionState;
>> import org.apache.curator.test.TestingServer;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>>
>>
>> public class MyQueueConsumer implements Closeable{
>>
>> private DistributedQueue<CrawlUrl> queue;
>>
>> String name;
>>
>> String id;
>>
>> FileWriter timeCounter;
>>
>> final static Logger LOG =
>> LoggerFactory.getLogger(MyQueueConsumer.class);
>>
>> final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
>>
>> int numberOfProcessedURL;
>>
>> private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
>>
>> private class FileWriterThread extends Thread {
>>
>> public FileWriterThread() {
>> // empty ctor
>> }
>>
>> @Override
>> public void run() {
>> // We write the stats:
>>
>> try {
>> while (true) {
>>
>> MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
>>
>> "[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL +"]\n") ;
>> MyQueueConsumer.this.timeCounter.flush();
>>
>> // Sleeps 5 minutes
>> Thread.sleep(300000);
>> }
>> } catch (Exception e) {
>> // TODO Auto-generated catch block
>> e.printStackTrace();
>> }
>> }
>> }
>>
>>
>> public MyQueueConsumer(CuratorFramework framework, final String id)
>> throws Exception {
>> this.id = id;
>> this.name = java.net.InetAddress.getLocalHost().getHostName();
>> this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
>> this.name + "_" +id + "_timeCounter.txt"));
>>
>> // this.timeCounterThread = new FileWriterThread();
>> // this.timeCounterThread.start();
>> this.queue = Utils.newDistributedQueue(framework,
>> Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
>> QueueConsumer<CrawlUrl>() {
>>
>> @Override
>> public void stateChanged(CuratorFramework client,
>> ConnectionState newState) {
>> System.out.println(String.format(
>> "[%s] connection state changed to %s", id, newState));
>> }
>>
>> @Override
>> public void consumeMessage(CrawlUrl url) throws Exception {
>> try {
>> LOG.info(dateFormat.format(new
>> Date(System.currentTimeMillis())) + "["+id+ "-" +
>> MyQueueConsumer.this.name + "] processed " + url.url);
>> MyQueueConsumer.this.numberOfProcessedURL++;
>> } catch (Exception e) {
>> LOG.error( "["+id+ "-" +
>> MyQueueConsumer.this.name +
>> "]" + e.getMessage() + " for url " + url.url );
>> }
>> }
>>
>> });
>> try {
>> this.queue.start();
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>>
>> }
>>
>> public static void main(String[] args) {
>> try {
>> CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>> final MyQueueConsumer[] queueConsumers = new
>> MyQueueConsumer[props.getNumberOfWorkers()];
>>
>> for (int i = 0; i < queueConsumers.length; i++) {
>> queueConsumers[i] = new MyQueueConsumer(framework,
>> "id_"+i);
>> }
>>
>> Runtime.getRuntime().addShutdownHook(new Thread() {
>> @Override
>> public void run() {
>> // close workers
>> Throwable t = null;
>> LOG.info("We close the workers");
>> for (MyQueueConsumer queueConsumer : queueConsumers) {
>> try {
>> queueConsumer.close();
>> } catch (Throwable th) {
>> if (t == null) {
>> t = th;
>> }
>> }
>> }
>> // throw first exception that we encountered
>> if (t != null) {
>> throw new RuntimeException(
>> "some workers failed to close", t);
>> }
>> }
>> });
>>
>> }catch (Exception e ){
>> e.printStackTrace();
>> }
>> }
>>
>> @Override
>> public void close() throws IOException {
>> this.queue.close();
>> }
>> }
>>
>>
>>
>>
>> Main
>> -=-=-
>>
>> package com.zk;
>>
>> import org.apache.curator.framework.CuratorFramework;
>> import org.apache.curator.test.TestingServer;
>>
>>
>>
>> public class QueueTestMain {
>>
>> /**
>> * @param args
>> */
>> public static void main(String[] args) {
>> CrawlerPropertyFile props;
>> try {
>> props = new CrawlerPropertyFile(args[0]);
>>
>> final String connectString;
>> System.out.println("DEBUG = " + Utils.DEBUG);
>> if (props.useZkTestServer()) {
>> System.out.println("Will launch from zkTestServer");
>> TestingServer server = new TestingServer();
>> connectString = server.getConnectString();
>> } else {
>> connectString = props.getZkServer();
>> }
>>
>> final CuratorFramework framework =
>> Utils.newFramework(connectString);
>> framework.start();
>>
>>
>> if (args.length > 1 && args[1].equalsIgnoreCase("true")) {
>> @SuppressWarnings("unused")
>> QueueProducer producer = new QueueProducer(framework);
>> } else {
>>
>> final MyQueueConsumer[] queueConsumers = new
>> MyQueueConsumer[props.getNumberOfWorkers()];
>>
>> for (int i = 0; i < queueConsumers.length; i++) {
>> queueConsumers[i] = new MyQueueConsumer(framework,
>> "id_"+i);
>> }
>>
>> Runtime.getRuntime().addShutdownHook(new Thread() {
>> @Override
>> public void run() {
>> // close workers
>> Throwable t = null;
>> for (MyQueueConsumer queueConsumer :
>> queueConsumers) {
>> try {
>> queueConsumer.close();
>> } catch (Throwable th) {
>> if (t == null) {
>> t = th;
>> }
>> }
>> }
>> // throw first exception that we encountered
>> if (t != null) {
>> throw new RuntimeException(
>> "some workers failed to close", t);
>> }
>> }
>> });
>>
>>
>> }
>> }catch (Exception e ){
>> e.printStackTrace();
>> }
>>
>> }
>>
>> }
>>
>>
>>
>>
>> Example of output:
>>
>>
>>
>>
>>
>> On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman wrote:
>>
>>> Can you produce a test that shows this? Anything else interesting in the
>>> log? Of course, there could be a bug.
>>>
>>> -Jordan
>>>
>>> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList wrote:
>>>
>>> > Hi
>>> >
>>> > I ran a short test as follows:
>>> >
>>> > - I have a quorum of 3 nodes for ZooKeeper.
>>> > - I wrote a class using Curator QueueProducer that produces items
>>> > (random integers) all the time: when the queue is 10% full, it creates
>>> > new items.
>>> > - I wrote a simple class using Curator QueueConsumer which simply
>>> > prints "consumed item i" to the log.
>>> >
>>> > I tested several combinations:
>>> > - running the consumers on one, two or three nodes.
>>> > - running one or more consumers in parallel on a given node.
>>> >
>>> >
>>> > But, and here is my question: I see some very strange behavior when I
>>> > have several consumers in parallel on a node. For example, running 5
>>> > consumers per node on 3 nodes, I see a **very** slow throughput. When
>>> > looking at my log, I see that most of the consumers are idle most of
>>> > the time.
>>> >
>>> > Am I making a mistake somewhere?
>>> > I was expecting to increase the throughput by increasing the number of
>>> > consumers, and I am surprised to see the opposite.
>>> >
>>> > Thanks a lot
>>> >
>>> > Benjamin
>>>
>>>
>>
>>
> <ir-hadoop4_log.txt>
>
>
>