Author: rhs
Date: Fri May  9 11:40:13 2008
New Revision: 654907

URL: http://svn.apache.org/viewvc?rev=654907&view=rev
Log:
QPID-1045: always notify incoming message queues of session closure and provide 
API for notifying listeners of closure; also preserve connection close code and 
report in errors

Modified:
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/delegates.py
    incubator/qpid/trunk/qpid/python/qpid/exceptions.py
    incubator/qpid/trunk/qpid/python/qpid/framer.py
    incubator/qpid/trunk/qpid/python/qpid/queue.py
    incubator/qpid/trunk/qpid/python/qpid/session.py
    incubator/qpid/trunk/qpid/python/tests/connection.py

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Fri May  9 11:40:13 2008
@@ -19,8 +19,7 @@
 
 import datatypes, session
 from threading import Thread, Condition, RLock
-from util import wait
-from framer import Closed
+from util import wait, notify
 from assembler import Assembler, Segment
 from codec010 import StringCodec
 from session import Session
@@ -39,11 +38,11 @@
 
 class ConnectionFailed(Exception): pass
 
-def client(*args):
-  return delegates.Client(*args)
+def client(*args, **kwargs):
+  return delegates.Client(*args, **kwargs)
 
-def server(*args):
-  return delegates.Server(*args)
+def server(*args, **kwargs):
+  return delegates.Server(*args, **kwargs)
 
 class Connection(Assembler):
 
@@ -61,13 +60,14 @@
     self.condition = Condition()
     self.opened = False
     self.failed = False
+    self.close_code = (None, "connection aborted")
 
     self.thread = Thread(target=self.run)
     self.thread.setDaemon(True)
 
     self.channel_max = 65535
 
-    self.delegate = delegate(self, args)
+    self.delegate = delegate(self, **args)
 
   def attach(self, name, ch, delegate, force=False):
     self.lock.acquire()
@@ -101,6 +101,8 @@
       ssn = self.sessions.pop(name, None)
       if ssn is not None:
         ssn.channel = None
+        ssn.closed()
+        notify(ssn.condition)
         return ssn
     finally:
       self.lock.release()
@@ -127,13 +129,23 @@
     finally:
       self.lock.release()
 
+  def detach_all(self):
+    self.lock.acquire()
+    try:
+      for ssn in self.attached.values():
+        if self.close_code[0] != 200:
+          ssn.exceptions.append(self.close_code)
+        self.detach(ssn.name, ssn.channel)
+    finally:
+      self.lock.release()
+
   def start(self, timeout=None):
     self.delegate.start()
     self.thread.start()
     if not wait(self.condition, lambda: self.opened or self.failed, timeout):
       raise Timeout()
-    if (self.failed):
-      raise ConnectionFailed() 
+    if self.failed:
+      raise ConnectionFailed(*self.close_code)
 
   def run(self):
     # XXX: we don't really have a good way to exit this loop without
@@ -142,6 +154,7 @@
       try:
         seg = self.read_segment()
       except Closed:
+        self.detach_all()
         break
       self.delegate.received(seg)
 

Modified: incubator/qpid/trunk/qpid/python/qpid/delegates.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/delegates.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/delegates.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/delegates.py Fri May  9 11:40:13 2008
@@ -50,6 +50,7 @@
       ssn.received(seg)
 
   def connection_close(self, ch, close):
+    self.connection.close_code = (close.reply_code, close.reply_text)
     ch.connection_close_ok()
     self.connection.sock.close()
     if not self.connection.opened:
@@ -73,13 +74,11 @@
     notify(ch.session.condition)
 
   def session_detach(self, ch, d):
-    self.connection.detach(d.name, ch)
+    ssn = self.connection.detach(d.name, ch)
     ch.session_detached(d.name)
 
   def session_detached(self, ch, d):
-    ssn = self.connection.detach(d.name, ch)
-    if ssn is not None:
-      notify(ch.session.condition)
+    self.connection.detach(d.name, ch)
 
   def session_command_point(self, ch, cp):
     ssn = ch.session
@@ -127,11 +126,11 @@
                 "version": "development",
                 "platform": os.name}
 
-  def __init__(self, connection, args={}):
-    Delegate.__init__(self, connection)    
-    self.username = args.get('username', 'guest')
-    self.password = args.get('password', 'guest')
-    self.mechanism = args.get('mechanism', 'PLAIN')
+  def __init__(self, connection, username="guest", password="guest", 
mechanism="PLAIN"):
+    Delegate.__init__(self, connection)
+    self.username = username
+    self.password = password
+    self.mechanism = mechanism
 
   def start(self):
     self.connection.write_header(self.spec.major, self.spec.minor)

Modified: incubator/qpid/trunk/qpid/python/qpid/exceptions.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/exceptions.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/exceptions.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/exceptions.py Fri May  9 11:40:13 2008
@@ -17,4 +17,5 @@
 # under the License.
 #
 
+class Closed(Exception): pass
 class Timeout(Exception): pass

Modified: incubator/qpid/trunk/qpid/python/qpid/framer.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/framer.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/framer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/framer.py Fri May  9 11:40:13 2008
@@ -18,6 +18,7 @@
 #
 
 import struct, socket
+from exceptions import Closed
 from packer import Packer
 from threading import Lock
 from logging import getLogger
@@ -66,8 +67,6 @@
                                      self.channel,
                                      self.payload)
 
-class Closed(Exception): pass
-
 class FramingError(Exception): pass
 
 class Framer(Packer):

Modified: incubator/qpid/trunk/qpid/python/qpid/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/queue.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/queue.py Fri May  9 11:40:13 2008
@@ -25,8 +25,7 @@
 
 from Queue import Queue as BaseQueue, Empty, Full
 from threading import Thread
-
-class Closed(Exception): pass
+from exceptions import Closed
 
 class Queue(BaseQueue):
 
@@ -37,6 +36,7 @@
     BaseQueue.__init__(self, *args, **kwargs)
     self.error = None
     self.listener = None
+    self.exc_listener = None
     self.thread = None
 
   def close(self, error = None):
@@ -53,15 +53,20 @@
     else:
       return result
 
-  def listen(self, listener):
+  def listen(self, listener, exc_listener = None):
+    if listener is None and exc_listener is not None:
+      raise ValueError("cannot set exception listener without setting 
listener")
+
     self.listener = listener
-    if listener == None:
-      if self.thread != None:
+    self.exc_listener = exc_listener
+
+    if listener is None:
+      if self.thread is not None:
         self.put(Queue.STOP)
         self.thread.join()
         self.thread = None
     else:
-      if self.thread == None:
+      if self.thread is None:
         self.thread = Thread(target = self.run)
         self.thread.setDaemon(True)
         self.thread.start()
@@ -72,5 +77,7 @@
         o = self.get()
         if o == Queue.STOP: break
         self.listener(o)
-      except Closed:
+      except Closed, e:
+        if self.exc_listener is not None:
+          self.exc_listener(e)
         break

Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Fri May  9 11:40:13 2008
@@ -52,7 +52,8 @@
     self.timeout = timeout
     self.channel = None
     self.invoke_lock = Lock()
-    self.closed = False
+    self._closing = False
+    self._closed = False
 
     self.condition = Condition()
 
@@ -82,7 +83,9 @@
 
   def error(self):
     exc = self.exceptions[:]
-    if len(exc) == 1:
+    if len(exc) == 0:
+      return None
+    elif len(exc) == 1:
       return exc[0]
     else:
       return tuple(exc)
@@ -102,13 +105,31 @@
   def close(self, timeout=None):
     self.invoke_lock.acquire()
     try:
-      self.closed = True
+      self._closing = True
       self.channel.session_detach(self.name)
     finally:
       self.invoke_lock.release()
     if not wait(self.condition, lambda: self.channel is None, timeout):
       raise Timeout()
 
+  def closed(self):
+    self.lock.acquire()
+    try:
+      if self._closed: return
+      self._closed = True
+
+      error = self.error()
+      for id in self.results:
+        f = self.results[id]
+        f.error(error)
+      self.results.clear()
+
+      for q in self._incoming.values():
+        q.close(error)
+      notify(self.condition)
+    finally:
+      self.lock.release()
+
   def resolve_method(self, name):
     cmd = self.spec.instructions.get(name)
     if cmd is not None and cmd.track == self.spec["track.command"].value:
@@ -136,7 +157,7 @@
       self.invoke_lock.release()
 
   def do_invoke(self, type, args, kwargs):
-    if self.closed:
+    if self._closing:
       raise SessionClosed()
 
     if self.channel == None:
@@ -311,20 +332,7 @@
     future.set(er.value)
 
   def execution_exception(self, ex):
-    self.session.lock.acquire()
-    try:
-      self.session.exceptions.append(ex)
-      error = self.session.error()
-      for id in self.session.results:
-        f = self.session.results[id]
-        f.error(error)
-      self.session.results.clear()
-
-      for q in self.session._incoming.values():
-        q.close(error)
-      notify(self.session.condition)
-    finally:
-      self.session.lock.release()
+    self.session.exceptions.append(ex)
 
 class Client(Delegate):
 

Modified: incubator/qpid/trunk/qpid/python/tests/connection.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/connection.py?rev=654907&r1=654906&r2=654907&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/connection.py Fri May  9 11:40:13 
2008
@@ -51,8 +51,16 @@
   def queue_query(self, qq):
     return qq._type.result.type.new((qq.queue,), {})
 
-  def message_transfer(self, cmd, header, body):
-    self.queue.put((cmd, header, body))
+  def message_transfer(self, cmd, headers, body):
+    if cmd.destination == "echo":
+      m = Message(body)
+      m.headers = headers
+      self.session.message_transfer(cmd.destination, cmd.accept_mode,
+                                    cmd.acquire_mode, m)
+    elif cmd.destination == "abort":
+      self.session.channel.connection.sock.close()
+    else:
+      self.queue.put((cmd, headers, body))
 
 class ConnectionTest(TestCase):
 
@@ -134,3 +142,59 @@
     qq = ssn.queue_query("asdf")
     assert qq.queue == "asdf"
     c.close(5)
+
+  def testCloseGet(self):
+    c = Connection(connect("0.0.0.0", PORT), self.spec)
+    c.start(10)
+    ssn = c.session("test", timeout=10)
+    echos = ssn.incoming("echo")
+
+    for i in range(10):
+      ssn.message_transfer("echo", message=Message("test%d" % i))
+
+    ssn.auto_sync=False
+    ssn.message_transfer("abort")
+
+    for i in range(10):
+      m = echos.get(timeout=10)
+      assert m.body == "test%d" % i
+
+    try:
+      m = echos.get(timeout=10)
+      assert False
+    except Closed, e:
+      pass
+
+  def testCloseListen(self):
+    c = Connection(connect("0.0.0.0", PORT), self.spec)
+    c.start(10)
+    ssn = c.session("test", timeout=10)
+    echos = ssn.incoming("echo")
+
+    messages = []
+    exceptions = []
+    condition = Condition()
+    def listener(m): messages.append(m)
+    def exc_listener(e):
+      exceptions.append(e)
+      condition.acquire()
+      condition.notify()
+      condition.release()
+
+    echos.listen(listener, exc_listener)
+
+    for i in range(10):
+      ssn.message_transfer("echo", message=Message("test%d" % i))
+
+    ssn.auto_sync=False
+    ssn.message_transfer("abort")
+
+    condition.acquire()
+    condition.wait(10)
+    condition.release()
+
+    for i in range(10):
+      m = messages.pop(0)
+      assert m.body == "test%d" % i
+
+    assert len(exceptions) == 1


Reply via email to