Hi,
I've noticed what seems to be an intermittent corruption of CPG messages. The
problem is that if the dispatch_buffer is filled up exactly, then the bytes_left
value returned by shared_mem_dispatch_bytes_left() is conn_info->dispatch_size
instead of zero. The exec can then continue copying messages into the
dispatch_buffer, overwriting older messages. If a coroipc_response_header_t gets
overwritten, then cpg_dispatch() can fail and never recover (if the header size
is zero, then the increment in coroipcc_dispatch_put() has no effect and
cpg_dispatch() returns the same invalid coroipc_response_header_t again next time).
The attached patch avoids the problem by preventing the dispatch_buffer from
completely filling up. It may not be the best solution, but I couldn't think of
a better way to differentiate between a completely full dispatch_buffer and an
empty one without adding more variables just to track this.
Attached is a test app that'll generate the problem. We're using flatiron trunk
with a small memory footprint (so dispatch_buffer is 65536 bytes). This app
should hit the fail case almost immediately, then the app will fail to send or
receive any more messages.
Cheers,
Tim
diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 5b2411d..b4ebe19 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -1406,7 +1406,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
for (i = 0; i < iov_len; i++) {
bytes_msg += iov[i].iov_len;
}
- if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+ if (bytes_left <= bytes_msg || list_empty (&conn_info->outq_head) == 0) {
outq_item = api->malloc (sizeof (struct outq_item));
if (outq_item == NULL) {
ipc_disconnect (conn);
#define _BSD_SOURCE
/*
* Copyright (c) 2009 Allied Telesis Lab, NZ.
*
* All rights reserved.
*
* Author: Angus Salkeld ([email protected])
* Author: Tim Beale ([email protected])
*
* This software licensed under BSD license, the text of which follows:
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* - Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* - Neither the name of Allied Telesis Labs. nor the names of its
* contributors may be used to endorse or promote products derived from this
* software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
* THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/coroipcc.h>
#include <corosync/corodefs.h>
#include <corosync/hdb.h>
#include <corosync/list.h>
#include <corosync/cpg.h>
#include <corosync/ipc_cpg.h>
// assumption: corosync small memory footprint is used and dispatch_buffer=65536 bytes
#define DISPATCH_SIZE 65536
// header size subtracted from CPG_MSG_SIZE in main() to get the payload length
// NOTE(review): presumably the header the exec places in front of every
// delivered message in the dispatch_buffer - confirm against ipc_cpg.h
#define CPG_HEADER_SIZE sizeof (struct res_lib_cpg_deliver_callback)
// total per-message size (header + payload) and number of messages per burst;
// main() asserts that CPG_MSG_SIZE * CPG_MSG_NUM == DISPATCH_SIZE
#define CPG_MSG_SIZE 512
#define CPG_MSG_NUM 128
// messages delivered during the current cpg_dispatch() call: reset by
// cpg_receive() before dispatching, incremented by cpg_bm_deliver_fn()
int msg_rx_count;
// Payload pattern: cpg_transmit() sends a prefix of this buffer after the
// sequence number, and cpg_bm_deliver_fn() compares the received payload
// against it.
// NOTE(review): the upper-case run is "...UVWYZ" (no 'X'); sender and receiver
// use the same buffer, so this does not affect the comparison.
static char test_data[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789";
/*
 * Configuration-change callback: the membership lists are ignored, the
 * callback only logs its own name so that the event is visible on stdout.
 */
void cpg_bm_confchg_fn (
	cpg_handle_t handle,
	struct cpg_name *group_name,
	struct cpg_address *member_list, int member_list_entries,
	struct cpg_address *left_list, int left_list_entries,
	struct cpg_address *joined_list, int joined_list_entries)
{
	const char *self = __func__;

	/* none of the arguments are used */
	(void)handle;
	(void)group_name;
	(void)member_list;
	(void)member_list_entries;
	(void)left_list;
	(void)left_list_entries;
	(void)joined_list;
	(void)joined_list_entries;

	printf ("%s\n", self);
}
// every transmitted message carries a unique, increasing sequence number
static int tx_order_counter = 1;
// number of messages delivered so far (the next expected sequence number is this + 1)
static int rx_order_counter = 0;
// running total of receive-side errors
static int rx_errors = 0;

/*
 * Record one receive error. Once more than 512 errors have accumulated
 * the test is considered hopeless and the process exits with status 1.
 */
static void
receive_error_incr (void)
{
	if (++rx_errors <= 512) {
		return;
	}

	printf ("### Too many errors, exiting\n");
	exit (1);
}
/*
 * Message-delivery callback.
 *
 * Each message is expected to be an int sequence number followed by a prefix
 * of test_data. The callback checks that the sequence number is exactly one
 * more than the number of messages delivered so far and that the payload
 * matches test_data; every mismatch is reported through receive_error_incr().
 *
 * rx_order_counter and msg_rx_count are incremented once per delivered
 * message, whether or not the message was valid.
 *
 * handle, group_name, nodeid and pid are unused.
 */
void cpg_bm_deliver_fn (
	cpg_handle_t handle,
	struct cpg_name *group_name,
	uint32_t nodeid,
	uint32_t pid,
	void *msg,
	int msg_len)
{
	const char *payload;
	size_t payload_len;
	int rx_number;

	(void)handle;
	(void)group_name;
	(void)nodeid;
	(void)pid;

	/* a message too short to hold the sequence number cannot be parsed */
	if (msg_len < 0 || (size_t)msg_len < sizeof (int)) {
		printf ("### Unexpected length: %d\n", msg_len);
		receive_error_incr();
		rx_order_counter++;
		msg_rx_count++;
		return;
	}

	/* memcpy avoids relying on msg being suitably aligned for an int */
	memcpy (&rx_number, msg, sizeof (int));

	// sanity check msg is in the correct order
	if (rx_number != rx_order_counter + 1)
	{
		printf ("### Unexpected seq: %d (expected %d)\n", rx_number, rx_order_counter + 1);
		receive_error_incr();
	}
	rx_order_counter++;

	payload = (const char *)msg + sizeof (int);
	payload_len = (size_t)msg_len - sizeof (int);

	// sanity check msg data is correct
	if (payload_len > sizeof (test_data))
	{
		/* comparing would read past the end of test_data */
		printf ("### Unexpected length: %d\n", msg_len);
		receive_error_incr();
	}
	else if (memcmp (payload, test_data, payload_len) != 0)
	{
		/* the payload is not NUL-terminated, so bound the print */
		printf ("### Unexpected data: %.*s\n", (int)payload_len, payload);
		receive_error_incr();
	}
	msg_rx_count++;
}
// callback table handed to cpg_initialize() in main(): message delivery and
// configuration-change notifications are routed to the two handlers above
cpg_callbacks_t callbacks = {
.cpg_deliver_fn = cpg_bm_deliver_fn,
.cpg_confchg_fn = cpg_bm_confchg_fn
};
/*
 * Multicast up to num_to_tx messages to the joined group.
 *
 * handle     CPG handle returned by cpg_initialize().
 * num_to_tx  number of messages to send; nothing is sent when it is <= 0.
 * msg_len    total message length in bytes, including the leading int
 *            sequence number; the remainder is filled from test_data.
 *
 * Each message is the current tx_order_counter followed by
 * (msg_len - sizeof (int)) bytes of test_data. tx_order_counter is advanced
 * only after a successful send. CPG_ERR_TRY_AGAIN is retried; the function
 * returns early when flow control is enabled or on any other error.
 */
void cpg_transmit (cpg_handle_t handle, int num_to_tx, int msg_len)
{
	struct iovec iov[2];
	int res;
	int msg_tx_count = 0;
	cpg_flow_control_state_t flow_current = CPG_FLOW_CONTROL_DISABLED;

	/* validate the payload length before it is used anywhere */
	msg_len -= sizeof (int);
	assert (msg_len > 0 && (size_t)msg_len <= sizeof(test_data));

	/* iov[0] points at the live counter, so each send picks up its current value */
	iov[0].iov_base = &tx_order_counter;
	iov[0].iov_len = sizeof (int);
	iov[1].iov_base = test_data;
	iov[1].iov_len = msg_len;

	printf("%s() sending %d %d-byte messages\n", __func__, num_to_tx, msg_len);

	while (msg_tx_count < num_to_tx) {
		res = cpg_flow_control_state_get (handle, &flow_current);
		if (res == CPG_ERR_TRY_AGAIN) {
			continue;
		}
		if (res != CPG_OK) {
			printf("### cpg_flow_control_state_get() failed %d\n", res);
			return;
		}
		if (flow_current != CPG_FLOW_CONTROL_DISABLED) {
			printf ("### flow control enabled, sent %d msgs\n", msg_tx_count);
			return;
		}

		res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, iov, 2);
		if (res == CPG_OK) {
			msg_tx_count++;
			tx_order_counter++;
		} else if (res != CPG_ERR_TRY_AGAIN) {
			/* a hard error will not clear by retrying; give up on this burst */
			printf("### cpg_mcast_joined() failed %d\n", res);
			return;
		}
	}
}
/*
 * Name of the CPG group joined in main(). .length must equal the number of
 * characters in .value ("cpg_test" is 8 characters, not counting the NUL).
 */
static struct cpg_name group_name = {
	.value = "cpg_test",
	.length = 8
};
/*
 * Dispatch delivered messages until at least num_to_rx have been received.
 *
 * handle     CPG handle returned by cpg_initialize().
 * num_to_rx  number of messages the caller expects to be pending.
 *
 * The CPG file descriptor is polled with a 1 microsecond select() timeout.
 * Whenever it is readable, cpg_dispatch (CPG_DISPATCH_ALL) is called and the
 * number of messages counted by cpg_bm_deliver_fn() (via msg_rx_count) is
 * added to the running total. If the descriptor is not readable before
 * the timeout, an error is recorded and the function returns early.
 */
static void
cpg_receive(cpg_handle_t handle, int num_to_rx)
{
	fd_set read_fds;
	struct timeval timeout;
	int cpg_fd;
	int cum_rx_count = 0;
	int res;

	/* without a valid descriptor there is nothing to select() on */
	if (cpg_fd_get (handle, &cpg_fd) != CPG_OK)
	{
		printf ("### cpg_fd_get failed\n");
		receive_error_incr ();
		return;
	}

	while (cum_rx_count < num_to_rx) {
		/* select() may modify both the set and the timeout, so rebuild them */
		FD_ZERO (&read_fds);
		FD_SET (cpg_fd, &read_fds);
		timeout.tv_sec = 0;
		timeout.tv_usec = 1;

		if (select (cpg_fd+1, &read_fds, NULL, NULL, &timeout) > 0 &&
			FD_ISSET (cpg_fd, &read_fds))
		{
			msg_rx_count = 0;
			res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
			if (res != CPG_OK) {
				printf ("### cpg dispatch returned error %d\n", res);
				receive_error_incr ();
			}
			cum_rx_count += msg_rx_count;
		} else {
			printf( "### expected %d, dispatched only %d...returning\n",
				num_to_rx, cum_rx_count );
			receive_error_incr ();
			return;
		}
	}
	printf("cpg_dispatch() %d messages received OK\n", cum_rx_count);
}
/*
 * Test driver: join the CPG group, fill the dispatch_buffer exactly, then
 * send more messages and check whether they are received intact.
 *
 * Exits with status 1 if initialisation or the join fails, or (through
 * receive_error_incr()) once too many receive errors have accumulated.
 */
int main (void) {
	cpg_handle_t handle;
	int res;
	int msg_len;

	res = cpg_initialize (&handle, &callbacks);
	if (res != CPG_OK) {
		printf ("cpg_initialize failed with result %d\n", res);
		exit (1);
	}
	res = cpg_join (handle, &group_name);
	if (res != CPG_OK) {
		printf ("cpg_join failed with result %d\n", res);
		exit (1);
	}

	// these messages should be a size that'll fill up the dispatch_buffer exactly
	msg_len = (CPG_MSG_SIZE - CPG_HEADER_SIZE);
	assert ((CPG_MSG_SIZE * CPG_MSG_NUM) == DISPATCH_SIZE);

	// send/receive a couple of messages to start with
	cpg_transmit (handle, 10, msg_len);
	sleep(1);
	cpg_receive (handle, 10);

	// send in exactly enough messages to fill up the dispatch_buffer
	cpg_transmit (handle, CPG_MSG_NUM, msg_len);
	// now send in some more that'll overwrite what was just sent
	cpg_transmit (handle, CPG_MSG_NUM, msg_len + 12);
	sleep(1);
	// do the receive and the cpg_dispatch()s should fail
	cpg_receive (handle, CPG_MSG_NUM * 2);

	// normally it'll fail by this point after which cpg_dispatch will fail to dispatch
	// anything and flow control gets stuck in the enabled state, so can't send either
	while( 1 )
	{
		/* random burst size (0..1023) and message length (10..msg_len+9) */
		int burst = (random() % 1024);
		int len = (burst % msg_len) + 10;
		cpg_transmit (handle, burst, len );
		sleep(1);
		cpg_receive (handle, burst);
	}

	/* not reached: the loop above has no exit other than receive_error_incr() */
	res = cpg_finalize (handle);
	if (res != CPG_OK) {
		printf ("cpg_finalize failed with result %d\n", res);
		exit (1);
	}
	return (0);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais