Author: pcl
Date: Fri May 11 09:12:07 2007
New Revision: 537221

URL: http://svn.apache.org/viewvc?view=rev&rev=537221
Log:
OPENJPA-230. Updated patch based on an out-of-band patch from Vishal. This 
version avoids interrupts while still handling guaranteed delivery of messages 
in the queue.

Modified:
    
incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java

Modified: 
incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
URL: 
http://svn.apache.org/viewvc/incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java?view=diff&rev=537221&r1=537220&r2=537221
==============================================================================
--- 
incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
 (original)
+++ 
incubator/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
 Fri May 11 09:12:07 2007
@@ -25,7 +25,6 @@
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.OptionalDataException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
@@ -40,6 +39,8 @@
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
+import java.util.List;
+import java.util.Collections;
 
 import org.apache.commons.pool.PoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericObjectPool;
@@ -50,6 +51,7 @@
 import org.apache.openjpa.util.InternalException;
 import org.apache.openjpa.util.Serialization;
 import org.apache.openjpa.lib.util.concurrent.ReentrantLock;
+import org.apache.openjpa.lib.util.concurrent.Concurrent;
 
 import serp.util.Strings;
 
@@ -68,14 +70,14 @@
 
     private static final int DEFAULT_PORT = 5636;
 
-    private static Localizer s_loc = Localizer.forPackage
+    private static final Localizer s_loc = Localizer.forPackage
         (TCPRemoteCommitProvider.class);
     private static long s_idSequence = System.currentTimeMillis();
 
     // A map of listen ports to listeners in this JVM. We might
     // want to look into allowing same port, different interface --
     // that is not currently possible in a single JVM.
-    private static Map s_portListenerMap = new HashMap();
+    private static final Map s_portListenerMap = new HashMap();
 
     private long _id;
     private byte[] _localhost;
@@ -85,7 +87,8 @@
     private int _recoveryTimeMillis = 15000;
     private TCPPortListener _listener;
     private BroadcastQueue _broadcastQueue = new BroadcastQueue();
-    private LinkedList _broadcastThreads = new LinkedList();
+    private final List _broadcastThreads = Collections.synchronizedList(
+        new LinkedList());
 
     private ArrayList _addresses = new ArrayList();
     private ReentrantLock _addressesLock;
@@ -177,7 +180,7 @@
                 // Threads will not end until they send another pk.
                 for (int i = numBroadcastThreads; i < cur; i++) {
                     BroadcastWorkerThread worker = (BroadcastWorkerThread)
-                        _broadcastThreads.removeFirst();
+                        _broadcastThreads.remove(0);
                     worker.setRunning(false);
                 }
             } else if (cur < numBroadcastThreads) {
@@ -186,7 +189,7 @@
                     BroadcastWorkerThread wt = new BroadcastWorkerThread();
                     wt.setDaemon(true);
                     wt.start();
-                    _broadcastThreads.addLast(wt);
+                    _broadcastThreads.add(wt);
                 }
             }
         }
@@ -364,6 +367,18 @@
         if (_listener != null)
             _listener.removeProvider(this);
 
+        // Remove Broadcast Threads then close sockets.
+        _broadcastQueue.close();
+
+        // Wait for _broadcastThreads to get cleaned up.
+        while(!_broadcastThreads.isEmpty()) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // Ignore.
+            }
+        }
+        
         _addressesLock.lock();
         try {
             for (Iterator iter = _addresses.iterator(); iter.hasNext();)
@@ -371,16 +386,6 @@
         } finally {
             _addressesLock.unlock();
         }
-
-        // We are done transmitting. Interrupt any worker threads.
-        synchronized (_broadcastThreads) {
-            Thread worker;
-            for (Iterator iter = _broadcastThreads.iterator();
-                iter.hasNext();) {
-                worker = (Thread) iter.next();
-                worker.interrupt();
-            }
-        }
     }
 
     /**
@@ -390,18 +395,38 @@
      */
     private static class BroadcastQueue {
 
-        LinkedList _packetQueue = new LinkedList();
+        private LinkedList _packetQueue = new LinkedList();
+        private boolean _closed = false;
+
+        public synchronized void close() {
+            _closed = true;
+            notifyAll();
+        }
+
+        public synchronized boolean isClosed() {
+            return _closed;
+        }
 
         public synchronized void addPacket(byte[] bytes) {
             _packetQueue.addLast(bytes);
             notify();
         }
 
+        /**
+         * @return the bytes defining the packet to process, or
+         * <code>null</code> if the queue is empty.
+         */
         public synchronized byte[] removePacket()
             throws InterruptedException {
-            while (_packetQueue.isEmpty())
+            // only wait if the queue is still open. This allows processing
+            // of events in the queue to continue, while avoiding sleeping
+            // during shutdown.
+            while (!_closed && _packetQueue.isEmpty())
                 wait();
-            return (byte[]) _packetQueue.removeFirst();
+            if (_packetQueue.isEmpty())
+                return null;
+            else
+                return (byte[]) _packetQueue.removeFirst();
         }
     }
 
@@ -416,19 +441,28 @@
         public void run() {
             while (_keepRunning) {
                 try {
-                    // This will block until there is a packet to send.
+                    // This will block until there is a packet to send, or
+                    // until the queue is closed.
                     byte[] bytes = _broadcastQueue.removePacket();
-                    sendUpdatePacket(bytes);
+                    if (bytes != null)
+                        sendUpdatePacket(bytes);
+                    else if (_broadcastQueue.isClosed())
+                        _keepRunning = false;
                 } catch (InterruptedException e) {
                     // End the thread.
                     break;
                 }
             }
+            remove();
         }
 
         public void setRunning(boolean keepRunning) {
             _keepRunning = keepRunning;
         }
+        
+        private void remove() {
+            _broadcastThreads.remove(this);
+        }
     }
 
     /**
@@ -437,14 +471,11 @@
     private static class TCPPortListener
         implements Runnable {
 
-        private static Localizer s_loc = Localizer.forPackage
-            (TCPPortListener.class);
-
         private final Log _log;
         private ServerSocket _receiveSocket;
         private Thread _acceptThread;
         private Set _receiverThreads = new HashSet();
-        private Set _providers = new HashSet();
+        private final Set _providers = new HashSet();
 
         /**
          * Cache the local IP address
@@ -466,7 +497,7 @@
          * Construct a new TCPPortListener configured to use the specified 
port.
          */
         private TCPPortListener(int port, Log log)
-            throws UnknownHostException, IOException {
+            throws IOException {
             _port = port;
             _log = log;
             _receiveSocket = new ServerSocket(_port);
@@ -637,19 +668,19 @@
                                     + ":" + _s.getPort()));
                         }
                         break;
-                    } catch (Exception e) {
+                    } catch (Throwable e) {
                         if (_log.isWarnEnabled())
                             _log.warn(s_loc.get("tcp-receive-error"), e);
                         break;
-                    } catch (Throwable t) {
                     }
                 }
                 // We are done receiving on this socket and this worker
                 // thread is terminating.
                 try {
                     _in.close();
-                    _s.close();
-                } catch (Exception e) {
+                    if (_s != null)
+                        _s.close();
+                } catch (IOException e) {
                     _log.warn(s_loc.get("tcp-close-socket-error",
                         _s.getInetAddress().getHostAddress() + ":"
                             + _s.getPort()), e);
@@ -658,11 +689,10 @@
 
             /**
              * Process an [EMAIL PROTECTED] InputStream} containing objects 
written
-             * by [EMAIL PROTECTED] 
TCPRemoteCommitProvider#broadcastCommitInfo}.
+             * by [EMAIL PROTECTED] 
TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
              */
             private void handle(InputStream in)
-                throws IOException, ClassNotFoundException,
-                OptionalDataException {
+                throws IOException, ClassNotFoundException {
                 // This will block waiting for the next
                 ObjectInputStream ois = 
                     new Serialization.ClassResolvingObjectInputStream(in);


Reply via email to