Author: rhs
Date: Mon Jun 16 16:27:54 2008
New Revision: 668345
URL: http://svn.apache.org/viewvc?rev=668345&view=rev
Log:
QPID-1143: added buffering; we now issue only one write per assembly
Modified:
incubator/qpid/trunk/qpid/python/qpid/framer.py
incubator/qpid/trunk/qpid/python/tests/framer.py
Modified: incubator/qpid/trunk/qpid/python/qpid/framer.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/framer.py?rev=668345&r1=668344&r2=668345&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/framer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/framer.py Mon Jun 16 16:27:54 2008
@@ -20,7 +20,7 @@
import struct, socket
from exceptions import Closed
from packer import Packer
-from threading import Lock
+from threading import RLock
from logging import getLogger
raw = getLogger("qpid.io.raw")
@@ -75,12 +75,25 @@
def __init__(self, sock):
self.sock = sock
- self.sock_lock = Lock()
+ self.sock_lock = RLock()
+ self._buf = ""
def aborted(self):
return False
def write(self, buf):
+ self._buf += buf
+
+ def flush(self):
+ self.sock_lock.acquire()
+ try:
+ self._write(self._buf)
+ self._buf = ""
+ frm.debug("FLUSHED")
+ finally:
+ self.sock_lock.release()
+
+ def _write(self, buf):
while buf:
try:
n = self.sock.send(buf)
@@ -120,6 +133,7 @@
self.sock_lock.acquire()
try:
self.pack(Framer.HEADER, "AMQP", 1, 1, major, minor)
+ self.flush()
finally:
self.sock_lock.release()
@@ -130,6 +144,8 @@
track = frame.track & 0x0F
self.pack(Frame.HEADER, frame.flags, frame.type, size, track,
frame.channel)
self.write(frame.payload)
+ if frame.isLastSegment() and frame.isLastFrame():
+ self.flush()
frm.debug("SENT %s", frame)
finally:
self.sock_lock.release()
Modified: incubator/qpid/trunk/qpid/python/tests/framer.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/framer.py?rev=668345&r1=668344&r2=668345&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/framer.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/framer.py Mon Jun 16 16:27:54 2008
@@ -37,6 +37,7 @@
while True:
frame = conn.read_frame()
conn.write_frame(frame)
+ conn.flush()
except Closed:
pass
@@ -60,6 +61,7 @@
c.write_frame(Frame(0, 1, 2, 3, "IS"))
c.write_frame(Frame(0, 1, 2, 3, "A"))
c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST"))
+ c.flush()
f = c.read_frame()
assert f.flags & FIRST_FRM