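The public-facing API will eventually communicate cross-thread instead of writing directly to the bucket brigade. In preparation, pull the send logic out of mod_websocket_plugin_send into an internal function, mod_websocket_send_internal, which expects the caller to already hold the state mutex. mod_websocket_plugin_send becomes a thin locking wrapper around it, and the read loop (mod_websocket_data_framing) now takes the lock and calls the internal function directly instead of going through mod_websocket_plugin_send. --- mod_websocket.c | 161 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 91 insertions(+), 70 deletions(-)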
diff --git a/mod_websocket.c b/mod_websocket.c index 5a04936..62be765 100644 --- a/mod_websocket.c +++ b/mod_websocket.c @@ -290,85 +290,103 @@ static void CALLBACK mod_websocket_protocol_set(const WebSocketServer *server, } } +/* + * Sends data to the WebSocket connection using the given server state. The + * server state must be locked upon entering this function. buffer_size is + * assumed to be within the limits defined by the WebSocket protocol (i.e. fits + * in 63 bits). + */ +static size_t mod_websocket_send_internal(WebSocketState *state, + const int type, + const unsigned char *buffer, + const size_t buffer_size) +{ + apr_uint64_t payload_length = + (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0); + size_t written = 0; + + if ((state->r != NULL) && (state->obb != NULL) && !state->closing) { + unsigned char header[32]; + ap_filter_t *of = state->r->connection->output_filters; + apr_size_t pos = 0; + unsigned char opcode; + + switch (type) { + case MESSAGE_TYPE_TEXT: + opcode = OPCODE_TEXT; + break; + case MESSAGE_TYPE_BINARY: + opcode = OPCODE_BINARY; + break; + case MESSAGE_TYPE_PING: + opcode = OPCODE_PING; + break; + case MESSAGE_TYPE_PONG: + opcode = OPCODE_PONG; + break; + case MESSAGE_TYPE_CLOSE: + default: + state->closing = 1; + opcode = OPCODE_CLOSE; + break; + } + header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode); + if (payload_length < 126) { + header[pos++] = + FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0); + } + else { + if (payload_length < 65536) { + header[pos++] = FRAME_SET_MASK(0) | 126; + } + else { + header[pos++] = FRAME_SET_MASK(0) | 127; + header[pos++] = FRAME_SET_LENGTH(payload_length, 7); + header[pos++] = FRAME_SET_LENGTH(payload_length, 6); + header[pos++] = FRAME_SET_LENGTH(payload_length, 5); + header[pos++] = FRAME_SET_LENGTH(payload_length, 4); + header[pos++] = FRAME_SET_LENGTH(payload_length, 3); + header[pos++] = FRAME_SET_LENGTH(payload_length, 2); + } + header[pos++] = 
FRAME_SET_LENGTH(payload_length, 1); + header[pos++] = FRAME_SET_LENGTH(payload_length, 0); + } + ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */ + if (payload_length > 0) { + if (ap_fwrite(of, state->obb, + (const char *)buffer, + buffer_size) == APR_SUCCESS) { /* Payload Data */ + written = buffer_size; + } + } + if (ap_fflush(of, state->obb) != APR_SUCCESS) { + written = 0; + } + } + + return written; +} + static size_t CALLBACK mod_websocket_plugin_send(const WebSocketServer *server, const int type, const unsigned char *buffer, const size_t buffer_size) { - apr_uint64_t payload_length = - (apr_uint64_t) ((buffer != NULL) ? buffer_size : 0); size_t written = 0; /* Deal with size more that 63 bits - FIXME */ - if ((server != NULL) && (server->state != NULL)) { WebSocketState *state = server->state; apr_thread_mutex_lock(state->mutex); - - if ((state->r != NULL) && (state->obb != NULL) && !state->closing) { - unsigned char header[32]; - ap_filter_t *of = state->r->connection->output_filters; - apr_size_t pos = 0; - unsigned char opcode; - - switch (type) { - case MESSAGE_TYPE_TEXT: - opcode = OPCODE_TEXT; - break; - case MESSAGE_TYPE_BINARY: - opcode = OPCODE_BINARY; - break; - case MESSAGE_TYPE_PING: - opcode = OPCODE_PING; - break; - case MESSAGE_TYPE_PONG: - opcode = OPCODE_PONG; - break; - case MESSAGE_TYPE_CLOSE: - default: - state->closing = 1; - opcode = OPCODE_CLOSE; - break; - } - header[pos++] = FRAME_SET_FIN(1) | FRAME_SET_OPCODE(opcode); - if (payload_length < 126) { - header[pos++] = - FRAME_SET_MASK(0) | FRAME_SET_LENGTH(payload_length, 0); - } - else { - if (payload_length < 65536) { - header[pos++] = FRAME_SET_MASK(0) | 126; - } - else { - header[pos++] = FRAME_SET_MASK(0) | 127; - header[pos++] = FRAME_SET_LENGTH(payload_length, 7); - header[pos++] = FRAME_SET_LENGTH(payload_length, 6); - header[pos++] = FRAME_SET_LENGTH(payload_length, 5); - header[pos++] = FRAME_SET_LENGTH(payload_length, 4); - header[pos++] = 
FRAME_SET_LENGTH(payload_length, 3); - header[pos++] = FRAME_SET_LENGTH(payload_length, 2); - } - header[pos++] = FRAME_SET_LENGTH(payload_length, 1); - header[pos++] = FRAME_SET_LENGTH(payload_length, 0); - } - ap_fwrite(of, state->obb, (const char *)header, pos); /* Header */ - if (payload_length > 0) { - if (ap_fwrite(of, state->obb, - (const char *)buffer, - buffer_size) == APR_SUCCESS) { /* Payload Data */ - written = buffer_size; - } - } - if (ap_fflush(of, state->obb) != APR_SUCCESS) { - written = 0; - } - } + written = mod_websocket_send_internal(state, type, buffer, buffer_size); apr_thread_mutex_unlock(state->mutex); } + return written; } + static void CALLBACK mod_websocket_plugin_close(const WebSocketServer * server) { @@ -775,10 +793,12 @@ static void mod_websocket_data_framing(const WebSocketServer *server, status_code = STATUS_CODE_OK; break; case OPCODE_PING: - mod_websocket_plugin_send(server, - MESSAGE_TYPE_PONG, - application_data, - application_data_offset); + apr_thread_mutex_lock(state->mutex); + mod_websocket_send_internal(state, + MESSAGE_TYPE_PONG, + application_data, + application_data_offset); + apr_thread_mutex_unlock(state->mutex); break; case OPCODE_PONG: break; @@ -829,12 +849,13 @@ static void mod_websocket_data_framing(const WebSocketServer *server, /* Send server-side closing handshake */ status_code_buffer[0] = (status_code >> 8) & 0xFF; status_code_buffer[1] = status_code & 0xFF; - mod_websocket_plugin_send(server, MESSAGE_TYPE_CLOSE, - status_code_buffer, - sizeof(status_code_buffer)); + + apr_thread_mutex_lock(state->mutex); + mod_websocket_send_internal(state, MESSAGE_TYPE_CLOSE, + status_code_buffer, + sizeof(status_code_buffer)); /* We are done with the bucket brigade */ - apr_thread_mutex_lock(state->mutex); state->obb = NULL; apr_brigade_destroy(obb); } -- 2.1.1