Author: pcl Date: Fri May 11 09:12:07 2007 New Revision: 537221 URL: http://svn.apache.org/viewvc?view=rev&rev=537221 Log: OPENJPA-230. Updated patch based on an out-of-band patch from Vishal. This version avoids interrupts while still handling guaranteed delivery of messages in the queue.
Modified: incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java Modified: incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java URL: http://svn.apache.org/viewvc/incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java?view=diff&rev=537221&r1=537220&r2=537221 ============================================================================== --- incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java (original) +++ incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java Fri May 11 09:12:07 2007 @@ -25,7 +25,6 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.OptionalDataException; import java.io.OutputStream; import java.net.InetAddress; import java.net.ServerSocket; @@ -40,6 +39,8 @@ import java.util.LinkedList; import java.util.Map; import java.util.Set; +import java.util.List; +import java.util.Collections; import org.apache.commons.pool.PoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; @@ -50,6 +51,7 @@ import org.apache.openjpa.util.InternalException; import org.apache.openjpa.util.Serialization; import org.apache.openjpa.lib.util.concurrent.ReentrantLock; +import org.apache.openjpa.lib.util.concurrent.Concurrent; import serp.util.Strings; @@ -68,14 +70,14 @@ private static final int DEFAULT_PORT = 5636; - private static Localizer s_loc = Localizer.forPackage + private static final Localizer s_loc = Localizer.forPackage (TCPRemoteCommitProvider.class); private static long s_idSequence = System.currentTimeMillis(); // A map of listen ports to listeners in this JVM. We might // want to look into allowing same port, different interface -- // that is not currently possible in a single JVM. - private static Map s_portListenerMap = new HashMap(); + private static final Map s_portListenerMap = new HashMap(); private long _id; private byte[] _localhost; @@ -85,7 +87,8 @@ private int _recoveryTimeMillis = 15000; private TCPPortListener _listener; private BroadcastQueue _broadcastQueue = new BroadcastQueue(); - private LinkedList _broadcastThreads = new LinkedList(); + private final List _broadcastThreads = Collections.synchronizedList( + new LinkedList()); private ArrayList _addresses = new ArrayList(); private ReentrantLock _addressesLock; @@ -177,7 +180,7 @@ // Threads will not end until they send another pk. for (int i = numBroadcastThreads; i < cur; i++) { BroadcastWorkerThread worker = (BroadcastWorkerThread) - _broadcastThreads.removeFirst(); + _broadcastThreads.remove(0); worker.setRunning(false); } } else if (cur < numBroadcastThreads) { @@ -186,7 +189,7 @@ BroadcastWorkerThread wt = new BroadcastWorkerThread(); wt.setDaemon(true); wt.start(); - _broadcastThreads.addLast(wt); + _broadcastThreads.add(wt); } } } @@ -364,6 +367,18 @@ if (_listener != null) _listener.removeProvider(this); + // Remove Broadcast Threads then close sockets. + _broadcastQueue.close(); + + // Wait for _broadcastThreads to get cleaned up. + while(!_broadcastThreads.isEmpty()) { + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + // Ignore. + } + } + _addressesLock.lock(); try { for (Iterator iter = _addresses.iterator(); iter.hasNext();) @@ -371,16 +386,6 @@ } finally { _addressesLock.unlock(); } - - // We are done transmitting. Interrupt any worker threads. - synchronized (_broadcastThreads) { - Thread worker; - for (Iterator iter = _broadcastThreads.iterator(); - iter.hasNext();) { - worker = (Thread) iter.next(); - worker.interrupt(); - } - } } /** @@ -390,18 +395,38 @@ */ private static class BroadcastQueue { - LinkedList _packetQueue = new LinkedList(); + private LinkedList _packetQueue = new LinkedList(); + private boolean _closed = false; + + public synchronized void close() { + _closed = true; + notifyAll(); + } + + public synchronized boolean isClosed() { + return _closed; + } public synchronized void addPacket(byte[] bytes) { _packetQueue.addLast(bytes); notify(); } + /** + * @return the bytes defining the packet to process, or + * <code>null</code> if the queue is empty. + */ public synchronized byte[] removePacket() throws InterruptedException { - while (_packetQueue.isEmpty()) + // only wait if the queue is still open. This allows processing + // of events in the queue to continue, while avoiding sleeping + // during shutdown. + while (!_closed && _packetQueue.isEmpty()) wait(); - return (byte[]) _packetQueue.removeFirst(); + if (_packetQueue.isEmpty()) + return null; + else + return (byte[]) _packetQueue.removeFirst(); } } @@ -416,19 +441,28 @@ public void run() { while (_keepRunning) { try { - // This will block until there is a packet to send. + // This will block until there is a packet to send, or + // until the queue is closed. byte[] bytes = _broadcastQueue.removePacket(); - sendUpdatePacket(bytes); + if (bytes != null) + sendUpdatePacket(bytes); + else if (_broadcastQueue.isClosed()) + _keepRunning = false; } catch (InterruptedException e) { // End the thread. break; } } + remove(); } public void setRunning(boolean keepRunning) { _keepRunning = keepRunning; } + + private void remove() { + _broadcastThreads.remove(this); + } } /** @@ -437,14 +471,11 @@ private static class TCPPortListener implements Runnable { - private static Localizer s_loc = Localizer.forPackage - (TCPPortListener.class); - private final Log _log; private ServerSocket _receiveSocket; private Thread _acceptThread; private Set _receiverThreads = new HashSet(); - private Set _providers = new HashSet(); + private final Set _providers = new HashSet(); /** * Cache the local IP address @@ -466,7 +497,7 @@ * Construct a new TCPPortListener configured to use the specified port. */ private TCPPortListener(int port, Log log) - throws UnknownHostException, IOException { + throws IOException { _port = port; _log = log; _receiveSocket = new ServerSocket(_port); @@ -637,19 +668,19 @@ + ":" + _s.getPort())); } break; - } catch (Exception e) { + } catch (Throwable e) { if (_log.isWarnEnabled()) _log.warn(s_loc.get("tcp-receive-error"), e); break; - } catch (Throwable t) { } } // We are done receiving on this socket and this worker // thread is terminating. try { _in.close(); - _s.close(); - } catch (Exception e) { + if (_s != null) + _s.close(); + } catch (IOException e) { _log.warn(s_loc.get("tcp-close-socket-error", _s.getInetAddress().getHostAddress() + ":" + _s.getPort()), e); @@ -658,11 +689,10 @@ /** * Process an [EMAIL PROTECTED] InputStream} containing objects written - * by [EMAIL PROTECTED] TCPRemoteCommitProvider#broadcastCommitInfo}. + * by [EMAIL PROTECTED] TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}. */ private void handle(InputStream in) - throws IOException, ClassNotFoundException, - OptionalDataException { + throws IOException, ClassNotFoundException { // This will block waiting for the next ObjectInputStream ois = new Serialization.ClassResolvingObjectInputStream(in);