Hi Gentle Readers!

Is round trip latency expected to degrade linearly as more receivers are
added to a system given an equivalent number of messages sent?

The attached test application attempts to capture the situation.  The basic
setup is to start N receivers on 1-N different nodes, then run a single
sender, which records the results in a log file on its own system,
distributing the delays across multiple buckets.  Note that when changing
the number of receivers, the message count of the sender must be modified
to keep the total number of messages similar.

For example:
Starting receivers with a simple script:
#!/bin/sh
# Start $maxcnt corestart receivers on this node.
#   -s          toggles sending off, i.e. receiver mode
#   -c 1 -i 1   one message count / one iteration
#   -j 2        two handles (groups) per process
#   -p 30000000 wait 30 seconds (in usecs) for messages after the iteration
# Only POSIX sh constructs are used, so this runs under any /bin/sh.
# NOTE(review): corestart is started in the foreground, so each receiver
# finishes before the next one starts - confirm whether a trailing '&'
# was intended.
cnt=1
maxcnt=16
host=$(hostname -s)
echo "$host"
while [ "$cnt" -le "$maxcnt" ]
do
  corestart -s -c 1 -i 1 -j 2 -p 30000000
  cnt=$((cnt + 1))
done

And then start a single sender:
corestart -c 100 -i 1 -j 2 -p 1000000

By varying the number of receivers (and the associated message count)  the
round trip latency degrades with the number of receivers in a non-linear
relationship.

How are your results for 1-8 node systems?

Any insights as to what may cause this anomaly, including a poorly written
test application :)!
thanks,
dan
ps.  there are several other parameters that allow this test application to
exercise some interesting aspects of the library, such as running the
number of iterations up (-i) to test initialize/finalize logic.
/*
 *  @file corestart.c
 *  Test application that uses multiple connections, groups and sends
 *  to measure end-to-end round trip time between multiple instances.
 *  For example:
 *  A test with a single sender of m messages and N receivers
 *  results in (m)*(N+1) messages being received by each instance.
 *  Due to differences in clocks between multiple nodes, the latency
 *  statistics may only be valid for round trip time stamps
 *  (sender checking origination time stamp against current time
 *  when receiving messages reflected back from receivers).
 */

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
#include <time.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/uio.h>

#include <netinet/tcp.h>
#include <linux/sockios.h>

#include <corosync/corotypes.h>
#include <corosync/cpg.h>

/* Default number of test iterations (main() overrides it with -i). */
#define DEFAULT_ITERATIONS ((unsigned long)2)
/* Size in bytes of the fixed buffer used to build and copy every message. */
#define MAX_BUF ((size_t)128)
/* Default wait in microseconds; main() uses it for asleep and, times 10, for psleep. */
#define DEFAULT_USEC ((useconds_t)1000000)

/* NOTE(review): despite its name this is the number of microseconds per
 * second; the code multiplies tv_sec by it to convert seconds to usecs. */
#define SEC_PER_USEC (1000000)
/* Default number of CPG handles opened per process (overridden with -j). */
#define DEFAULT_HANDLE_COUNT (2)
/* Default number of messages sent per handle per iteration (overridden with -c). */
#define DEFAULT_MESSAGE_COUNT (10)
/* Value stored in a handle slot that has no initialized CPG connection. */
#define NULL_HANDLE ((cpg_handle_t)0)
/* Capacity of the fd/handle arrays inside SetHandle. */
#define MAXFDS (256)

static int verbose = 0; /**< when non-zero, detailed progress is logged through logmsg() (toggled by -v) */
static suseconds_t maxUsecTolerance = 500; /**< maximum tolerance in usecs for library call */
int msgcnt;	/**< message count */
unsigned long maxIterations;	/**< maximum iterations in loop */
int handleCnt;	/**< maximum count of handles */
static unsigned long msgQueued = 0;	/**< incremented before every cpg_mcast_joined() call */
static unsigned long msgDelivered = 0;	/**< incremented on every DeliverCallback() invocation */

static	useconds_t asleep;	/**< microseconds to sleep between iterations */
static	useconds_t psleep;	/**< microseconds to sleep post iterations */
static	int doSend = 1;	/**< enable sending of messages */

/* Signal handler function pointer type, used for the return value of signal(). */
typedef void (*sighandler_t)(int);
/* The SIGINT handler never returns: it ends by calling exit(). */
static void sigintr_handler (int signum) __attribute__((__noreturn__));

/* Name used for the syslog identity and as the log file name under logpath. */
static char logname[] = "corestart";
/* Logging levels; each value is the syslog priority of the same name, so
 * a numerically larger level is less severe. */
enum loglevel_s
{
	LOGMSG_EMERG	= 	LOG_EMERG,
	LOGMSG_ALERT	=	LOG_ALERT,
	LOGMSG_CRIT	=	LOG_CRIT,
	LOGMSG_ERROR	=	LOG_ERR,
	LOGMSG_WARNING	=	LOG_WARNING,
	LOGMSG_NOTICE	=	LOG_NOTICE,
	LOGMSG_INFO	=	LOG_INFO,
	LOGMSG_DEBUG	=	LOG_DEBUG,
};
typedef enum loglevel_s loglevel_t;
#define DefaultLogLevel (LOGMSG_INFO)

/* Messages with a level numerically greater than this are dropped by logmsg(). */
static loglevel_t loglevel = DefaultLogLevel;
/* Directory prefix (with trailing slash) that loginit() prepends to logname. */
static char logpath[] = "/var/log/";
/* File descriptor of the open log file, or -1 when no log file is open. */
static int logfd = -1;

/* logmsg()
 * @brief log a printf-style message when its level passes the threshold
 *
 * Messages whose level is numerically greater than the global loglevel are
 * dropped.  Otherwise the formatted text is written to the log file when one
 * is open (falling back to stdout if that write fails) and, when built with
 * USE_SYSLOG, also handed to vsyslog().
 *
 * @param[in] level, the logging level of the message
 * @param[in] fmt, printf style formatting
 * @param[in] varargs, arguments based on arguments in fmt
 */
static void
__attribute__ ((format (printf, 2, 3)))
logmsg( const loglevel_t level, const char *fmt, ...)
{
	va_list ap;
	int len;
	char lbuf[1024];

	if(level > loglevel) {
		return;
	}

	if(logfd >= 0) {
		va_start(ap, fmt);
		len = vsnprintf(lbuf, sizeof(lbuf), fmt, ap);
		va_end(ap);
		if(len > 0) {
			/* vsnprintf reports the untruncated length; only what fits was stored */
			size_t out = (size_t)len;
			if(out >= sizeof(lbuf)) {
				out = sizeof(lbuf) - 1;
			}
			if(write(logfd, lbuf, out) < 0) {
				printf("%s", lbuf);
			}
		}
	}
#ifdef USE_SYSLOG
	/* a va_list is consumed by the first formatter, so start a fresh one */
	va_start(ap, fmt);
	vsyslog((int)level, fmt, ap);
	va_end(ap);
#endif
}
/* loginit()
 * @brief open the log file named logpath + logname for appending
 *
 * The file is created if missing (mode rw-r--r--).  The descriptor is stored
 * in logfd; on failure logfd is negative and a note is printed to stdout.
 * When built with USE_SYSLOG the syslog connection is opened as well.
 */
static void
loginit(void)
{
	const int openFlags = O_WRONLY|O_APPEND|O_CREAT;
	const mode_t openMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
	char logfile[PATH_MAX];

	snprintf(logfile, sizeof(logfile), "%s%s", logpath, logname);
#ifdef USE_SYSLOG
	openlog(logname, LOG_NDELAY, LOG_USER);
#endif
	logfd = open(logfile, openFlags, openMode);
	if(logfd < 0) {
		printf("failed to open logfile %s\n", logfile);
	}
}

/* logwrap()
 * @brief shut logging down: close syslog (USE_SYSLOG builds) and the log file
 *
 * A failure to close the file is reported on stdout; logfd is reset to -1
 * either way.  Nothing is done for the file when it is not open.
 */
static void
logwrap(void)
{
#ifdef USE_SYSLOG
	closelog();
#endif
	if(logfd < 0) {
		return;
	}
	if(close(logfd) < 0) {
		printf("failed to close logfile %s", strerror(errno));
	}
	logfd = -1;
}

/* @struct appContext
 * @brief per-handle application context; main() keeps one entry per handle
 * NOTE(review): idx and fd are not assigned anywhere in this file.
 */
typedef struct appContext {
	int idx; /**< simple integer index of connection */
	int fd;	/**< file descriptor of connection */
	char groupNameStr[CPG_MAX_NAME_LENGTH]; /**< group name, built by main() as "<base>-<handle index>" */
	int coInitOk; /**< non-zero after the last coInit() for this handle succeeded */
}appContext;

/* Upper bounds, in microseconds, of the latency histogram buckets.  A
 * delivery lands in the first bucket whose bound is not exceeded; the last
 * bound is the maximum 64-bit value so every latency fits somewhere. */
unsigned long long hvalMax[] = {
	50, 100, 200, 300, 400, 500, 1000, 10000, 25000, 50000, 75000, 100000, 250000, 500000, 750000, 1000000, 2000000, 0xffffffffffffffff
};
/* Number of histogram buckets. */
#define HSlots ((sizeof (hvalMax))/sizeof(hvalMax[0]))

/* Per-bucket count of deliveries whose ack field equals CmdAckSet. */
static unsigned long long hval[HSlots];
/* Per-bucket count of all other deliveries (e.g. replies carrying CmdAckClear). */
static unsigned long long hvalAck[HSlots];

/* hvalClear()
 * @brief reset every bucket of both latency histograms to zero
 */
static void
hvalClear(void)
{
	memset(hval, 0, sizeof(hval));
	memset(hvalAck, 0, sizeof(hvalAck));
}

#define MSEC_PER_USEC (1000)

/* hvalFormatRow()
 * @brief format a row of unsigned counters as right-aligned 10-character columns
 * @param[out] buf destination, always NUL-terminated when bufLen > 0
 * @param[in] bufLen size of buf in bytes
 * @param[in] vals values to print
 * @param[in] nvals number of values
 */
static void
hvalFormatRow(char *buf, size_t bufLen, const unsigned long long *vals, size_t nvals)
{
	size_t used = 0;
	size_t i;

	if(bufLen == 0) {
		return;
	}
	buf[0] = '\0';
	/* stop as soon as the buffer is full; snprintf returns the untruncated length */
	for(i = 0; i < nvals && used < bufLen - 1; i++) {
		int len = snprintf(buf + used, bufLen - used, "%10llu", vals[i]);
		if(len < 0) {
			break;
		}
		used += (size_t)len;
	}
}

/* hvalDump()
 * @brief log the message totals and the latency histograms
 *
 * Writes four INFO lines: the queued/delivered totals, the bucket upper
 * bounds (usecs), the counts of deliveries with the ack field set, and the
 * counts of all other deliveries.
 */
static void
hvalDump(void)
{
	char cbuf[240];

	logmsg(LOGMSG_INFO, "total msg counts diff %ld queued %lu delivered %lu seperated by usec buckets\n", ((long)msgDelivered - (long)msgQueued), msgQueued, msgDelivered);

	hvalFormatRow(cbuf, sizeof(cbuf), hvalMax, HSlots);
	logmsg(LOGMSG_INFO,"%s\n", cbuf);

	hvalFormatRow(cbuf, sizeof(cbuf), hval, HSlots);
	logmsg(LOGMSG_INFO,"%s\n", cbuf);

	hvalFormatRow(cbuf, sizeof(cbuf), hvalAck, HSlots);
	logmsg(LOGMSG_INFO,"%s\n", cbuf);
}

/* Index of each message field in the cmds[] table. */
typedef enum Cmd_e {
	CmdTime,
	CmdGeneration,
	CmdConnection,
	CmdMessageCount,
	CmdAck,
}Cmd_e;

/* Entry of the cmds[] table: a field index and the key text written
 * before '=' when a message is built. */
typedef struct Cmd_t {
	Cmd_e idx;
	const char *key;
}Cmd_t;

/* First character of each key; DeliverCallback() switches on the first
 * character of a token to recognise the field. */
enum {
	CmdTimeTok = 't',
	CmdGenerationTok = 'g',
	CmdConnectionTok = 'c',
	CmdMessageCountTok = 'm',
	CmdAckTok = 'a',
};

/* Values of the ack field: the sender in main() writes CmdAckSet, and a
 * receiver answers such a message with a reply carrying CmdAckClear. */
enum {
	CmdAckNone = 0x0,
	CmdAckSet = 0x1,
	CmdAckClear = 0x2,
};

/* Key strings of the message fields, in Cmd_e order.  A message is a
 * space separated list of "key=value" tokens. */
const Cmd_t cmds[] = {
	{CmdTime, "tme"},
	{CmdGeneration, "gen"},
	{CmdConnection, "cnx"},
	{CmdMessageCount, "msg"},
	{CmdAck, "ack"},
};

/* getLong()
 * @brief parse the number that follows the first '=' of a "key=value" token
 * @param[in] cp NUL-terminated token
 * @return value converted by strtoul() with automatic base detection
 *         (decimal, 0x hex, leading-0 octal), or 0 when the token has no '='
 */
static unsigned long
getLong(char *cp)
{
	const char *eq = strchr(cp, '=');

	if(eq == NULL) {
		return 0;
	}
	return strtoul(eq + 1, NULL, 0);
}

/* DeliverCallback()
 * @brief message delivery callback function
 *
 * Parses the space separated "key=value" fields of a delivered message,
 * computes the absolute difference between the embedded origination time
 * and the local delivery time, and counts it in the matching latency bucket
 * (hval for messages whose ack field is CmdAckSet, hvalAck otherwise).
 * When this process runs as a receiver (doSend == 0) and the message asked
 * for an ack, the fields are reflected back to the group with the ack field
 * set to CmdAckClear.
 *
 * The message is parsed from a private, NUL-terminated copy so the
 * caller's buffer is never modified and an unterminated payload cannot
 * be over-read.
 *
 * @param[in] handle connection the message arrived on; also used for the reply
 * @param[in] groupName group the message was delivered to
 * @param[in] nodeid unused
 * @param[in] pid unused
 * @param[in] msg message payload
 * @param[in] msg_len payload length in bytes
 */
static void 
DeliverCallback (
	cpg_handle_t handle,
	const struct cpg_name *groupName,
	uint32_t nodeid,
	uint32_t pid,
	void *msg,
	size_t msg_len)
{
	struct timeval tvbeg;	/**< origination time parsed from the message */
	struct timeval tvend;	/**< local time of delivery */
	struct timeval tvdelta;	/**< tvend - tvbeg, left zero when no time was parsed */
	char mbuf[MAX_BUF];	/**< private copy of the message, later the reply */
	size_t mlen;
	char *tok;
	char *next;
	unsigned long generation = 0;
	unsigned long connection = 0;
	unsigned long messageCount = 0;
	unsigned long messageAck = CmdAckNone;
	unsigned long long usecsbeg;
	unsigned long long usecsend;
	unsigned long long usecs;
	size_t slot;

	(void)nodeid;
	(void)pid;

	gettimeofday(&tvend, NULL);
	msgDelivered ++;

	/* keep one byte for the terminator */
	mlen = (msg_len < sizeof(mbuf)) ? msg_len : sizeof(mbuf) - 1;
	memcpy(mbuf, msg, mlen);
	mbuf[mlen] = '\0';

	timerclear(&tvbeg);
	timerclear(&tvdelta);

	/* split the copy on spaces; the first character of a token selects the field */
	for(tok = mbuf + strspn(mbuf, " "); *tok != '\0'; tok = next + strspn(next, " ")) {
		next = tok + strcspn(tok, " ");
		if(*next != '\0') {
			*next++ = '\0';
		}
		switch(*tok) {
		case CmdTimeTok: {
			/* value is "<seconds>.<microseconds>", both written in decimal */
			char *cval = strchr(tok, '=');
			char *cend = (cval != NULL) ? strchr(cval + 1, '.') : NULL;
			if(cend != NULL) {
				*cend = '\0';
				tvbeg.tv_sec = strtoul(cval + 1, NULL, 10);
				tvbeg.tv_usec = strtoul(cend + 1, NULL, 10);
			}
			break;
		}
		case CmdGenerationTok:
			generation = getLong(tok);
			break;
		case CmdConnectionTok:
			connection = getLong(tok);
			break;
		case CmdMessageCountTok:
			messageCount = getLong(tok);
			break;
		case CmdAckTok:
			messageAck = getLong(tok);
			break;
		default:
			break;
		} /* esac */
	} /* rof */

	if(timerisset(&tvbeg)) {
		timersub(&tvend, &tvbeg, &tvdelta);
	}
	/* absolute time difference in usecs, computed in 64 bits */
	usecsbeg = (unsigned long long)tvbeg.tv_sec * SEC_PER_USEC + (unsigned long long)tvbeg.tv_usec;
	usecsend = (unsigned long long)tvend.tv_sec * SEC_PER_USEC + (unsigned long long)tvend.tv_usec;
	if(usecsend > usecsbeg) {
		usecs = usecsend-usecsbeg;
	} else {
		usecs = usecsbeg-usecsend;
	}

	if(verbose) {
		/* print at most mlen bytes: the payload need not be NUL-terminated */
		logmsg(LOGMSG_INFO,"deliver %s usec %llu msg %.*s\n", groupName->value, usecs, (int)mlen, (const char *)msg);
	}

	/* register time in range buckets for tracking; the last bucket catches everything */
	for(slot = 0; slot < HSlots - 1 && usecs > hvalMax[slot]; slot++) {
		;
	}
	if(messageAck == CmdAckSet) {
		hval[slot]++;
	} else {
		hvalAck[slot]++;
	}

	/* log information */
	if(verbose) {
		char tsbuf[128];
		char tebuf[128];
		if(ctime_r(&tvbeg.tv_sec, tsbuf) == NULL) {
			tsbuf[0] = '\0';
		}
		tsbuf[strcspn(tsbuf, "\n")] = '\0'; /* remove trailing newline */
		if(ctime_r(&tvend.tv_sec, tebuf) == NULL) {
			tebuf[0] = '\0';
		}
		tebuf[strcspn(tebuf, "\n")] = '\0'; /* remove trailing newline */
		logmsg(LOGMSG_INFO,"%s delta %ld.%06ld beg %s.%06ld end %s.%06ld\n", 
			groupName->value, (long)tvdelta.tv_sec, (long)tvdelta.tv_usec,
			tsbuf, (long)tvbeg.tv_usec, tebuf, (long)tvend.tv_usec);
	}

	/* if warranted - send a response */
	if(!doSend && (messageAck == CmdAckSet)) {
		struct iovec iov;	/**< output vector */
		cs_error_t result;

		/* the whole buffer is sent, so do not leak stale bytes after the text */
		memset(mbuf, 0, sizeof(mbuf));
		snprintf(mbuf, sizeof(mbuf), "%s=%lu.%lu %s=%lu %s=%lu %s=%lu %s=%d", 
				cmds[CmdTime].key, (unsigned long)tvbeg.tv_sec, (unsigned long)tvbeg.tv_usec, 
				cmds[CmdGeneration].key, generation, 
				cmds[CmdConnection].key, connection, 
				cmds[CmdMessageCount].key, messageCount,
				cmds[CmdAck].key, CmdAckClear);
		iov.iov_base = mbuf;
		iov.iov_len = sizeof(mbuf);
		msgQueued++;
		result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
		if(CS_OK != result) {
			logmsg(LOGMSG_ERROR,"deliverCallback %s cpg mcast joined %d\n", groupName->value, result);
		}
		if(verbose) {
			logmsg(LOGMSG_INFO,"post ack %s\n", mbuf);
		}
	}
}

/* ConfchgCallback() 
 * @brief configuration change callback
 *
 * Purely informational: in verbose mode it logs the group name, every
 * joined, left and current member, and a note when the first entry of the
 * left list carries this process's pid.  It does nothing otherwise.
 */
static void 
ConfchgCallback (
	cpg_handle_t handle,
	const struct cpg_name *groupName,
	const struct cpg_address *member_list, size_t member_list_entries,
	const struct cpg_address *left_list, size_t left_list_entries,
	const struct cpg_address *joined_list, size_t joined_list_entries)
{
	int idx;

	/* everything below is logging only */
	if(!verbose) {
		return;
	}

	logmsg(LOGMSG_INFO,"configchg %s\n", groupName->value);

	for(idx = 0; idx < joined_list_entries; idx++) {
		const struct cpg_address *who = &joined_list[idx];
		logmsg(LOGMSG_INFO,"configchg %s joined %d %d reason: %d\n", groupName->value,
			who->nodeid, who->pid, who->reason);
	}
	for(idx = 0; idx < left_list_entries; idx++) {
		const struct cpg_address *who = &left_list[idx];
		logmsg(LOGMSG_INFO,"configchg %s left %d %d reason: %d\n", groupName->value,
			who->nodeid, who->pid, who->reason);
	}
	for(idx = 0; idx < member_list_entries; idx++) {
		const struct cpg_address *who = &member_list[idx];
		logmsg(LOGMSG_INFO,"configchg %s member %d %d %d\n", groupName->value, idx,
			who->nodeid, who->pid);
	}

	/* Is it us?? NOTE: in reality we should also check the nodeid */
	if(left_list_entries && left_list[0].pid == getpid()) {
		logmsg(LOGMSG_INFO,"configchg %s pid %d left group do finalize\n", groupName->value, getpid());
	}
}

/* @struct callbacks
 * @brief CPG callback table handed to cpg_initialize() in coInit():
 *        DeliverCallback for message delivery, ConfchgCallback for group
 *        membership changes
 */
static cpg_callbacks_t callbacks = {
	.cpg_deliver_fn =            DeliverCallback,
	.cpg_confchg_fn =            ConfchgCallback,
};

/* deltatime() 
 * @brief check change in time and report if over the threshold
 *
 * Measures the time elapsed since *tvbeg.  A negative delta is logged as a
 * warning.  Otherwise a warning is logged when a full second or more has
 * passed, or when the microsecond part exceeds maxUsecTolerance.
 *
 * @param[in] tvbeg beginning timer
 * @param[in] msg message to report on error
 */
static void
deltatime(struct timeval *tvbeg, const char *msg)
{
	struct timeval tvend;	/**< time end */
	struct timeval tvdelta;	/**< time delta */

	gettimeofday(&tvend, NULL);
	timersub(&tvend, tvbeg, &tvdelta);

	/* the fields are signed: print them as such, usecs zero padded */
	if(tvdelta.tv_sec < 0 || tvdelta.tv_usec < 0) {
		logmsg(LOGMSG_WARNING,"%s negative delta! %ld.%06ld\n", msg, (long)tvdelta.tv_sec, (long)tvdelta.tv_usec);
		return;
	}
	if(tvdelta.tv_sec > 0 || tvdelta.tv_usec > maxUsecTolerance) {
		logmsg(LOGMSG_WARNING,"%s out of tolerance %ld.%06ld\n", msg, (long)tvdelta.tv_sec, (long)tvdelta.tv_usec);
	}
}

/* @struct SetHandle
 * @brief set of CPG connections to watch; entry i of handle[] and
 *        fdlist[] describe the same connection
 */
typedef struct SetHandle 
{
	cpg_handle_t handle[MAXFDS];	/**< CPG handle of each monitored connection */
	int fdlist[MAXFDS];	/**< file descriptor of each monitored connection */
	int fdnxt;	/**< number of entries in use, i.e. index of the next free slot */
} SetHandle;

/* newSetHandle() 
 * @brief create a new handle for checking all corosync fds
 *
 * The set starts empty with every fd and handle slot filled with all-ones
 * bytes (fd value -1).  Callers use the result without a NULL check, so an
 * allocation failure terminates the process instead of returning NULL.
 *
 * @return newly allocated set, to be released with delSetHandle()
 */
static SetHandle *
newSetHandle(void)
{
	SetHandle *sh = malloc(sizeof(*sh));

	if(sh == NULL) {
		perror("newSetHandle: malloc");
		exit(EXIT_FAILURE);
	}
	sh->fdnxt = 0;
	memset(sh->fdlist, -1, sizeof(sh->fdlist));
	memset(sh->handle, -1, sizeof(sh->handle));
	return sh;
}

/* delSetHandle() 
 * @brief free memory of set handle
 *
 * The structure is zeroed before it is freed.  Passing NULL is allowed and
 * does nothing, matching free().
 *
 * @param[in] sh handle to all fds, or NULL
 */
static void
delSetHandle(SetHandle *sh)
{
	if(sh == NULL) {
		return;
	}
	memset(sh, 0, sizeof(*sh));
	free(sh);
}

/* addFd() 
 * @brief add a handle in the set of handles to monitor
 *
 * Looks up the file descriptor of the connection and appends the
 * fd/handle pair to the set.
 *
 * @param[in] sh handle to all fds
 * @param[in] handle of coro connection to add
 * @return CS_OK when added; the cpg_fd_get() error when the fd cannot be
 *         obtained; CS_ERR_NOT_EXIST when the fd is negative, already in
 *         the set, or the set is full
 */
static cs_error_t
addFd(SetHandle *sh, cpg_handle_t handle)
{
	cs_error_t result;
	int coFd = -1;
	int i;
	const char *name = "";

	/* check the call result first: coFd is only meaningful on success */
	result = cpg_fd_get(handle, &coFd);
	if (CS_OK != result) {
		logmsg(LOGMSG_WARNING,"%s Could not get cpg fd %d result %d\n", name, coFd, result);
		return result;
	}
	if (coFd < 0) {
		logmsg(LOGMSG_WARNING,"%s invalid cpg fd %d\n", name, coFd);
		return CS_ERR_NOT_EXIST;
	}
	for(i=0; i<sh->fdnxt; i++) {
		if(sh->fdlist[i] == coFd) {
			logmsg(LOGMSG_WARNING,"%s skip adding duplicate fd %d\n", name, coFd);
			return CS_ERR_NOT_EXIST;
		}
	}
	if(sh->fdnxt >= MAXFDS) {
		logmsg(LOGMSG_WARNING,"%s maximum number of fds %d\n", name, MAXFDS);
		return CS_ERR_NOT_EXIST;
	}
	sh->fdlist[sh->fdnxt] = coFd;
	sh->handle[sh->fdnxt] = handle;
	sh->fdnxt++;
	return CS_OK;
}

/* delFd() 
 * @brief delete a handle in the set of handles to monitor
 *
 * The entry is found through the file descriptor that cpg_fd_get() reports
 * for the handle, so the handle must still be valid (not yet finalized)
 * when this is called.  Later entries are shifted down to keep the list
 * dense.
 *
 * @param[in] sh handle to all fds
 * @param[in] handle of coro connection to remove
 * @return CS_OK when the fd was looked up (whether or not it was in the
 *         set); the cpg_fd_get() error when the fd cannot be obtained;
 *         CS_ERR_NOT_EXIST when the fd is negative
 */
static cs_error_t
delFd(SetHandle *sh, cpg_handle_t handle)
{
	cs_error_t result;
	int coFd = -1;
	int i,j;
	const char *name = "";

	/* check the call result first: coFd is only meaningful on success */
	result = cpg_fd_get(handle, &coFd);
	if (CS_OK != result) {
		logmsg(LOGMSG_WARNING,"%s Could not get cpg fd %d\n", name, result);
		return result;
	}
	if (coFd < 0) {
		logmsg(LOGMSG_WARNING,"%s invalid cpg fd %d\n", name, coFd);
		return CS_ERR_NOT_EXIST;
	}
	for(i=0; i<sh->fdnxt; i++) {
		if(sh->fdlist[i] != coFd) {
			continue;
		}
		for(j=i+1; j<sh->fdnxt; j++) {
			sh->fdlist[j-1] = sh->fdlist[j];
			sh->handle[j-1] = sh->handle[j];
		}
		sh->fdnxt--;
		/* mark the vacated tail slot unused */
		sh->fdlist[sh->fdnxt] = -1;
		sh->handle[sh->fdnxt] = -1;
		/* addFd() rejects duplicates, so there is at most one match */
		break;
	}
	return CS_OK;
}

/* checkFdAll() 
 * @brief use select to see if there is any inbound data and dispatch it
 *
 * Waits up to timeOut microseconds for any monitored fd to become
 * readable and runs cpg_dispatch() on each readable connection.  While
 * data keeps arriving the wait is repeated with whatever is left of the
 * time budget (a plain poll once the budget is spent); the function returns
 * after the first wait that finds nothing to read.
 *
 * @param[in] sh set of fds/handles to monitor
 * @param[in] handle unused; the handles stored in sh are used instead
 * @param[in] timeOut total time to wait, in microseconds
 * @return CS_OK normally; the cpg_dispatch() error when a dispatch fails;
 *         the raw select() result (-1) when select() fails for a reason
 *         other than EINTR, as callers have always received
 */
static cs_error_t
checkFdAll(SetHandle *sh, cpg_handle_t handle[], useconds_t timeOut)
{
	struct timeval tvbeg;	/**< time of entry */
	unsigned long long usecLeft = timeOut;	/**< remaining time budget */
	int moreToRead = 1;

	(void)handle;

	gettimeofday(&tvbeg, NULL);
	while(moreToRead) {
		fd_set readFds;
		fd_set exceptFds;
		struct timeval tv;
		struct timeval tvnow;	/**< time now */
		struct timeval tvdelta;	/**< time since entry */
		unsigned long long usecUsed;
		int nfds = 0;
		int resultSelect;
		int i;

		moreToRead = 0;
		FD_ZERO (&readFds);
		FD_ZERO (&exceptFds);
		for(i=0; i<sh->fdnxt; i++) {
			int coFd = sh->fdlist[i];
			/* FD_SET is only defined for 0 <= fd < FD_SETSIZE */
			if(coFd < 0 || coFd >= FD_SETSIZE) {
				logmsg(LOGMSG_WARNING,"skip fd %d outside select range\n", coFd);
				continue;
			}
			FD_SET(coFd, &readFds);
			FD_SET(coFd, &exceptFds);
			if(nfds < coFd + 1) {
				nfds = coFd + 1;
			}
		}
		/* keep tv_usec below one second as select() requires */
		tv.tv_sec = (time_t)(usecLeft / SEC_PER_USEC);
		tv.tv_usec = (suseconds_t)(usecLeft % SEC_PER_USEC);
		resultSelect = select (nfds, &readFds, NULL, &exceptFds, &tv);
		if (resultSelect < 0) {
			if(errno != EINTR) {
				logmsg(LOGMSG_ERROR,"select error %s\n", strerror(errno));
				return (cs_error_t)resultSelect;
			}
			/* interrupted: the sets are unspecified, retry with the time left */
			FD_ZERO (&readFds);
			FD_ZERO (&exceptFds);
			moreToRead = 1;
		}
		for(i=0; i<sh->fdnxt; i++) {
			int coFd = sh->fdlist[i];
			cs_error_t result;

			if(coFd < 0 || coFd >= FD_SETSIZE) {
				continue;
			}
			if(verbose) {
				/* no fd is ever watched for writing, hence the constant 0 */
				logmsg(LOGMSG_INFO,"select read %d write %d except %d\n",
					FD_ISSET(coFd, &readFds) != 0,
					0,
					FD_ISSET(coFd, &exceptFds) != 0);
			}
			if(FD_ISSET(coFd, &readFds)) {
				if(verbose) logmsg(LOGMSG_INFO,"dispatch\n");
				result = cpg_dispatch(sh->handle[i], CS_DISPATCH_ALL);
				if(CS_OK != result) {
					logmsg(LOGMSG_ERROR,"error cpg dispatch %d\n", result);
					return result;
				}
				moreToRead = 1;
			}
			if(FD_ISSET(coFd, &exceptFds)) {
				logmsg(LOGMSG_ERROR,"error exceptfd %d\n", coFd);
				/* XXX remove fd from list to consider */
			}
		} /* rof */

		/* work out how much of the budget is left */
		gettimeofday(&tvnow, NULL);
		timersub(&tvnow, &tvbeg, &tvdelta);
		if(tvdelta.tv_sec < 0) {
			usecUsed = 0;	/* clock stepped backwards */
		} else {
			usecUsed = (unsigned long long)tvdelta.tv_sec * SEC_PER_USEC + (unsigned long long)tvdelta.tv_usec;
		}
		usecLeft = (usecUsed < timeOut) ? (timeOut - usecUsed) : 0;
	} /* elihw */
	return CS_OK;
}

/* coInit() 
 * @brief initialize the connection to the corosync daemon
 *
 * When *handle already refers to a connection, that connection is first
 * torn down: the group is left, the handle is removed from the monitored
 * set and then finalized.  Removal happens before cpg_finalize() because
 * delFd() looks the entry up through the still-valid handle.  A new
 * connection is then initialized, given the context, and joined to the
 * group.  Every library call is timed with deltatime().
 *
 * The caller is responsible for adding the new handle to the set (addFd()).
 * Exits the process with status 3 when groupNameStr is NULL.
 *
 * @param[in,out] handle connection handle; NULL_HANDLE when not connected.
 *                Set to NULL_HANDLE when cpg_initialize() fails.
 * @param[in] groupNameStr the name of the group
 * @param[in] ctx overall context 
 * @param[in] sh handle to list of all connections
 * @return CS_OK on success, otherwise the error of the failing library call
 */
static cs_error_t 
coInit(cpg_handle_t *handle, char *groupNameStr, void *ctx, SetHandle *sh)
{
	cs_error_t result;	/**< result code returned from cpg library calls */
	struct cpg_name groupName;	/**< group name/length structure */
	struct timeval tvbeg;	/**< timing begin */
	uint32_t nodeid;	/**< nodeid defined by library */
	int cofd; /**< file descriptor in use by library for interprocess comms */
	int cnt;	/**< count of trys to join group */
	int maxCnt;	/**< maximum number of try to join */
	int value;	/**< ioctl value */
#ifdef NO_DELAY
	int setarg;	/**< set argument value to setsockopt */
#endif

	if(groupNameStr == NULL) {
		logmsg(LOGMSG_ERROR,"error - null group name not allowed\n");
		_exit(3);
	}

	/* one NUL-terminated, possibly truncated, name is used for both leave and join */
	snprintf(groupName.value, sizeof(groupName.value)-1, "%s", groupNameStr);
	groupName.length = strlen(groupName.value)+1;

	if(NULL_HANDLE != *handle) {
		if(verbose)
			logmsg(LOGMSG_INFO,"%s leave handle %llx\n", groupNameStr, (unsigned long long)*handle);
		gettimeofday(&tvbeg, NULL);
		result = cpg_leave(*handle, &groupName);
		deltatime(&tvbeg, "cpg_leave");
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING,"error %s cpg leave %d\n", groupNameStr, result);
		}

		cofd = -1;
		result = cpg_fd_get(*handle, &cofd);
		if (CS_OK != result) {
			logmsg(LOGMSG_WARNING, "error %s Could not get old cpg fd %d\n", groupNameStr, result);
		} else {
			/* report how much is still queued for output on the old connection */
			value = 0;
			if(ioctl(cofd, SIOCOUTQ, &value) < 0) {
				logmsg(LOGMSG_WARNING,"error ioctl(SIOCOUTQ) %s\n", strerror(errno));
			} else {
				if(verbose)
					logmsg(LOGMSG_INFO,"found ioctl(SIOCOUTQ) %d in output queue\n", value);
			}
		}
		/* must precede cpg_finalize(): delFd() resolves the fd from the handle */
		delFd(sh, *handle);
		gettimeofday(&tvbeg, NULL);
		result = cpg_finalize (*handle);
		deltatime(&tvbeg, "cpg_finalize");
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING,"error %s finalize failed %d\n", groupNameStr, result);
		}
		*handle = NULL_HANDLE;
	}
	gettimeofday(&tvbeg, NULL);
	result = cpg_initialize (handle, &callbacks);
	deltatime(&tvbeg, "cpg_initialize");
	if (CS_OK != result) {
		logmsg(LOGMSG_ERROR, "error %s Could not initialize Cluster Process Group API instance %d\n", groupNameStr, result);
		*handle = NULL_HANDLE;
		return result;
	}
	if(verbose)
		logmsg(LOGMSG_INFO, "%s handle %llx\n", groupNameStr, (unsigned long long)*handle);
	/* propose enhancement of a context per group! */
	gettimeofday(&tvbeg, NULL);
	result = cpg_context_set(*handle, ctx);
	deltatime(&tvbeg, "context_set");
	if (CS_OK != result) {
		logmsg(LOGMSG_ERROR, "error %s Could not set cpg context %d\n", groupNameStr, result);
		return result;
	}
	gettimeofday(&tvbeg, NULL);
	result = cpg_fd_get(*handle, &cofd);
	deltatime(&tvbeg, "cpg_fd_get");
	if (CS_OK != result) {
		logmsg(LOGMSG_WARNING, "error %s Could not get cpg fd %d\n", groupNameStr, result);
		return result;
	}
#ifdef NO_DELAY
	setarg = 1;
	if (setsockopt(cofd, IPPROTO_TCP, TCP_NODELAY, &setarg,
		(socklen_t) sizeof(setarg)) < 0) {
		logmsg(LOGMSG_WARNING,"error setsockopt(NODELAY) %s\n", strerror(errno));
	}
#endif

	gettimeofday(&tvbeg, NULL);
	result = cpg_local_get (*handle, &nodeid);
	deltatime(&tvbeg, "cpg_local_get");
	if (CS_OK != result) {
		logmsg(LOGMSG_WARNING, "error %s Could not get local node id %d\n", groupNameStr, result);
		return result;
	}

	/* join, retrying on CS_ERR_TRY_AGAIN up to maxCnt attempts in total */
	cnt = 0;
	maxCnt = 1;
	do { 
		gettimeofday(&tvbeg, NULL);
		result = cpg_join(*handle, &groupName);
		deltatime(&tvbeg, "cpg_join");
		if (CS_OK != result) {
			logmsg(LOGMSG_ERROR, "Error Join %s handle %llx Could not join process group %d\n", groupName.value , (unsigned long long)*handle, result);
			if(cnt >= 1) {
				usleep(500000);
			}
		}
	} while(CS_ERR_TRY_AGAIN == result && ++cnt < maxCnt);
	if(CS_OK != result) {
		return result;
	}
	if(verbose) 
		logmsg(LOGMSG_INFO, "%s handle %llx fd %d nodeid %x\n", groupNameStr, (unsigned long long)*handle, cofd, nodeid);

	return CS_OK;
}

/* sigintr_handler()
 * @brief SIGINT handler: print the signal number, dump the latency
 *        histograms, close the log and terminate the process with status 0
 * @param[in] signum signal number that was delivered
 *
 * NOTE(review): printf(), the formatting done under hvalDump() and exit()
 * are not on the POSIX list of async-signal-safe functions; a safer design
 * would set a flag here and do this work from the main loop.
 */
static void 
sigintr_handler (int signum) {
	printf("sigintr_handler signum %d\n", signum);

	/* report what was measured so far before the process goes away */
	hvalDump();
	logwrap();

	exit (0);
}

/* usage()
 * @brief print usage to stdout
 * @param[in] name of the executable program
 */
const char *options = "c:i:j:l:p:st:u:v"; /**< char listing of options */
static void
usage(const char *pgm)
{
	printf("%s [groupname]\n", pgm);
	printf(" [-c <msgcnt:%d>]         maximum message count\n", msgcnt);
	printf(" [-i <maxIterations:%ld>] maximum iterations\n", maxIterations);
	printf(" [-j <handleCnt:%d>]      maximum handles for process\n", handleCnt);
	printf(" [-l <loglevel:0x%x>]       log level mask\n", loglevel);
	printf(" [-s] %d                  toggle sending mode (0=>off)\n", doSend);
	printf(" [-t <maxUsecTolerance:%ld>] maximum reporting micro seconds\n", maxUsecTolerance);
	printf(" [-u <asleep:%d>]         microseconds to usleep between iterations\n", asleep);
	printf(" [-p <psleep:%d>]         microseconds to usleep post iterations\n", asleep);
	printf(" [-v] %d                  verbose output (0=>off)\n", verbose);
	_exit(0);
}

#define MAX_HANDLE (1024)

/* reinitHandle()
 * @brief recover a connection after a failed send or dispatch
 *
 * Re-runs coInit() for the handle, records the outcome in the handle's
 * context and, on success, registers the new connection for monitoring.
 *
 * @param[in,out] handle connection handle to re-initialize
 * @param[in,out] actx context entry of this handle
 * @param[in] ctx context pointer passed through to coInit()
 * @param[in] sh set of monitored connections
 * @param[in] iteration current test iteration, for the error message
 */
static void
reinitHandle(cpg_handle_t *handle, struct appContext *actx, void *ctx, SetHandle *sh, unsigned long iteration)
{
	cs_error_t result;

	result = coInit(handle, actx->groupNameStr, ctx, sh);
	if(CS_OK != result) {
		actx->coInitOk = 0;
		logmsg(LOGMSG_ERROR,"%s failed to init iteration %lu/%lu\n", actx->groupNameStr, iteration, maxIterations);
		return;
	}
	actx->coInitOk = 1;
	addFd(sh, *handle);
}

/* main()
 * @brief test main for stress test and timing of messages
 *
 * Each iteration (re)creates handleCnt connections, one group per
 * connection.  A receiver (-s) then only waits for and dispatches inbound
 * messages.  A sender multicasts msgcnt time-stamped messages on every
 * connection, dispatching after each send, and then waits for the replies.
 * After the last iteration there is a final wait of psleep microseconds,
 * all connections are closed and the latency histograms are logged.
 *
 * @param[in] argc count of arguments
 * @param[in] argv array of arguments, followed by optional group name
 * @return 0 => success
 */
int 
main (int argc, char *argv[]) 
{
	cpg_handle_t handle[MAX_HANDLE]; /**< opaque handle to corosync communications */
	struct appContext ctx[MAX_HANDLE];
	char groupNameBaseStr[CPG_MAX_NAME_LENGTH]; /**< arbitrary group name */
	cs_error_t result;	/**< return value of library calls */
	struct timeval tvbeg;	/**< start of the whole run */
	struct timeval tvsend;	/**< origination time stamp of the message being sent */
	unsigned long i;	/**< iteration counter (simple context tracker) */
	unsigned long optval;	/**< numeric option value being validated */
	struct iovec iov;	/**< output vector */
	size_t iovLen;	/**< length of vector */
	char mbuf[MAX_BUF];	/**< buffer for a message */
	int opt; /**< option currently parsed */
	int j;	/**< iteration of different handles to corosync */
	int cofd; /**< file descriptor in use by library for interprocess comms */
	int prevfd[MAX_HANDLE]; /**< previous fd for index */
	int k = 0;	/**< iteration on message send */
	int value;	/**< ioctl value */
	sighandler_t prevHandler;	/**< signal handler registration return */
	SetHandle *sh;

	prevHandler = signal (SIGINT, sigintr_handler);
	if(SIG_ERR == prevHandler) {
		perror("error signal handler failed\n");
	}

	loginit();
	hvalClear();
	sh = newSetHandle();

	asleep = (useconds_t)DEFAULT_USEC;
	psleep = (useconds_t)DEFAULT_USEC * 10;
	maxIterations = DEFAULT_ITERATIONS;
	handleCnt = DEFAULT_HANDLE_COUNT;
	msgcnt = DEFAULT_MESSAGE_COUNT;

	while ( (opt = getopt(argc, argv, options)) != -1 ) {
		switch (opt) {
		case 'c':
			msgcnt = strtoul(optarg, NULL, 0);
			break;
		case 'j':
			/* validate as unsigned so a negative argument cannot slip through as an int */
			optval = strtoul(optarg, NULL, 0);
			if(optval > MAX_HANDLE) {
				logmsg(LOGMSG_WARNING,"warning...maximum handles %d\n", MAX_HANDLE);
				optval = MAX_HANDLE;
			}
			handleCnt = (int)optval;
			break;
		case 'i':
			maxIterations = strtoul(optarg, NULL, 0);
			break;
		case 'l':
			loglevel = strtoul(optarg, NULL, 0);
			break;
		case 'p':
			psleep = strtoul(optarg, NULL, 0);
			break;
		case 's':
			doSend = (doSend+1)%2;
			break;
		case 't':
			maxUsecTolerance = strtoul(optarg, NULL, 0);
			break;
		case 'u':
			asleep = strtoul(optarg, NULL, 0);
			break;
		case 'v':
			verbose = (verbose+1)%2;
			break;
		default:
			usage(argv[0]);
			break;
		}
	}

	if (argc > optind) {
		snprintf(groupNameBaseStr, sizeof(groupNameBaseStr), "%s", argv[optind]);
	} else {
		snprintf(groupNameBaseStr, sizeof(groupNameBaseStr), "%s_groupName", argv[0]);
	}

	/* clear out data structures for test */
	for(j = 0; j < handleCnt; j++) {
		handle[j] = NULL_HANDLE;
		prevfd[j] = -1;
		snprintf(ctx[j].groupNameStr, sizeof(ctx[j].groupNameStr), "%s-%d", groupNameBaseStr, j);
	}
	gettimeofday(&tvbeg, NULL);
	/* iterate through test */
	for(i = 0; i < maxIterations; i++) {
		/* create corosync handles (destroying prior handles) */
		for(j = 0; j < handleCnt; j++) {
			result = coInit(&handle[j], ctx[j].groupNameStr, (void *)ctx, sh);
			if(CS_OK != result) {
				logmsg(LOGMSG_WARNING,"%s failed to init iteration %lu/%lu\n",ctx[j].groupNameStr, i, maxIterations);
				ctx[j].coInitOk = 0;
				continue;
			}
			ctx[j].coInitOk = 1;
			result = cpg_fd_get(handle[j], &cofd);
			if (CS_OK != result) {
				logmsg(LOGMSG_WARNING, "error %s Could not get cpg fd %d\n",ctx[j].groupNameStr, result);
				continue;
			}
			if(prevfd[j] != -1 && cofd != prevfd[j]) {
				logmsg(LOGMSG_WARNING,"failed %s %lu/%d/%d leaking fd now %d previously %d\n",ctx[j].groupNameStr, i, j, k, cofd, prevfd[j]);
			}
			prevfd[j] = cofd;
			addFd(sh, handle[j]);
#ifdef EXTRA_DISPATCH
			result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
			if(CS_OK != result) {
				logmsg(LOGMSG_WARNING,"error %s cpg dispatch %d\n",ctx[j].groupNameStr, result);
				ctx[j].coInitOk = 0;
			}
#endif
		}
		/* receiver simply waits some specified time for messages */
		if(!doSend) {
			result = checkFdAll(sh, handle, asleep);
			continue;
		}

		/* sender creates and sends messages across connections */
		for(k = 0; k < msgcnt; k++) {
			for(j = 0; j < handleCnt; j++) {
				/* a separate stamp keeps tvbeg as the start of the whole run */
				gettimeofday(&tvsend, NULL);
				snprintf(mbuf, sizeof(mbuf), "%s=%lu.%lu %s=%lu %s=%d %s=%d %s=%d", 
					cmds[CmdTime].key, (unsigned long)tvsend.tv_sec, (unsigned long)tvsend.tv_usec, 
					cmds[CmdGeneration].key, i, 
					cmds[CmdConnection].key, j, 
					cmds[CmdMessageCount].key, k,
					cmds[CmdAck].key, CmdAckSet);
				iov.iov_base = mbuf;
				iov.iov_len = sizeof(mbuf);
				iovLen = sizeof(iov)/sizeof(struct iovec);
				msgQueued++;
				result = cpg_mcast_joined(handle[j], CPG_TYPE_AGREED, &iov, iovLen);
				if(CS_OK != result) {
					logmsg(LOGMSG_ERROR,"error %s %lu/%d/%d cpg mcast joined %d\n", ctx[j].groupNameStr, i, j, k, result);
					reinitHandle(&handle[j], &ctx[j], (void *)ctx, sh, i);
					continue;
				}
				if(verbose) {
					char tbuf[128];
					if(ctime_r(&tvsend.tv_sec, tbuf) == NULL) {
						tbuf[0] = '\0';
					}
					tbuf[strcspn(tbuf, "\n")] = '\0'; /* remove trailing newline */
					logmsg(LOGMSG_INFO,"queued msg %d %d stime %ld.%ld %s.%ld\n", j, k, (long)tvsend.tv_sec, (long)tvsend.tv_usec, tbuf, (long)tvsend.tv_usec);
					logmsg(LOGMSG_INFO,"dispatch messages %d\n", j);
				}

				/* process whatever has been delivered so far */
				result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
				if(CS_OK != result) {
					logmsg(LOGMSG_WARNING,"error %s cpg main dispatch %d\n",ctx[j].groupNameStr, result);
					reinitHandle(&handle[j], &ctx[j], (void *)ctx, sh, i);
					continue;
				}
			}
		}
		/* now that all the messages were sent in the first round */
		result = checkFdAll(sh, handle, asleep);
	}
	deltatime(&tvbeg, "post all iterations");
	result = checkFdAll(sh, handle, psleep);
	deltatime(&tvbeg, "post final sleep");

	for(j = 0; j < handleCnt; j++) {
		struct cpg_name groupName;	/**< group name/length structure */
		struct timeval tvcall;	/**< timing of a single library call */

		/* nothing to tear down when initialization never succeeded */
		if(NULL_HANDLE == handle[j]) {
			continue;
		}
		/* same truncation as used by coInit() when the group was joined */
		snprintf(groupName.value, sizeof(groupName.value)-1, "%s", ctx[j].groupNameStr);
		groupName.length = strlen(groupName.value)+1;
		gettimeofday(&tvcall, NULL);
		result = cpg_leave(handle[j], &groupName);
		deltatime(&tvcall, "cpg_leave");
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING,"error %s cpg leave %d\n",ctx[j].groupNameStr, result);
		}
		result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING,"error %s cpg dispatch %d\n",ctx[j].groupNameStr, result);
		}
		/* query the output queue of this handle's own descriptor */
		cofd = -1;
		result = cpg_fd_get(handle[j], &cofd);
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING, "error %s Could not get cpg fd %d\n",ctx[j].groupNameStr, result);
		} else {
			value = 0;
			if(ioctl(cofd, SIOCOUTQ, &value) < 0) {
				logmsg(LOGMSG_WARNING,"error ioctl(SIOCOUTQ) %s\n", strerror(errno));
			} else {
				if(verbose)
				logmsg(LOGMSG_INFO,"ioctl(SIOCOUTQ) %d\n", value);
			}
		}
		result = cpg_finalize (handle[j]);
		if(CS_OK != result) {
			logmsg(LOGMSG_WARNING,"finalize failed %d\n", result);
			continue;
		}
		handle[j] = NULL_HANDLE;
	}

	hvalDump();
	logwrap();
	delSetHandle(sh);

	return (0);
}

Attachment: Makefile.co
Description: Binary data

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to