Hi,

I've noticed what seems to be intermittent corruption of CPG messages. The
problem is that if the dispatch_buffer is filled exactly, the bytes_left value
returned by shared_mem_dispatch_bytes_left() is conn_info->dispatch_size
instead of zero. The exec can then continue copying messages into the
dispatch_buffer, overwriting older messages. If a coroipc_response_header_t gets
overwritten, cpg_dispatch() can fail and never recover: if the header size
is zero, the increment in coroipcc_dispatch_put() advances by zero bytes, so
cpg_dispatch() returns the same invalid coroipc_response_header_t again the
next time.

The attached patch avoids the problem by preventing the dispatch_buffer from
ever filling up completely. It may not be the best solution, but I couldn't
think of a better way to distinguish a completely full dispatch_buffer from an
empty one without adding more variables just to track this.

Attached is a test app that reproduces the problem. We're using flatiron trunk
with the small memory footprint configuration (so dispatch_buffer is 65536
bytes). The app should hit the failure case almost immediately, after which it
will fail to send or receive any more messages.

Cheers,
Tim
diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 5b2411d..b4ebe19 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -1406,7 +1406,7 @@ static void msg_send_or_queue (void *conn, const struct iovec *iov, unsigned int
 	for (i = 0; i < iov_len; i++) {
 		bytes_msg += iov[i].iov_len;
 	}
-	if (bytes_left < bytes_msg || list_empty (&conn_info->outq_head) == 0) {
+	if (bytes_left <= bytes_msg || list_empty (&conn_info->outq_head) == 0) {
 		outq_item = api->malloc (sizeof (struct outq_item));
 		if (outq_item == NULL) {
 			ipc_disconnect (conn);
#define _BSD_SOURCE
/*
 * Copyright (c) 2009 Allied Telesis Lab, NZ.
 *
 * All rights reserved.
 *
 * Author: Angus Salkeld ([email protected])
 * Author: Tim Beale ([email protected])
 *
 * This software licensed under BSD license, the text of which follows:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * - Redistributions of source code must retain the above copyright notice,
 *   this list of conditions and the following disclaimer.
 * - Redistributions in binary form must reproduce the above copyright notice,
 *   this list of conditions and the following disclaimer in the documentation
 *   and/or other materials provided with the distribution.
 * - Neither the name of Allied Telesis Labs. nor the names of its
 *   contributors may be used to endorse or promote products derived from this
 *   software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
 * THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>

#include <corosync/corotypes.h>
#include <corosync/coroipc_types.h>
#include <corosync/coroipcc.h>
#include <corosync/corodefs.h>
#include <corosync/hdb.h>
#include <corosync/list.h>
#include <corosync/cpg.h>
#include <corosync/ipc_cpg.h>

// assumption: corosync small memory footprint is used and dispatch_buffer=65536 bytes
#define DISPATCH_SIZE 65536
// Presumably the size of the header corosync places in front of each
// delivered CPG payload (NOTE(review): confirm against ipc_cpg.h). main()
// subtracts it from CPG_MSG_SIZE so that header + payload is intended to
// take exactly CPG_MSG_SIZE bytes of the dispatch_buffer.
#define CPG_HEADER_SIZE sizeof (struct res_lib_cpg_deliver_callback)
// CPG_MSG_NUM messages of CPG_MSG_SIZE bytes add up to exactly
// DISPATCH_SIZE; main() asserts this before trying to fill the buffer.
#define CPG_MSG_SIZE    512
#define CPG_MSG_NUM     128

// Messages seen by cpg_bm_deliver_fn(); cpg_receive() zeroes it before each
// cpg_dispatch() call and adds it to its running total afterwards.
int msg_rx_count;

// Reference payload. cpg_transmit() sends a prefix of this buffer after the
// sequence number, and cpg_bm_deliver_fn() compares received payloads
// against the same prefix. (The upper-case run lacks an 'X'; this is
// harmless because sender and receiver both use this buffer.)
static char test_data[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
                          "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
                          "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
                          "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
                          "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789"
                          "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWYZ0123456789";

/*
 * Configuration-change callback. The test only logs that a change
 * happened; the member, left and joined lists are not inspected.
 */
void cpg_bm_confchg_fn (
	cpg_handle_t handle,
	struct cpg_name *group_name,
	struct cpg_address *member_list, int member_list_entries,
	struct cpg_address *left_list, int left_list_entries,
	struct cpg_address *joined_list, int joined_list_entries)
{
	/* puts() appends the newline, matching printf("%s\n", ...). */
	puts (__func__);
}

/*
 * Sequence tracking: every transmitted message carries the current value of
 * tx_order_counter as its first int, and the receiver compares it against
 * rx_order_counter to detect lost, duplicated or reordered messages.
 */
static int tx_order_counter = 1;
static int rx_order_counter = 0;
static int rx_errors = 0;

/*
 * Record one receive-side error. Once more than 512 errors have been
 * recorded, report it and terminate the process with exit status 1.
 */
static void
receive_error_incr (void)
{
	if (++rx_errors <= 512) {
		return;
	}

	printf ("### Too many errors, exiting\n");
	exit (1);
}

/*
 * Delivery callback: validate one received CPG message.
 *
 * The expected layout is an int sequence number followed by a prefix of
 * test_data. Every delivery is counted in msg_rx_count (so cpg_receive()
 * makes progress even on bad input); each detected problem is reported
 * and recorded via receive_error_incr().
 */
void cpg_bm_deliver_fn (
        cpg_handle_t handle,
        struct cpg_name *group_name,
        uint32_t nodeid,
        uint32_t pid,
        void *msg,
        int msg_len)
{
	const char *payload = msg;	/* char pointer: no arithmetic on void * */
	size_t payload_len;
	int rx_number;

	msg_rx_count++;

	/* Too short to hold the sequence number: reading it would overrun msg. */
	if (msg_len < (int)sizeof (int)) {
		printf ("### Message too short: %d bytes\n", msg_len);
		receive_error_incr ();
		return;
	}

	/* memcpy avoids a possibly misaligned int load through a cast pointer. */
	memcpy (&rx_number, payload, sizeof rx_number);

	// sanity check msg is in the correct order
	if (rx_number != rx_order_counter + 1) {
		printf ("### Unexpected seq: %d (expected %d)\n",
			rx_number, rx_order_counter + 1);
		receive_error_incr ();
	}

	/* Resync on the received number so one gap is reported once instead of
	 * flagging every subsequent message as out of order. */
	rx_order_counter = rx_number;

	payload += sizeof (int);
	payload_len = (size_t)msg_len - sizeof (int);

	// sanity check msg data is correct; a payload longer than test_data
	// cannot match and must not be compared past the end of test_data
	if (payload_len > sizeof (test_data) ||
	    memcmp (payload, test_data, payload_len) != 0) {
		/* The payload is not NUL-terminated, so bound the print. */
		printf ("### Unexpected data: %.*s\n", (int)payload_len, payload);
		receive_error_incr ();
	}
}

/*
 * Callback table passed to cpg_initialize(): configuration (membership)
 * change notifications and message delivery.
 */
cpg_callbacks_t callbacks = {
	.cpg_confchg_fn = cpg_bm_confchg_fn,
	.cpg_deliver_fn = cpg_bm_deliver_fn,
};

/*
 * Multicast num_to_tx messages of msg_len bytes each to the joined group.
 * Each message is the current tx_order_counter (an int) followed by the
 * first msg_len - sizeof (int) bytes of test_data.
 *
 * CPG_ERR_TRY_AGAIN is retried. The function returns early, having sent
 * fewer messages, if flow control is enabled, if the flow-control state
 * cannot be read, or if cpg_mcast_joined() fails with any other error.
 */
void cpg_transmit (cpg_handle_t handle, int num_to_tx, int msg_len)
{
	struct iovec iov[2];
	unsigned int res;
	int msg_tx_count = 0;
	int payload_len = msg_len - (int)sizeof (int);
	cpg_flow_control_state_t flow_current = CPG_FLOW_CONTROL_DISABLED;

	assert (payload_len > 0 && (size_t)payload_len <= sizeof (test_data));

	/* iov[0] points at the live counter, so each send carries its current value. */
	iov[0].iov_base = &tx_order_counter;
	iov[0].iov_len = sizeof (int);
	iov[1].iov_base = test_data;
	iov[1].iov_len = payload_len;

	printf("%s() sending %d %d-byte messages\n", __func__, num_to_tx, msg_len);

	/* A plain while loop, so a request for zero messages sends nothing. */
	while (msg_tx_count < num_to_tx) {
		if (cpg_flow_control_state_get (handle, &flow_current) != CPG_OK) {
			printf ("### cpg_flow_control_state_get() failed, sent %d msgs\n",
				msg_tx_count);
			return;
		}

		if (flow_current != CPG_FLOW_CONTROL_DISABLED) {
			printf ("### flow control enabled, sent %u msgs\n", msg_tx_count);
			return;
		}

		res = cpg_mcast_joined (handle, CPG_TYPE_AGREED, iov, 2);
		if (res == CPG_OK) {
			msg_tx_count++;
			tx_order_counter++;
		} else if (res != CPG_ERR_TRY_AGAIN) {
			/* A hard error will not clear by retrying; stop instead of spinning. */
			printf("### cpg_mcast_joined() failed %d\n", res);
			return;
		}
	}
}

/*
 * CPG group joined by this test. The length is derived from the literal so
 * the two cannot drift apart; a hard-coded 6 would truncate the name to
 * "cpg_te".
 */
#define CPG_TEST_GROUP_NAME "cpg_test"
static struct cpg_name group_name = {
	.value = CPG_TEST_GROUP_NAME,
	.length = sizeof (CPG_TEST_GROUP_NAME) - 1
};

/*
 * Dispatch CPG callbacks until at least num_to_rx messages have been
 * delivered (as counted by cpg_bm_deliver_fn() in msg_rx_count).
 *
 * Gives up and records an error via receive_error_incr() if the CPG fd
 * cannot be obtained, or as soon as the fd is not readable within the
 * 1-microsecond select() timeout. cpg_dispatch() failures are recorded
 * but do not stop the loop.
 */
static void
cpg_receive(cpg_handle_t handle, int num_to_rx)
{
	fd_set read_fds;
	struct timeval timeout;
	int cpg_fd;
	int cum_rx_count = 0;
	int res;

	/* Without a valid fd there is nothing to wait on. */
	if (cpg_fd_get (handle, &cpg_fd) != CPG_OK) {
		printf ("### cpg_fd_get failed\n");
		receive_error_incr ();
		return;
	}

	while (cum_rx_count < num_to_rx) {
		/* select() may modify both the fd set and (on Linux) the timeout,
		 * so both are rebuilt before every call. */
		FD_ZERO (&read_fds);
		FD_SET (cpg_fd, &read_fds);
		timeout.tv_sec = 0;
		timeout.tv_usec = 1;

		if (select (cpg_fd + 1, &read_fds, NULL, NULL, &timeout) > 0 &&
		    FD_ISSET (cpg_fd, &read_fds)) {
			msg_rx_count = 0;
			res = cpg_dispatch (handle, CPG_DISPATCH_ALL);
			if (res != CPG_OK) {
				printf ("### cpg dispatch returned error %d\n", res);
				receive_error_incr ();
			}
			cum_rx_count += msg_rx_count;
		} else {
			printf( "### expected %u, dispatched only %u...returning\n",
					num_to_rx, cum_rx_count );
			receive_error_incr ();
			return;
		}
	}
	printf("cpg_dispatch() %d messages received OK\n", cum_rx_count);
}

/*
 * Reproducer for the dispatch_buffer overwrite.
 *
 * Joins the test group and exchanges a few messages. It then sends exactly
 * enough messages to fill the dispatch buffer, followed by a second batch
 * that overruns it, before receiving anything. Afterwards it loops forever,
 * sending and receiving batches of random size. The process ends only when
 * something calls exit() or abort() (e.g. receive_error_incr() after too
 * many errors).
 */
int main (void) {
	cpg_handle_t handle;
	unsigned int res;
	int msg_len;

	res = cpg_initialize (&handle, &callbacks);
	if (res != CPG_OK) {
		printf ("cpg_initialize failed with result %d\n", res);
		exit (1);
	}

	res = cpg_join (handle, &group_name);
	if (res != CPG_OK) {
		printf ("cpg_join failed with result %d\n", res);
		cpg_finalize (handle);	/* release the handle before bailing out */
		exit (1);
	}

	// these messages should be a size that'll fill up the dispatch_buffer exactly
	msg_len = (CPG_MSG_SIZE - CPG_HEADER_SIZE);
	assert ((CPG_MSG_SIZE * CPG_MSG_NUM) == DISPATCH_SIZE);

	// send/receive a couple of messages to start with
	cpg_transmit (handle, 10, msg_len);
	sleep(1);
	cpg_receive (handle, 10);

	// send in exactly enough messages to fill up the dispatch_buffer
	cpg_transmit (handle, CPG_MSG_NUM, msg_len);

	// now send in some more that'll overwrite what was just sent
	cpg_transmit (handle, CPG_MSG_NUM, msg_len + 12);
	sleep(1);

	// do the receive and the cpg_dispatch()s should fail
	cpg_receive (handle, CPG_MSG_NUM * 2);

	// normally it'll fail by this point after which cpg_dispatch will fail to dispatch
	// anything and flow control gets stuck in the enabled state, so can't send either
	while (1) {
		/* Named num_msgs rather than rand so it does not shadow rand(3). */
		int num_msgs = (int)(random () % 1024);
		int len = (num_msgs % msg_len) + 10;

		cpg_transmit (handle, num_msgs, len);
		sleep(1);
		cpg_receive (handle, num_msgs);
	}

	/* Not reached: the loop above never breaks. Kept so the teardown path
	 * is correct if the loop is ever bounded. */
	res = cpg_finalize (handle);
	if (res != CPG_OK) {
		printf ("cpg_finalize failed with result %d\n", res);
		exit (1);
	}
	return (0);
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to