First of all, I would like to thank you for the quick support I've received so far, although all the problems so far were between the chair and the keyboard (and that is probably the case with the following one too):

The attached source code does the following:
- from the main thread it creates the context (from the documentation [1] I understand that this can be passed around the threads)
- I create two threads: one for receiving and one for sending
- the sender thread uses a queue to buffer the messages for sending, so that they can be sent from the same thread
- each thread creates its own socket (because sockets can't be passed between threads AFAIK)
- the sender creates a PUB socket, while the receiver a SUB socket
- the main thread waits for both the .bind and the .connect call to succeed in their respective threads
- when a thread detects an exception, it closes its socket (to clean up)
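
Stripped of ZeroMQ, the queue hand-off and the startup latch described above can be sketched roughly as follows. This is only an illustration, not the attached code: the names HandOffSketch, Worker and demo are made up, a BlockingQueue stands in for the Queue + Semaphore pair, a plain list stands in for the PUB socket, and (unlike the attachment) the worker drains whatever is still queued when it is interrupted, so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {
    /** Stand-in for the sender thread; only this thread touches the "socket". */
    static class Worker implements Runnable {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        private final CountDownLatch ready;
        // Stand-in for the PUB socket: written only by the worker thread,
        // read by the caller only after join().
        private final List<String> sent = new ArrayList<String>();

        Worker(CountDownLatch ready) {
            this.ready = ready;
        }

        /** Safe to call from any thread: it only enqueues. */
        void send(String message) {
            queue.add(message);
        }

        @Override
        public void run() {
            ready.countDown(); // in the real code: after bind() has succeeded
            try {
                while (true) {
                    sent.add(queue.take()); // in the real code: publishSocket.send(...)
                }
            } catch (InterruptedException e) {
                // Interrupt is the stop signal. Assumption made for this sketch only:
                // flush what is still queued before giving up.
                queue.drainTo(sent);
            }
            // in the real code: publishSocket.close()
        }

        List<String> sent() {
            return sent;
        }
    }

    static List<String> demo(int count) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(1);
        Worker worker = new Worker(ready);
        Thread thread = new Thread(worker, "sender-sketch");
        thread.start();
        // The main thread waits until the worker reports that it is set up.
        if (!ready.await(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("worker did not start in 10 seconds");
        }
        for (int i = 0; i < count; i++) {
            worker.send("msg-" + i);
        }
        thread.interrupt();
        thread.join(); // after join() it is safe to read the worker's list
        return worker.sent();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo(3));
    }
}
```

The point of the pattern is that send() never touches the socket, so any thread may call it, while the socket itself stays confined to the one thread that created it.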

The following scenarios all work:
- calling context.term() from the main thread (where it was created)
- calling context.term() from a new thread (not the main, nor the sender/receiver threads)

However the following doesn't work:
- calling context.term() from the sender thread (that is, doing subscribeSocket.close(); context.term(); there). It prints the following error message and terminates the process:
        Assertion failed: nbytes == sizeof (command_t) (mailbox.cpp:245)

My questions would be:
- What am I doing wrong? I'm trying to achieve a clean stop (i.e. if either the sender or the receiver dies, the other should be stopped too)
- Is there a more performant way to send messages //from an arbitrary thread// than creating one socket per thread or queuing them to a single thread?

Best regards,
Attila Balazs

[1] http://api.zeromq.org/2-1:zmq-init

package com.blogspot.hypefree.jzmqtest;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMQ.Context;

public class TestJZMQ {
	public static void main(String... args) throws Exception {
		while (true) {
			final ZMQ.Context context = ZMQ.context(1);
			
			long time = System.currentTimeMillis();
			CountDownLatch boundLatch = new CountDownLatch(1);
			Sender sender = new Sender(context, "inproc://test", boundLatch);
			final Thread senderThread = startThread(boundLatch, sender, "Sender@" + time, "Failed to bind");
			
			CountDownLatch subscribedLatch = new CountDownLatch(1);
			Receiver receiver = new Receiver(context, "inproc://test", subscribedLatch);
			final Thread receiverThread = startThread(subscribedLatch, receiver, "Receiver@" + time, "Failed to start receiver");
			
			for (int i = 0; i < 100; ++i) {
				sender.send("Hello World!".getBytes());
				Thread.sleep(10);
			}
			
			new Thread(new Runnable() {
				@Override
				public void run() {
					receiverThread.interrupt();
					senderThread.interrupt();
					context.term();
				}
			}).start();
			senderThread.join();
			receiverThread.join();
		}
	}

	private static Thread startThread(CountDownLatch boundLatch,
			Runnable runnable, String name, String errorMessage) throws Exception {
		Thread thread = new Thread(null, runnable, name);
		thread.setDaemon(true);
		thread.start();
		if (!boundLatch.await(10, TimeUnit.SECONDS)) {
			throw new Error(errorMessage + " (in 10 seconds)");
		}
		return thread;
	}
	
	static class Receiver implements Runnable {
		private final String subscribeAddress;

		private final Context context;

		private final CountDownLatch subscribedLatch;
		
		Receiver(ZMQ.Context context, String subscribeAddress, CountDownLatch subscribedLatch) {
			this.context = context;
			this.subscribeAddress = subscribeAddress;
			this.subscribedLatch = subscribedLatch;
		}

		@Override
		public void run() {
			ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);

			try {
				subscribeSocket.connect(subscribeAddress);
				subscribeSocket.subscribe(new byte[0]);
				subscribedLatch.countDown();
				dispatchLoop(subscribeSocket);
			} catch (Throwable ex) {
				ex.printStackTrace();
			}

			subscribeSocket.close();
		}

		private void dispatchLoop(ZMQ.Socket subscribeSocket) {
			Thread currentThread = Thread.currentThread();
			while (!currentThread.isInterrupted()) {
				byte[] messageBytes;
				try {
					messageBytes = subscribeSocket.recv(0);
				} catch (ZMQException ex) {
					if (ZMQ.Error.ETERM.getCode() == ex.getErrorCode()) {
						System.err.println(String.format("Terminating ZeroMQ dispatch thread %s on request", currentThread.getName()));
						break;
					}
					throw ex;
				}

				System.out.println("Received: " + new String(messageBytes));
			}
		}
	}
	
	static class Sender implements Runnable {
		private final Context context;
		
		private final String publishAddress;
		
		private final Queue<byte[]> messages;
		
		private final Semaphore messageCount;
		
		private final CountDownLatch boundLatch;		

		Sender(ZMQ.Context context, String publishAddress, CountDownLatch boundLatch) {
			this.context = context;
			this.publishAddress = publishAddress;
			this.boundLatch = boundLatch;
			this.messages = new ConcurrentLinkedQueue<byte[]>();
			this.messageCount = new Semaphore(0);
		}
		
		void send(byte[] message) {
			messages.add(message);
			messageCount.release();
		}
		
		@Override
		public void run() {
			ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);

			try {
				publishSocket.bind(publishAddress);
				boundLatch.countDown();
				sendLoop(publishSocket);
			} catch (Throwable ex) {
				ex.printStackTrace();
			}

			publishSocket.close();
		}
		
		private void sendLoop(ZMQ.Socket publishSocket) {
			Thread currentThread = Thread.currentThread();
			while (!currentThread.isInterrupted()) {
				// Block until send() has queued at least one message.
				try {
					messageCount.acquire();
				} catch (InterruptedException e) {
					break;
				}

				byte[] message = messages.remove();

				try {
					publishSocket.send(message, 0);
				} catch (ZMQException ex) {
					if (ZMQ.Error.ETERM.getCode() == ex.getErrorCode()) {
						System.err.println(String.format("Terminating ZeroMQ send thread %s on request", currentThread.getName()));
						break;
					}
					throw ex;
				}
			}
		}
	}
}