Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java?rev=675165&r1=675164&r2=675165&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java Wed Jul 9 06:26:54 2008 @@ -69,6 +69,7 @@ synchronized (lock) { sender.send(header.toByteBuffer()); + sender.flush(); } } @@ -79,32 +80,40 @@ frames.add(frame); bytes += HEADER_SIZE + frame.getSize(); - if (frame.isLastFrame() && frame.isLastSegment() || bytes > 64*1024) + if (bytes > 64*1024) { - ByteBuffer buf = ByteBuffer.allocate(bytes); - for (Frame f : frames) - { - buf.put(f.getFlags()); - buf.put((byte) f.getType().getValue()); - buf.putShort((short) (f.getSize() + HEADER_SIZE)); - // RESERVED - buf.put(RESERVED); - buf.put(f.getTrack()); - buf.putShort((short) f.getChannel()); - // RESERVED - buf.putInt(0); - for(ByteBuffer frg : f) - { - buf.put(frg); - } - } - buf.flip(); - - frames.clear(); - bytes = 0; + flush(); + } + } + } - sender.send(buf); + public void flush() + { + synchronized (lock) + { + ByteBuffer buf = ByteBuffer.allocate(bytes); + int nframes = frames.size(); + for (int i = 0; i < nframes; i++) + { + Frame frame = frames.get(i); + buf.put(frame.getFlags()); + buf.put((byte) frame.getType().getValue()); + buf.putShort((short) (frame.getSize() + HEADER_SIZE)); + // RESERVED + buf.put(RESERVED); + buf.put(frame.getTrack()); + buf.putShort((short) frame.getChannel()); + // RESERVED + buf.putInt(0); + buf.put(frame.getBody()); } + buf.flip(); + + frames.clear(); + bytes = 0; + + sender.send(buf); + sender.flush(); } }
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java?rev=675165&r1=675164&r2=675165&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java Wed Jul 9 06:26:54 2008 @@ -55,6 +55,11 @@ write(buf); } + public void flush() + { + // pass + } + /* The extra copying sucks. * If I know for sure that the buf is backed * by an array then I could do buf.array() Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java?rev=675165&r1=675164&r2=675165&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java Wed Jul 9 06:26:54 2008 @@ -58,6 +58,11 @@ } } + public void flush() + { + // pass + } + public synchronized void close() { // MINA will sometimes throw away in-progress writes when you Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java?rev=675165&r1=675164&r2=675165&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java (original) +++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java Wed Jul 9 06:26:54 2008 @@ -47,6 +47,11 @@ } } + public void flush() + { + // pass + } + private void write(java.nio.ByteBuffer buf) { synchronized (lock)
