First of all, thank you for your answer.
Here is the simple code, I used:
The producer and QueueConsumer are given in the classes below.
Every 5 minutes, I am printing the number of processed items, and I see
some drastic differences between the different consumers:
Producer:
=-=-=-=-=
package com.zk;
import java.io.Closeable;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueueProducer implements Closeable {
final static Logger LOG = LoggerFactory.getLogger(QueueProducer.class);
final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
protected static final String PATH = "/test_queue";
protected static final String LOCK_PATH = "/test_lock_queue";
private DistributedQueue<CrawlUrl> queue;
private static final int QUEUE_SIZE = 100000;
private int items;
public QueueProducer(CuratorFramework framework) throws Exception {
LOG.info(java.net.InetAddress.getLocalHost().getHostName() + " is a
QueueProducer");
System.out.println(java.net.InetAddress.getLocalHost().getHostName() + " is
a QueueProducer");
this.queue = Utils.newDistributedQueue(framework,
Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, null);
this.queue.start();
addQueueContent(QUEUE_SIZE);
System.out.println("Done with the initial init");
// We register to the listener for monitoring the number of elements
// in the queue
framework.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(final CuratorFramework framework_,
CuratorEvent event) throws Exception {
if (event.getPath() != null &&
event.getPath().equals(Utils.CRAWL_QUEUE_PATH)) {
// this also restores the notification
List<String> children = framework_.getChildren()
.watched().forPath(Utils.CRAWL_QUEUE_PATH);
if (children.size() <= QUEUE_SIZE/2) {
addQueueContent(QUEUE_SIZE - children.size());
}
}
}
});
while (true) {
List<String> children =
framework.getChildren().watched().forPath(Utils.CRAWL_QUEUE_PATH);
if (children.size() <= QUEUE_SIZE/2) {
LOG.info(dateFormat.format(new Date()) + " - In the
while(true) - We call for size " + children.size());
addQueueContent(QUEUE_SIZE - children.size());
}
Thread.sleep(5000);
}
}
void addQueueContent(int numberOfItems) {
LOG.info(dateFormat.format(new Date()) + " - addQueueContent " +
numberOfItems);
for (int i = 0; i < numberOfItems; i++) {
try {
CrawlUrl url = new CrawlUrl(""+this.items++);
this.queue.put(url);
} catch (Exception e) {
LOG.error ("Caught an error when adding the item " + i + "
in the initQueueContent()");
}
}
}
public static void main(String[] args) {
CrawlerPropertyFile props;
try {
props = new CrawlerPropertyFile(args[0]);
final String connectString;
System.out.println("DEBUG = " + Utils.DEBUG);
if (props.useZkTestServer()) {
System.out.println("Will launch from zkTestServer");
TestingServer server = new TestingServer();
connectString = server.getConnectString();
} else {
connectString = props.getZkServer();
}
final CuratorFramework framework =
Utils.newFramework(connectString);
framework.start();
@SuppressWarnings("unused")
QueueProducer producer = new QueueProducer(framework);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.queue.close();
}
}
Consumer
=-=-=-=-=-
package com.zk;
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.test.TestingServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyQueueConsumer implements Closeable{
private DistributedQueue<CrawlUrl> queue;
String name;
String id;
FileWriter timeCounter;
final static Logger LOG =
LoggerFactory.getLogger(MyQueueConsumer.class);
final static DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
int numberOfProcessedURL;
private com.zk.MyQueueConsumer.FileWriterThread timeCounterThread;
private class FileWriterThread extends Thread {
public FileWriterThread() {
// empty ctor
}
@Override
public void run() {
// We write the stats:
try {
while (true) {
MyQueueConsumer.this.timeCounter.write(dateFormat.format(new Date()) + " "+
"[numberOfProcessed="+MyQueueConsumer.this.numberOfProcessedURL +"]\n") ;
MyQueueConsumer.this.timeCounter.flush();
// Sleeps 5 minutes
Thread.sleep(300000);
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public MyQueueConsumer(CuratorFramework framework, final String id)
throws Exception {
this.id = id;
this.name = java.net.InetAddress.getLocalHost().getHostName();
this.timeCounter = new FileWriter(new File("MyQueueConsumer_"+
this.name + "_" +id + "_timeCounter.txt"));
// this.timeCounterThread = new FileWriterThread();
// this.timeCounterThread.start();
this.queue = Utils.newDistributedQueue(framework,
Utils.CRAWL_QUEUE_PATH, Utils.CRAWL_QUEUE_LOCK_PATH, new
QueueConsumer<CrawlUrl>() {
@Override
public void stateChanged(CuratorFramework client,
ConnectionState newState) {
System.out.println(String.format("[%s] connection state
changed to %s", id, newState));
}
@Override
public void consumeMessage(CrawlUrl url) throws Exception {
try {
LOG.info(dateFormat.format(new
Date(System.currentTimeMillis())) + "["+id+ "-" + MyQueueConsumer.this.name+
"] processed " + url.url);
MyQueueConsumer.this.numberOfProcessedURL++;
} catch (Exception e) {
LOG.error( "["+id+ "-" + MyQueueConsumer.this.name+ "]"
+ e.getMessage() + " for url " + url.url );
}
}
});
try {
this.queue.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
CrawlerPropertyFile props = new CrawlerPropertyFile(args[0]);
final String connectString;
System.out.println("DEBUG = " + Utils.DEBUG);
if (props.useZkTestServer()) {
System.out.println("Will launch from zkTestServer");
TestingServer server = new TestingServer();
connectString = server.getConnectString();
} else {
connectString = props.getZkServer();
}
final CuratorFramework framework =
Utils.newFramework(connectString);
framework.start();
final MyQueueConsumer[] queueConsumers = new
MyQueueConsumer[props.getNumberOfWorkers()];
for (int i = 0; i < queueConsumers.length; i++) {
queueConsumers[i] = new MyQueueConsumer(framework, "id_"+i);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// close workers
Throwable t = null;
LOG.info("We close the workers");
for (MyQueueConsumer queueConsumer : queueConsumers) {
try {
queueConsumer.close();
} catch (Throwable th) {
if (t == null) {
t = th;
}
}
}
// throw first exception that we encountered
if (t != null) {
throw new RuntimeException("some workers failed to
close", t);
}
}
});
}catch (Exception e ){
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
this.queue.close();
}
}
Main
-=-=-
package com.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
public class QueueTestMain {
/**
* @param args
*/
public static void main(String[] args) {
CrawlerPropertyFile props;
try {
props = new CrawlerPropertyFile(args[0]);
final String connectString;
System.out.println("DEBUG = " + Utils.DEBUG);
if (props.useZkTestServer()) {
System.out.println("Will launch from zkTestServer");
TestingServer server = new TestingServer();
connectString = server.getConnectString();
} else {
connectString = props.getZkServer();
}
final CuratorFramework framework =
Utils.newFramework(connectString);
framework.start();
if (args[1] != null && args[1].equalsIgnoreCase("true")) {
@SuppressWarnings("unused")
QueueProducer producer = new QueueProducer(framework);
} else {
final MyQueueConsumer[] queueConsumers = new
MyQueueConsumer[props.getNumberOfWorkers()];
for (int i = 0; i < queueConsumers.length; i++) {
queueConsumers[i] = new MyQueueConsumer(framework,
"id_"+i);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// close workers
Throwable t = null;
for (MyQueueConsumer queueConsumer :
queueConsumers) {
try {
queueConsumer.close();
} catch (Throwable th) {
if (t == null) {
t = th;
}
}
}
// throw first exception that we encountered
if (t != null) {
throw new RuntimeException("some workers failed
to close", t);
}
}
});
}
}catch (Exception e ){
e.printStackTrace();
}
}
}
Example of output:
On Sun, Nov 17, 2013 at 10:14 PM, Jordan Zimmerman <
[email protected]> wrote:
> Can you produce a test that shows this? Anything else interesting in the
> log? Of course, there could be a bug.
>
> -Jordan
>
> On Nov 14, 2013, at 1:18 PM, Sznajder ForMailingList <
> [email protected]> wrote:
>
> > Hi
> >
> > I made a short test as following:
> >
> > - I have a quorum of 3 nodes for Zookeeper.
> > - I wrote a class using Curator QueueProducer who produces all the time
> (when the queue is 10% full, it creates new items) , items (random integer)
> > - I wrote a simple class using Curator Queue Consumer which simply
> prints to Log "consumed item i".
> >
> > I tested some different combinations :
> > - running the consumers on one, two or three nodes.
> > - running one or more consumers in parallel on a given node.
> >
> >
> > But, and here is my question: I see some very strange behavior when I
> have several consumers in parallel on a node. For example, running 5
> consumers per node on 3 nodes, I see a throughput **very** slow. When
> looking at my Log, I see that most of the consumers are most of the time on
> an idle state....
> >
> > Do I mistake somewhere?
> > I was expecting to enhance the throughput by augmenting the number of
> consumers, I am surprised to see the opposite....
> >
> > Thanks a lot
> >
> > Benjamin
>
>