Author: markt Date: Mon Feb 11 14:03:35 2013 New Revision: 1444768 URL: http://svn.apache.org/r1444768 Log: Implement new approach to locking
Modified: tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Modified: tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java?rev=1444768&r1=1444767&r2=1444768&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/Constants.java Mon Feb 11 14:03:35 2013 @@ -31,6 +31,11 @@ public class Constants { public static final byte OPCODE_PING = 0x09; public static final byte OPCODE_PONG = 0x0A; + // Internal OP Codes + // RFC 6455 limits OP Codes to 4 bits so these should never clash + // Always set bit 4 so these will be treated as control codes + static final byte INTERNAL_OPCODE_FLUSH = 0x18; + // Client connection public static final String HOST_HEADER_NAME = "Host"; public static final String UPGRADE_HEADER_NAME = "Upgrade"; Modified: tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties?rev=1444768&r1=1444767&r2=1444768&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/LocalStrings.properties Mon Feb 11 14:03:35 2013 @@ -13,10 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -messageSendStateMachine.changeType=When sending a fragmented message, all fragments bust be of the same type -messageSendStateMachine.closed=Message will not be sent because the WebSocket session has been closed -messageSendStateMachine.inProgress=Message will not be sent because the WebSocket session is currently sending another message - # Note the wsFrame.* messages are used as close reasons in WebSocket control # frames and therefore must be 123 bytes (not characters) or less in length. # Messages are encoded using UTF-8 where a single character may be encoded in @@ -35,7 +31,10 @@ wsFrame.oneByteCloseCode=The client sent wsFrame.textMessageTooBig=The decoded text message was too big for the output buffer and the endpoint does not support partial messages wsFrame.wrongRsv=The client frame set the reserved bits to [{0}] which was not supported by this endpoint +wsRemoteEndpoint.closed=Message will not be sent because the WebSocket session has been closed +wsRemoteEndpoint.changeType=When sending a fragmented message, all fragments bust be of the same type wsRemoteEndpoint.concurrentMessageSend=Messages may not be sent concurrently even when using the asynchronous send messages. The client must wait for the previous message to complete before sending the next. +wsRemoteEndpoint.inProgress=Message will not be sent because the WebSocket session is currently sending another message wsSession.duplicateHandlerBinary=A binary message handler has already been configured wsSession.duplicateHandlerPong=A pong message handler has already been configured Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java?rev=1444768&r1=1444767&r2=1444768&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsRemoteEndpointBase.java Mon Feb 11 14:03:35 2013 @@ -24,14 +24,14 @@ import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; +import java.util.ArrayDeque; +import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import javax.websocket.EncodeException; import javax.websocket.RemoteEndpoint; @@ -45,10 +45,18 @@ public abstract class WsRemoteEndpointBa private static final StringManager sm = StringManager.getManager(Constants.PACKAGE_NAME); - private final ReentrantLock writeLock = new ReentrantLock(); - private final Condition notInProgress = writeLock.newCondition(); - // Must hold writeLock above to modify state - private final MessageSendStateMachine state = new MessageSendStateMachine(); + private boolean messagePartInProgress = false; + private Queue<MessagePart> messagePartQueue = new ArrayDeque<>(); + private final Object messagePartLock = new Object(); + private boolean dataMessageInProgress = false; + + // State + private boolean closed = false; + private boolean fragmented = false; + private boolean nextFragmented = false; + private boolean text = false; + private boolean nextText = false; + // Max size of WebSocket header is 14 bytes private final ByteBuffer headerBuffer = ByteBuffer.allocate(14); private final ByteBuffer outputBuffer = ByteBuffer.allocate(8192); @@ -89,38 +97,17 @@ public abstract class WsRemoteEndpointBa @Override public void flushBatch() { - // Have to hold lock to flush output buffer - writeLock.lock(); try { - while (state.isInProgress()) { - notInProgress.await(); - } - FutureToSendHandler f2sh = new FutureToSendHandler(); - doWrite(f2sh, outputBuffer); - f2sh.get(); - } catch (InterruptedException | ExecutionException e) { + startMessageBlock(Constants.INTERNAL_OPCODE_FLUSH, null, true); + } catch (IOException e) { // TODO Log this? Runtime exception? Something else? - } finally { - writeLock.unlock(); } } @Override public void sendBytes(ByteBuffer data) throws IOException { - Future<SendResult> f = sendBytesByFuture(data); - try { - SendResult sr = f.get(); - if (!sr.isOK()) { - if (sr.getException() == null) { - throw new IOException(); - } else { - throw new IOException(sr.getException()); - } - } - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } + startMessageBlock(Constants.OPCODE_BINARY, data, true); } @@ -133,72 +120,35 @@ public abstract class WsRemoteEndpointBa @Override - public void sendBytesByCompletion(ByteBuffer data, SendHandler completion) { - boolean locked = writeLock.tryLock(); - if (!locked) { - throw new IllegalStateException( - sm.getString("wsRemoteEndpoint.concurrentMessageSend")); - } - try { - byte opCode = Constants.OPCODE_BINARY; - boolean isLast = true; - sendMessage(opCode, data, isLast, completion); - } finally { - writeLock.unlock(); - } + public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) { + startMessage(Constants.OPCODE_BINARY, data, true, handler); } @Override - public void sendPartialBytes(ByteBuffer partialByte, boolean isLast) + public void sendPartialBytes(ByteBuffer partialByte, boolean last) throws IOException { - boolean locked = writeLock.tryLock(); - if (!locked) { - throw new IllegalStateException( - sm.getString("wsRemoteEndpoint.concurrentMessageSend")); - } - try { - byte opCode = Constants.OPCODE_BINARY; - FutureToSendHandler f2sh = new FutureToSendHandler(); - sendMessage(opCode, partialByte, isLast, f2sh); - f2sh.get(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } finally { - writeLock.unlock(); - } + startMessageBlock(Constants.OPCODE_BINARY, partialByte, last); } @Override public void sendPing(ByteBuffer applicationData) throws IOException, IllegalArgumentException { - sendControlMessage(Constants.OPCODE_PING, applicationData); + startMessageBlock(Constants.OPCODE_PING, applicationData, true); } @Override public void sendPong(ByteBuffer applicationData) throws IOException, IllegalArgumentException { - sendControlMessage(Constants.OPCODE_PONG, applicationData); + startMessageBlock(Constants.OPCODE_PONG, applicationData, true); } @Override public void sendString(String text) throws IOException { - Future<SendResult> f = sendStringByFuture(text); - try { - SendResult sr = f.get(); - if (!sr.isOK()) { - if (sr.getException() == null) { - throw new IOException(); - } else { - throw new IOException(sr.getException()); - } - } - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } + sendPartialString(CharBuffer.wrap(text), true); } @@ -211,19 +161,10 @@ public abstract class WsRemoteEndpointBa @Override - public void sendStringByCompletion(String text, SendHandler completion) { - boolean locked = writeLock.tryLock(); - if (!locked) { - throw new IllegalStateException( - sm.getString("wsRemoteEndpoint.concurrentMessageSend")); - } - try { - TextMessageSendHandler tmsh = new TextMessageSendHandler(completion, - CharBuffer.wrap(text), true, encoder, encoderBuffer, this); - tmsh.write(); - } finally { - writeLock.unlock(); - } + public void sendStringByCompletion(String text, SendHandler handler) { + TextMessageSendHandler tmsh = new TextMessageSendHandler(handler, + CharBuffer.wrap(text), true, encoder, encoderBuffer, this); + tmsh.write(); } @@ -251,70 +192,137 @@ public abstract class WsRemoteEndpointBa - - /** - * Sends a control message, blocking until the message is sent. - */ - void sendControlMessage(byte opCode, ByteBuffer payload) - throws IOException{ - - // Close needs to be sent so disable batching. This will flush any - // messages in the buffer - if (opCode == Constants.OPCODE_CLOSE) { - setBatchingAllowed(false); - } - - writeLock.lock(); + void sendPartialString(CharBuffer part, boolean last) throws IOException { try { - if (state.isInProgress()) { - notInProgress.await(); - } FutureToSendHandler f2sh = new FutureToSendHandler(); - sendMessage(opCode, payload, true, f2sh); + TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, part, + last, encoder, encoderBuffer, this); + tmsh.write(); f2sh.get(); } catch (InterruptedException | ExecutionException e) { throw new IOException(e); - } finally { - notInProgress.signal(); - writeLock.unlock(); } } - void sendPartialString(CharBuffer fragment, boolean isLast) + void startMessageBlock(byte opCode, ByteBuffer payload, boolean last) throws IOException { - boolean locked = writeLock.tryLock(); - if (!locked) { - throw new IllegalStateException( - sm.getString("wsRemoteEndpoint.concurrentMessageSend")); - } + FutureToSendHandler f2sh = new FutureToSendHandler(); + startMessage(opCode, payload, last, f2sh); try { - FutureToSendHandler f2sh = new FutureToSendHandler(); - TextMessageSendHandler tmsh = new TextMessageSendHandler(f2sh, - fragment, isLast, encoder, encoderBuffer, this); - tmsh.write(); - f2sh.get(); + SendResult sr = f2sh.get(); + if (!sr.isOK()) { + if (sr.getException() == null) { + throw new IOException(); + } else { + throw new IOException(sr.getException()); + } + } } catch (InterruptedException | ExecutionException e) { throw new IOException(e); - } finally { - writeLock.unlock(); } } - private void sendMessage(byte opCode, ByteBuffer payload, boolean last, - SendHandler completion) { + void startMessage(byte opCode, ByteBuffer payload, boolean last, + SendHandler handler) { + MessagePart mp = new MessagePart(opCode, payload, last, handler, this); + + synchronized (messagePartLock) { + if (Constants.OPCODE_CLOSE == mp.getOpCode()) { + setBatchingAllowed(false); + } + if (messagePartInProgress) { + if (!Util.isControl(opCode)) { + if (dataMessageInProgress) { + throw new IllegalStateException( + sm.getString("wsRemoteEndpoint.inProgress")); + } else { + dataMessageInProgress = true; + } + } + messagePartQueue.add(mp); + } else { + messagePartInProgress = true; + writeMessagePart(mp); + } + } + } + + + void endMessage(SendHandler handler, SendResult result, + boolean dataMessage) { + synchronized (messagePartLock) { + + if (closed) { + close(); + } else { + fragmented = nextFragmented; + text = nextText; + } + + if (dataMessage) { + dataMessageInProgress = false; + } + MessagePart mpNext = messagePartQueue.poll(); + if (mpNext == null) { + messagePartInProgress = false; + } else { + writeMessagePart(mpNext); + } + } + + handler.setResult(result); + } + - if (!writeLock.isHeldByCurrentThread()) { - // Coding problem + void writeMessagePart(MessagePart mp) { + + if (closed) { throw new IllegalStateException( - "Must hold writeLock before calling this method"); + sm.getString("wsRemoteEndpoint.closed")); } - state.startMessage(opCode, last); + if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) { + nextFragmented = fragmented; + nextText = text; + doWrite(mp.getHandler(), outputBuffer); + return; + } + + // Control messages may be sent in the middle of fragmented message + // so they have no effect on the fragmented or text flags + boolean first; + if (Util.isControl(mp.getOpCode())) { + nextFragmented = fragmented; + nextText = text; + if (mp.getOpCode() == Constants.OPCODE_CLOSE) { + closed = true; + } + first = true; + } else { + boolean isText = Util.isText(mp.getOpCode()); - SendMessageSendHandler smsh = - new SendMessageSendHandler(state, completion, this); + if (fragmented) { + // Currently fragmented + if (text != isText) { + throw new IllegalStateException( + sm.getString("wsRemoteEndpoint.changeType")); + } + nextText = text; + nextFragmented = !mp.isLast(); + first = false; + } else { + // Wasn't fragmented. Might be now + if (mp.isLast()) { + nextFragmented = false; + } else { + nextFragmented = true; + nextText = isText; + } + first = true; + } + } byte[] mask; @@ -325,33 +333,106 @@ public abstract class WsRemoteEndpointBa } headerBuffer.clear(); - writeHeader(headerBuffer, opCode, payload, state.isFirst(), last, - isMasked(), mask); + writeHeader(headerBuffer, mp.getOpCode(), mp.getPayload(), first, + mp.isLast(), isMasked(), mask); headerBuffer.flip(); if (getBatchingAllowed() || isMasked()) { // Need to write via output buffer OutputBufferSendHandler obsh = new OutputBufferSendHandler( - smsh, headerBuffer, payload, mask, outputBuffer, - !getBatchingAllowed(), this); + mp.getHandler(), headerBuffer, mp.getPayload(), mask, + outputBuffer, !getBatchingAllowed(), this); obsh.write(); } else { // Can write directly - doWrite(smsh, headerBuffer, payload); + doWrite(mp.getHandler(), headerBuffer, mp.getPayload()); } + } - private void endMessage() { - writeLock.lock(); - try { - notInProgress.signal(); - } finally { - writeLock.unlock(); + private static class MessagePart { + private final byte opCode; + private final ByteBuffer payload; + private final boolean last; + private final SendHandler handler; + + public MessagePart(byte opCode, ByteBuffer payload, boolean last, + SendHandler handler, WsRemoteEndpointBase endpoint) { + this.opCode = opCode; + this.payload = payload; + this.last = last; + this.handler = new EndMessageHandler( + endpoint, handler, !Util.isControl(opCode)); + } + + + public byte getOpCode() { + return opCode; + } + + + public ByteBuffer getPayload() { + return payload; + } + + + public boolean isLast() { + return last; + } + + + public SendHandler getHandler() { + return handler; } } + /** + * Wraps the user provided handler so that the end point is notified when + * the message is complete. + */ + private static class EndMessageHandler implements SendHandler { + + private final WsRemoteEndpointBase endpoint; + private final SendHandler handler; + private final boolean dataMessage; + + public EndMessageHandler(WsRemoteEndpointBase endpoint, + SendHandler handler, boolean dataMessage) { + this.endpoint = endpoint; + this.handler = handler; + this.dataMessage = dataMessage; + } + + + @Override + public void setResult(SendResult result) { + endpoint.endMessage(handler, result, dataMessage); + } + } + + + + + + + + + + + + + + + + + + + + + + @@ -439,81 +520,6 @@ public abstract class WsRemoteEndpointBa } - private static class MessageSendStateMachine { - private boolean closed = false; - private boolean inProgress = false; - private boolean fragmented = false; - private boolean text = false; - private boolean first = false; - - private boolean nextFragmented = false; - private boolean nextText = false; - - public synchronized void startMessage(byte opCode, boolean isLast) { - - if (closed) { - throw new IllegalStateException( - sm.getString("messageSendStateMachine.closed")); - } - - if (inProgress) { - throw new IllegalStateException( - sm.getString("messageSendStateMachine.inProgress")); - } - - inProgress = true; - - // Control messages may be sent in the middle of fragmented message - // so they have no effect on the fragmented or text flags - if (Util.isControl(opCode)) { - nextFragmented = fragmented; - nextText = text; - if (opCode == Constants.OPCODE_CLOSE) { - closed = true; - } - first = true; - return; - } - - boolean isText = Util.isText(opCode); - - if (fragmented) { - // Currently fragmented - if (text != isText) { - throw new IllegalStateException( - sm.getString("messageSendStateMachine.changeType")); - } - nextText = text; - nextFragmented = !isLast; - first = false; - } else { - // Wasn't fragmented. Might be now - if (isLast) { - nextFragmented = false; - } else { - nextFragmented = true; - nextText = isText; - } - first = true; - } - } - - public synchronized void endMessage() { - inProgress = false; - fragmented = nextFragmented; - text = nextText; - } - - public synchronized boolean isInProgress() { - return inProgress; - } - - public synchronized boolean isFirst() { - return first; - } - } - - private static class TextMessageSendHandler implements SendHandler { private final SendHandler handler; @@ -543,7 +549,7 @@ public abstract class WsRemoteEndpointBa } isDone = !cr.isOverflow(); buffer.flip(); - endpoint.sendMessage(Constants.OPCODE_TEXT, buffer, + endpoint.startMessage(Constants.OPCODE_TEXT, buffer, isDone && isLast, this); } @@ -559,35 +565,6 @@ public abstract class WsRemoteEndpointBa /** - * Wraps user provided {@link SendHandler} so that state is updated when - * the message completes. - */ - private static class SendMessageSendHandler implements SendHandler { - - private final MessageSendStateMachine state; - private final SendHandler handler; - private final WsRemoteEndpointBase endpoint; - - public SendMessageSendHandler(MessageSendStateMachine state, - SendHandler handler, WsRemoteEndpointBase endpoint) { - this.state = state; - this.handler = handler; - this.endpoint = endpoint; - } - - @Override - public void setResult(SendResult result) { - state.endMessage(); - if (state.closed) { - endpoint.close(); - } - handler.setResult(result); - endpoint.endMessage(); - } - } - - - /** * Used to write data to the output buffer, flushing the buffer if it fills * up. */ @@ -802,6 +779,7 @@ public abstract class WsRemoteEndpointBa } private void doWrite(boolean last) throws IOException { + buffer.flip(); endpoint.sendPartialString(buffer, last); buffer.clear(); } Modified: tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java?rev=1444768&r1=1444767&r2=1444768&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java (original) +++ tomcat/trunk/java/org/apache/tomcat/websocket/WsSession.java Mon Feb 11 14:03:35 2013 @@ -253,8 +253,8 @@ public class WsSession implements Sessio } msg.flip(); try { - wsRemoteEndpoint.sendControlMessage( - Constants.OPCODE_CLOSE, msg); + wsRemoteEndpoint.startMessageBlock( + Constants.OPCODE_CLOSE, msg, true); } catch (IOException ioe) { // Unable to send close message. // TODO - Ignore? --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org