Hi Gentle Readers! Is round trip latency expected to degrade linearly as more receivers are added to a system given an equivalent number of messages sent?
The attached test application attempts to capture the situation. The basic setup is to start N receivers on 1-N different nodes, then run a single sender which tracks the results in a log file on the system, distributing delays across multiple buckets. Note that when changing the number of receivers, it is necessary to modify the message count of the sender so that a similar number of messages results. For example: Starting receivers with a simple script: #!/bin/sh cnt=1 maxcnt=16 host=`hostname -s` echo $host while (($cnt <= $maxcnt)) do corestart -s -c 1 -i 1 -j 2 -p 30000000 ((cnt++)) done And then start a single sender: corestart -c 100 -i 1 -j 2 -p 1000000 By varying the number of receivers (and the associated message count), the round trip latency degrades with the number of receivers in a non-linear relationship. How are your results for 1-8 node systems? Any insights as to what may cause this anomaly, including a poorly written test application :)! thanks, dan ps. there are several other parameters that allow this test application to exercise some interesting aspects of the library, such as running the number of iterations up (-i) to test initialize/finalize logic.
/*
* @file corestart.c
* test application to utilize multiple connections, groups and sends
* and measure end to end round trip between multiple instances.
* For example:
* A test with a single sender of m messages and N receivers
* results in (m)*(N+1) messages received by each instance.
* Due to differences in clocks between multiple nodes, the latency
* statistics may only be valid for round trip time stamps
* (sender checking origination time stamp against current time
* when receiving messages reflected back from receivers).
*/
#include <stdio.h>
#include <stdarg.h>
#include <unistd.h>
#include <signal.h>
#include <sys/time.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <sys/ioctl.h>
#include <linux/sockios.h>
#include <syslog.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>
/* Default values for the command line tunables and fixed sizing limits. */
#define DEFAULT_ITERATIONS ((unsigned long)2)
/* size in bytes of every message buffer built or copied by this program */
#define MAX_BUF ((size_t)128)
#define DEFAULT_USEC ((useconds_t)1000000)
/* NOTE(review): despite the name, this is the number of microseconds per
 * second; it is used to convert tv_sec values into microseconds. */
#define SEC_PER_USEC (1000000)
#define DEFAULT_HANDLE_COUNT (2)
#define DEFAULT_MESSAGE_COUNT (10)
/* value used to mark a cpg handle slot that holds no open connection */
#define NULL_HANDLE ((cpg_handle_t)0)
/* capacity of the SetHandle fd/handle tables */
#define MAXFDS (256)
static int verbose = 0; /**< enable verbose progress messages through logmsg(); toggled by -v */
static suseconds_t maxUsecTolerance = 500; /**< maximum tolerance in usecs for library call */
int msgcnt; /**< messages sent per handle in each iteration (-c) */
unsigned long maxIterations; /**< maximum iterations in loop */
int handleCnt; /**< maximum count of handles */
static unsigned long msgQueued = 0; /**< incremented before every cpg_mcast_joined() attempt */
static unsigned long msgDelivered = 0; /**< incremented on every DeliverCallback() invocation */
static useconds_t asleep; /**< microseconds passed to checkFdAll() as the timeout after each iteration */
static useconds_t psleep; /**< microseconds passed to checkFdAll() as the timeout after all iterations */
static int doSend = 1; /**< 1 => originate messages, 0 => only reflect received messages; toggled by -s */
typedef void (*sighandler_t)(int);
/* forward declaration: the handler ends the process and never returns */
static void sigintr_handler (int signum) __attribute__((__noreturn__));
/* base name of the log file and the syslog identity */
static char logname[] = "corestart";
/* Logging levels; each value is the syslog priority of the same name, so a
 * level can be handed to vsyslog() unchanged. Numerically smaller values are
 * more severe. */
enum loglevel_s
{
LOGMSG_EMERG = LOG_EMERG,
LOGMSG_ALERT = LOG_ALERT,
LOGMSG_CRIT = LOG_CRIT,
LOGMSG_ERROR = LOG_ERR,
LOGMSG_WARNING = LOG_WARNING,
LOGMSG_NOTICE = LOG_NOTICE,
LOGMSG_INFO = LOG_INFO,
LOGMSG_DEBUG = LOG_DEBUG,
};
typedef enum loglevel_s loglevel_t;
#define DefaultLogLevel (LOGMSG_INFO)
/* current threshold: logmsg() drops messages whose level is numerically
 * greater than this value (set with -l) */
static loglevel_t loglevel = DefaultLogLevel;
/* directory prefix joined with logname by loginit(); must end with '/' */
static char logpath[] = "/var/log/";
/* descriptor of the open log file, or -1 while no log file is open */
static int logfd = -1;
/* logmsg()
 * @brief format a message and append it to the log file (and to syslog when
 *        built with USE_SYSLOG) if its level passes the current threshold
 * @param[in] level, the logging level of the message; messages whose level
 *            is numerically greater than the global loglevel are dropped
 * @param[in] fmt, printf style formatting
 * @param[in] varargs, arguments based on arguments in fmt
 *
 * Messages longer than the internal buffer are truncated. If the write to
 * the log file fails the text is printed to stdout instead.
 */
static void
__attribute__ ((format (printf, 2, 3)))
logmsg( const loglevel_t level, const char *fmt, ...)
{
    va_list ap;
    char lbuf[1024];

    if (level > loglevel) {
        return;
    }
    va_start(ap, fmt);
    if (logfd >= 0) {
        va_list apFile;
        int len;

        /* format from a copy: a va_list may only be traversed once and the
         * original is still needed for vsyslog() below */
        va_copy(apFile, ap);
        len = vsnprintf(lbuf, sizeof(lbuf), fmt, apFile);
        va_end(apFile);
        if (len > 0) {
            /* vsnprintf reports the untruncated length; clamp to what is
             * really in the buffer */
            size_t outLen = ((size_t)len < sizeof(lbuf)) ? (size_t)len : sizeof(lbuf) - 1;

            if (write(logfd, lbuf, outLen) < 0) {
                printf("%s", lbuf);
            }
        }
    }
#ifdef USE_SYSLOG
    vsyslog((int)level, fmt, ap);
#endif
    va_end(ap);
}
/* loginit()
 * @brief open the log file (logpath + logname) for appending, creating it
 *        if needed, and open syslog when built with USE_SYSLOG
 *
 * On failure logfd stays negative, so logmsg() skips the file output; the
 * reason is reported on stdout.
 */
static void
loginit(void)
{
    char logfile[PATH_MAX];

    snprintf(logfile, sizeof(logfile), "%s%s", logpath, logname);
#ifdef USE_SYSLOG
    openlog(logname, LOG_NDELAY, LOG_USER);
#endif
    logfd = open(logfile, O_WRONLY|O_APPEND|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
    if (logfd < 0) {
        /* include the errno text so a permission problem is distinguishable
         * from a missing directory */
        printf("failed to open logfile %s: %s\n", logfile, strerror(errno));
    }
}
/* logwrap()
 * @brief shut logging down: close syslog (USE_SYSLOG builds) and close the
 *        log file if one is open, marking it closed afterwards
 */
static void
logwrap(void)
{
#ifdef USE_SYSLOG
    closelog();
#endif
    /* nothing more to do when no log file is open */
    if (logfd < 0) {
        return;
    }
    if (close(logfd) < 0) {
        printf("failed to close logfile %s", strerror(errno));
    }
    logfd = -1;
}
/* @struct appContext
* @brief per-connection state kept by main(), one entry per cpg handle
*
* NOTE(review): idx and fd are not assigned anywhere in the visible code, and
* coInitOk is written by main() but never read.
*/
typedef struct appContext {
int idx; /**< simple integer index of connection */
int fd; /**< file descriptor of connection */
char groupNameStr[CPG_MAX_NAME_LENGTH]; /**< group name joined through this connection; built in main() as "<base>-<index>" */
int coInitOk; /**< handle ok for communications */
}appContext;
/* Upper bound, in microseconds, of each latency bucket. DeliverCallback()
 * picks the first bucket whose bound is not exceeded; the final all-ones
 * entry is a sentinel so that every measurement lands in some bucket. */
unsigned long long hvalMax[] = {
50, 100, 200, 300, 400, 500, 1000, 10000, 25000, 50000, 75000, 100000, 250000, 500000, 750000, 1000000, 2000000, 0xffffffffffffffff
};
/* number of buckets */
#define HSlots ((sizeof (hvalMax))/sizeof(hvalMax[0]))
/* per-bucket count of delivered messages whose ack field equals CmdAckSet */
static unsigned long long hval[HSlots];
/* per-bucket count of all other delivered messages (e.g. reflected replies,
 * which carry CmdAckClear) */
static unsigned long long hvalAck[HSlots];
/* hvalClear()
 * @brief reset every bucket of both latency histograms to zero
 */
static void
hvalClear(void)
{
    memset(hval, 0, sizeof(hval));
    memset(hvalAck, 0, sizeof(hvalAck));
}
#define MSEC_PER_USEC (1000)
/* hvalFormatRow()
 * @brief render one histogram row as right-aligned, space separated columns
 * @param[out] buf destination, always NUL terminated when buflen > 0
 * @param[in] buflen size of buf in bytes
 * @param[in] vals values to print
 * @param[in] cnt number of entries in vals
 * @param[in] lastLabel when not NULL, printed in place of the final value
 *
 * Stops at the last column that fits entirely, so the buffer can never be
 * overrun no matter how wide the values grow.
 */
static void
hvalFormatRow(char *buf, size_t buflen, const unsigned long long *vals, size_t cnt, const char *lastLabel)
{
    size_t used = 0;
    size_t i;

    if (buflen == 0) {
        return;
    }
    buf[0] = '\0';
    for (i = 0; i < cnt; i++) {
        size_t room = buflen - used;
        int len;

        /* the leading blank guarantees a separator even for values wider
         * than the nominal ten character column */
        if (lastLabel != NULL && i + 1 == cnt) {
            len = snprintf(buf + used, room, " %9s", lastLabel);
        } else {
            len = snprintf(buf + used, room, " %9llu", vals[i]);
        }
        if (len < 0 || (size_t)len >= room) {
            break; /* formatting error or column truncated: stop here */
        }
        used += (size_t)len;
    }
}
/* hvalDump()
 * @brief log the message totals followed by three rows: the bucket upper
 *        bounds in microseconds, the counts of messages carrying
 *        ack=CmdAckSet, and the counts of all other messages
 *
 * The open-ended final bucket is labelled "max" instead of printing its
 * all-ones sentinel bound.
 */
static void
hvalDump(void)
{
    char cbuf[240];

    logmsg(LOGMSG_INFO, "total msg counts diff %ld queued %lu delivered %lu seperated by usec buckets\n", ((long)msgDelivered - (long)msgQueued), msgQueued, msgDelivered);

    hvalFormatRow(cbuf, sizeof(cbuf), hvalMax, HSlots, "max");
    logmsg(LOGMSG_INFO,"%s\n", cbuf);

    hvalFormatRow(cbuf, sizeof(cbuf), hval, HSlots, NULL);
    logmsg(LOGMSG_INFO,"%s\n", cbuf);

    hvalFormatRow(cbuf, sizeof(cbuf), hvalAck, HSlots, NULL);
    logmsg(LOGMSG_INFO,"%s\n", cbuf);
}
/* Fields carried in every message, in the order they are written.
 * The values index the cmds[] table below. */
typedef enum Cmd_e {
CmdTime,
CmdGeneration,
CmdConnection,
CmdMessageCount,
CmdAck,
}Cmd_e;
/* pairs a field with the key text that precedes '=' on the wire */
typedef struct Cmd_t {
Cmd_e idx;
const char *key;
}Cmd_t;
/* First character of each key in cmds[]; DeliverCallback() switches on the
 * first character of a token to identify the field, so these must stay in
 * step with the key strings and remain distinct. */
enum {
CmdTimeTok = 't',
CmdGenerationTok = 'g',
CmdConnectionTok = 'c',
CmdMessageCountTok = 'm',
CmdAckTok = 'a',
};
/* Values of the ack field: the sender in main() writes CmdAckSet, a reply
 * built by DeliverCallback() carries CmdAckClear, and CmdAckNone is the
 * value assumed when a message has no ack field. */
enum {
CmdAckNone = 0x0,
CmdAckSet = 0x1,
CmdAckClear = 0x2,
};
/* key strings, indexed by Cmd_e; messages look like
 * "tme=<sec>.<usec> gen=<n> cnx=<n> msg=<n> ack=<n>" */
const Cmd_t cmds[] = {
{CmdTime, "tme"},
{CmdGeneration, "gen"},
{CmdConnection, "cnx"},
{CmdMessageCount, "msg"},
{CmdAck, "ack"},
};
/* getLong()
 * @brief extract the unsigned number that follows the first '=' in a token
 * @param[in] cp NUL terminated "key=value" token
 * @return the value parsed by strtoul() with automatic base detection
 *         (decimal, 0x hex, leading-0 octal); 0 when the token has no '='
 */
static unsigned long
getLong(char *cp)
{
    const char *eq = strchr(cp, '=');

    return (eq != NULL) ? strtoul(eq + 1, NULL, 0) : 0UL;
}
/* DeliverCallback()
 * @brief message delivery callback function
 * @param[in] handle cpg connection the message arrived on (used for replies)
 * @param[in] groupName group the message was sent to
 * @param[in] nodeid sending node (unused)
 * @param[in] pid sending process (unused)
 * @param[in] msg message payload, expected to be text of the form
 *            "tme=<sec>.<usec> gen=<n> cnx=<n> msg=<n> ack=<n>"
 * @param[in] msg_len payload length in bytes
 *
 * Counts the delivery, computes the absolute difference between the embedded
 * origination time and the local clock, and records it in a latency bucket:
 * hval[] for messages carrying ack=CmdAckSet, hvalAck[] for everything else.
 * A message with no time field is measured against a zero start time and so
 * lands in the last bucket. When this process is a receiver (doSend == 0)
 * and the message asks for an ack, the original time stamp and counters are
 * multicast back with ack=CmdAckClear.
 *
 * The payload is parsed from a private NUL terminated copy, so the caller's
 * buffer is never modified and need not be NUL terminated.
 */
static void
DeliverCallback (
    cpg_handle_t handle,
    const struct cpg_name *groupName,
    uint32_t nodeid,
    uint32_t pid,
    void *msg,
    size_t msg_len)
{
    struct timeval tvbeg;   /**< origination time taken from the message */
    struct timeval tvend;   /**< local arrival time */
    struct timeval tvdelta; /**< tvend - tvbeg, only for verbose logging */
    char mbuf[MAX_BUF];     /**< private copy of the message, later the reply */
    size_t mlen;
    char *cp;
    unsigned long generation = 0;
    unsigned long connection = 0;
    unsigned long messageCount = 0;
    unsigned long messageAck = CmdAckNone;
    unsigned long long usecsbeg;
    unsigned long long usecsend;
    unsigned long long usecs;
    size_t slot;

    (void)nodeid;
    (void)pid;

    gettimeofday(&tvend, NULL);
    msgDelivered++;

    /* zero the whole buffer: the reply below transmits all of it, so no
     * stale stack bytes may remain behind the text */
    memset(mbuf, 0, sizeof(mbuf));
    mlen = (msg_len < sizeof(mbuf)) ? msg_len : sizeof(mbuf) - 1;
    if (msg == NULL) {
        mlen = 0;
    }
    if (mlen > 0) {
        memcpy(mbuf, msg, mlen);
    }
    mbuf[mlen] = '\0';

    if (verbose) {
        /* logged before tokenising, while the copy is still one string */
        logmsg(LOGMSG_INFO,"deliver %s msg %s\n", groupName->value, mbuf);
    }

    timerclear(&tvbeg);
    timerclear(&tvdelta);
    /* each blank separated token is "key=value"; the first character of
     * the key identifies the field */
    for (cp = strtok(mbuf, " "); cp != NULL; cp = strtok(NULL, " ")) {
        switch (*cp) {
        case CmdTimeTok: {
            char *cval = strchr(cp, '=');

            if (cval != NULL) {
                char *cend;

                cval++;
                cend = strchr(cval, '.');
                if (cend != NULL) {
                    *cend = '\0';
                    tvbeg.tv_sec = (time_t)strtoul(cval, NULL, 10);
                    tvbeg.tv_usec = (suseconds_t)strtoul(cend + 1, NULL, 10);
                }
            }
            break;
        }
        case CmdGenerationTok:
            generation = getLong(cp);
            break;
        case CmdConnectionTok:
            connection = getLong(cp);
            break;
        case CmdMessageCountTok:
            messageCount = getLong(cp);
            break;
        case CmdAckTok:
            messageAck = getLong(cp);
            break;
        default:
            break;
        } /* esac */
    } /* rof */

    if (timerisset(&tvbeg)) {
        timersub(&tvend, &tvbeg, &tvdelta);
    }

    /* absolute time difference in microseconds; widen before multiplying
     * so a 32 bit time_t cannot overflow */
    usecsbeg = (unsigned long long)tvbeg.tv_sec * SEC_PER_USEC + (unsigned long long)tvbeg.tv_usec;
    usecsend = (unsigned long long)tvend.tv_sec * SEC_PER_USEC + (unsigned long long)tvend.tv_usec;
    usecs = (usecsend > usecsbeg) ? (usecsend - usecsbeg) : (usecsbeg - usecsend);

    /* first bucket whose upper bound is not exceeded; the index is clamped
     * to the last bucket independent of the sentinel in hvalMax[] */
    for (slot = 0; slot + 1 < HSlots && usecs > hvalMax[slot]; slot++) {
        ;
    }
    if (messageAck == CmdAckSet) {
        hval[slot]++;
    } else {
        hvalAck[slot]++;
    }

    /* log information */
    if (verbose) {
        char tsbuf[128];
        char tebuf[128];

        if (ctime_r(&tvbeg.tv_sec, tsbuf) == NULL) {
            tsbuf[0] = '\0';
        }
        tsbuf[strcspn(tsbuf, "\n")] = '\0'; /* remove trailing newline */
        if (ctime_r(&tvend.tv_sec, tebuf) == NULL) {
            tebuf[0] = '\0';
        }
        tebuf[strcspn(tebuf, "\n")] = '\0'; /* remove trailing newline */
        logmsg(LOGMSG_INFO,"%s delta %ld.%06ld beg %s.%06ld end %s.%06ld\n",
            groupName->value, (long)tvdelta.tv_sec, (long)tvdelta.tv_usec,
            tsbuf, (long)tvbeg.tv_usec, tebuf, (long)tvend.tv_usec);
    }

    /* if warranted - send a response */
    if (!doSend && (messageAck == CmdAckSet)) {
        struct iovec iov; /**< output vector */
        cs_error_t result;

        memset(mbuf, 0, sizeof(mbuf));
        snprintf(mbuf, sizeof(mbuf), "%s=%lu.%lu %s=%lu %s=%lu %s=%lu %s=%d",
            cmds[CmdTime].key, (unsigned long)tvbeg.tv_sec, (unsigned long)tvbeg.tv_usec,
            cmds[CmdGeneration].key, generation,
            cmds[CmdConnection].key, connection,
            cmds[CmdMessageCount].key, messageCount,
            cmds[CmdAck].key, CmdAckClear);
        /* the full fixed size buffer is sent, matching what the sender in
         * main() does, so message sizes are equal in both directions */
        iov.iov_base = mbuf;
        iov.iov_len = sizeof(mbuf);
        msgQueued++;
        result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
        if (CS_OK != result) {
            logmsg(LOGMSG_ERROR,"deliverCallback %s cpg mcast joined %d\n", groupName->value, result);
        }
        if (verbose) {
            logmsg(LOGMSG_INFO,"post ack %s\n", mbuf);
        }
    }
}
/* ConfchgCallback()
 * @brief configuration change callback: when verbose, log the group name,
 *        every joined, left and current member, and whether the first
 *        departed entry carries this process' pid
 * @param[in] handle cpg connection reporting the change (unused)
 * @param[in] groupName group whose membership changed
 * @param[in] member_list current members, member_list_entries long
 * @param[in] left_list members that left, left_list_entries long
 * @param[in] joined_list members that joined, joined_list_entries long
 *
 * The callback only logs; with verbose off it has no effect.
 */
static void
ConfchgCallback (
    cpg_handle_t handle,
    const struct cpg_name *groupName,
    const struct cpg_address *member_list, size_t member_list_entries,
    const struct cpg_address *left_list, size_t left_list_entries,
    const struct cpg_address *joined_list, size_t joined_list_entries)
{
    size_t n;

    /* everything below is diagnostic output only */
    if (!verbose) {
        return;
    }

    logmsg(LOGMSG_INFO,"configchg %s\n", groupName->value);

    for (n = 0; n < joined_list_entries; n++) {
        const struct cpg_address *who = &joined_list[n];

        logmsg(LOGMSG_INFO,"configchg %s joined %d %d reason: %d\n", groupName->value,
            who->nodeid, who->pid, who->reason);
    }

    for (n = 0; n < left_list_entries; n++) {
        const struct cpg_address *who = &left_list[n];

        logmsg(LOGMSG_INFO,"configchg %s left %d %d reason: %d\n", groupName->value,
            who->nodeid, who->pid, who->reason);
    }

    for (n = 0; n < member_list_entries; n++) {
        const struct cpg_address *who = &member_list[n];

        logmsg(LOGMSG_INFO,"configchg %s member %d %d %d\n", groupName->value, (int)n,
            who->nodeid, who->pid);
    }

    /* Is it us?? NOTE: in reality we should also check the nodeid */
    if (left_list_entries && left_list[0].pid == getpid()) {
        logmsg(LOGMSG_INFO,"configchg %s pid %d left group do finalize\n", groupName->value, getpid());
    }
}
/* @struct callbacks
* @brief CPG callback table handed to cpg_initialize() by coInit(): message
*        delivery goes to DeliverCallback(), membership changes go to
*        ConfchgCallback()
*/
static cpg_callbacks_t callbacks = {
.cpg_deliver_fn = DeliverCallback,
.cpg_confchg_fn = ConfchgCallback,
};
/* deltatime()
 * @brief measure the time elapsed since tvbeg and log a warning when it is
 *        negative or exceeds maxUsecTolerance microseconds
 * @param[in] tvbeg beginning timer
 * @param[in] msg message to report on error
 *
 * The elapsed time is handled as one signed microsecond count, so negative
 * deltas print with a sign, the fractional part is always six digits, and
 * tolerances larger than one second are honoured.
 */
static void
deltatime(struct timeval *tvbeg, const char *msg)
{
    const long long usecPerSec = 1000000LL;
    struct timeval tvend;   /**< time end */
    struct timeval tvdelta; /**< time delta */
    long long usecs;

    gettimeofday(&tvend, NULL);
    timersub(&tvend, tvbeg, &tvdelta);
    usecs = (long long)tvdelta.tv_sec * usecPerSec + (long long)tvdelta.tv_usec;

    if (usecs < 0) {
        /* the clock stepped backwards between the two samples */
        logmsg(LOGMSG_WARNING,"%s negative delta! -%lld.%06lld\n", msg,
            (-usecs) / usecPerSec, (-usecs) % usecPerSec);
        return;
    }
    if (usecs > (long long)maxUsecTolerance) {
        logmsg(LOGMSG_WARNING,"%s out of tolerance %lld.%06lld\n", msg,
            usecs / usecPerSec, usecs % usecPerSec);
    }
}
/* Set of cpg connections polled together by checkFdAll().
 * handle[] and fdlist[] are parallel arrays: entry i holds a cpg handle and
 * the descriptor cpg_fd_get() reported for it. Only the first fdnxt entries
 * are in use. */
typedef struct SetHandle
{
cpg_handle_t handle[MAXFDS];
int fdlist[MAXFDS];
int fdnxt;
} SetHandle;
/* newSetHandle()
 * @brief create a new, empty handle set for checking all corosync fds
 * @return the new set with every slot marked unused (all bits set), or NULL
 *         when memory could not be allocated; release with delSetHandle()
 */
static SetHandle *
newSetHandle(void)
{
    SetHandle *sh = malloc(sizeof(*sh));

    if (sh == NULL) {
        perror("newSetHandle: malloc");
        return NULL;
    }
    sh->fdnxt = 0;
    /* a byte fill of -1 sets every int to -1 and every handle to all ones */
    memset(sh->fdlist, -1, sizeof(sh->fdlist));
    memset(sh->handle, -1, sizeof(sh->handle));
    return sh;
}
/* delSetHandle()
 * @brief free memory of set handle
 * @param[in] sh set to release; NULL is accepted and ignored, like free()
 *
 * The contents are wiped first so a stale pointer to the set does not keep
 * seeing plausible descriptors. No connection is closed here.
 */
static void
delSetHandle(SetHandle *sh)
{
    if (sh == NULL) {
        return;
    }
    memset(sh, 0, sizeof(*sh));
    free(sh);
}
/* addFd()
 * @brief add a handle in the set of handles to monitor
 * @param[in] sh handle to all fds
 * @param[in] handle of coro connection to add
 * @return CS_OK when added; the cpg_fd_get() error when the descriptor
 *         cannot be obtained; CS_ERR_INVALID_PARAM for a NULL set;
 *         CS_ERR_NOT_EXIST when the descriptor is unusable with select(),
 *         already present, or the set is full
 */
static cs_error_t
addFd(SetHandle *sh, cpg_handle_t handle)
{
    cs_error_t result;
    int coFd = -1;
    int i;
    const char *name = "addFd";

    if (sh == NULL) {
        return CS_ERR_INVALID_PARAM;
    }
    /* the result must be checked first: coFd is only meaningful on success */
    result = cpg_fd_get(handle, &coFd);
    if (CS_OK != result) {
        logmsg(LOGMSG_WARNING,"%s Could not get cpg fd %d result %d\n", name, coFd, result);
        return result;
    }
    /* descriptors outside [0, FD_SETSIZE) cannot be placed in an fd_set */
    if (coFd < 0 || coFd >= FD_SETSIZE) {
        logmsg(LOGMSG_WARNING,"%s invalid cpg fd %d\n", name, coFd);
        return CS_ERR_NOT_EXIST;
    }
    for (i = 0; i < sh->fdnxt; i++) {
        if (sh->fdlist[i] == coFd) {
            logmsg(LOGMSG_WARNING,"%s skip adding duplicate fd %d\n", name, coFd);
            return CS_ERR_NOT_EXIST;
        }
    }
    if (sh->fdnxt >= MAXFDS) {
        logmsg(LOGMSG_WARNING,"%s maximum number of fds %d\n", name, MAXFDS);
        return CS_ERR_NOT_EXIST;
    }
    sh->fdlist[sh->fdnxt] = coFd;
    sh->handle[sh->fdnxt] = handle;
    sh->fdnxt++;
    return CS_OK;
}
/* delFd()
 * @brief delete a handle in the set of handles to monitor
 * @param[in] sh handle to all fds
 * @param[in] handle of coro connection to remove; it must still be a valid
 *            (not yet finalized) cpg handle because its descriptor is
 *            looked up with cpg_fd_get()
 * @return the cpg_fd_get() error when the descriptor cannot be obtained,
 *         CS_ERR_INVALID_PARAM for a NULL set, CS_ERR_NOT_EXIST for a
 *         negative descriptor, otherwise CS_OK (also when the descriptor
 *         was not in the set)
 */
static cs_error_t
delFd(SetHandle *sh, cpg_handle_t handle)
{
    cs_error_t result;
    int coFd = -1;
    int i;
    const char *name = "delFd";

    if (sh == NULL) {
        return CS_ERR_INVALID_PARAM;
    }
    /* the result must be checked first: coFd is only meaningful on success */
    result = cpg_fd_get(handle, &coFd);
    if (CS_OK != result) {
        logmsg(LOGMSG_WARNING,"%s Could not get cpg fd %d\n", name, result);
        return result;
    }
    if (coFd < 0) {
        logmsg(LOGMSG_WARNING,"%s invalid cpg fd %d\n", name, coFd);
        return CS_ERR_NOT_EXIST;
    }
    for (i = 0; i < sh->fdnxt; i++) {
        size_t tail;

        if (sh->fdlist[i] != coFd) {
            continue;
        }
        /* close the gap by sliding the remaining entries down one slot */
        tail = (size_t)(sh->fdnxt - i - 1);
        if (tail > 0) {
            memmove(&sh->fdlist[i], &sh->fdlist[i + 1], tail * sizeof(sh->fdlist[0]));
            memmove(&sh->handle[i], &sh->handle[i + 1], tail * sizeof(sh->handle[0]));
        }
        sh->fdnxt--;
        /* mark the vacated last slot unused, as newSetHandle() does */
        sh->fdlist[sh->fdnxt] = -1;
        sh->handle[sh->fdnxt] = (cpg_handle_t)-1;
        /* addFd() refuses duplicates, so there is nothing more to find */
        break;
    }
    return CS_OK;
}
/* checkFdAll()
 * @brief use select to wait for inbound data on every connection in the set
 *        and dispatch the callbacks of each connection that is readable
 * @param[in] sh set of connections to poll
 * @param[in] handle array of handles (unused; the set carries its own)
 * @param[in] timeOut overall time budget in microseconds
 * @return CS_OK when the wait ended normally, CS_ERR_INVALID_PARAM for a
 *         NULL set, CS_ERR_LIBRARY when select() failed, or the error
 *         returned by cpg_dispatch()
 *
 * Every pass waits at most for what is left of the budget. The loop ends on
 * the first pass in which nothing was readable; while data keeps arriving it
 * continues, polling without blocking once the budget is used up.
 */
static cs_error_t
checkFdAll(SetHandle *sh, cpg_handle_t handle[], useconds_t timeOut)
{
    const unsigned long long usecPerSec = 1000000ULL;
    cs_error_t result;
    fd_set readFds;
    fd_set writeFds;
    fd_set exceptFds;
    struct timeval tv;
    struct timeval tvbeg;   /**< time begin */
    struct timeval tvnow;   /**< time now */
    struct timeval tvdelta; /**< time delta */
    unsigned long long remaining = timeOut; /**< budget left in usecs */
    unsigned long long elapsed;
    int moreToRead = 1;
    int i;

    (void)handle;
    if (sh == NULL) {
        return CS_ERR_INVALID_PARAM;
    }

    gettimeofday(&tvbeg, NULL);
    while (moreToRead) {
        int nfds = 0;
        int resultSelect;

        moreToRead = 0;
        FD_ZERO (&readFds);
        FD_ZERO (&writeFds);
        FD_ZERO (&exceptFds);
        for (i = 0; i < sh->fdnxt; i++) {
            int coFd = sh->fdlist[i];

            if (coFd < 0 || coFd >= FD_SETSIZE) {
                continue; /* cannot be represented in an fd_set */
            }
            FD_SET(coFd, &readFds);
            FD_SET(coFd, &exceptFds);
            if (nfds < coFd + 1) {
                nfds = coFd + 1;
            }
        }

        /* split into seconds and microseconds: tv_usec must stay below
         * one million for the timeout to be valid */
        tv.tv_sec = (time_t)(remaining / usecPerSec);
        tv.tv_usec = (suseconds_t)(remaining % usecPerSec);
        resultSelect = select (nfds, &readFds, &writeFds, &exceptFds, &tv);
        if (resultSelect == -1) {
            if (errno != EINTR) {
                if (verbose)
                    logmsg(LOGMSG_INFO,"select error Could not select on file descriptors %s\n", strerror(errno));
                return CS_ERR_LIBRARY;
            }
            /* interrupted: the fd sets are undefined, just try again */
            moreToRead = 1;
        } else {
            for (i = 0; i < sh->fdnxt; i++) {
                int coFd = sh->fdlist[i];

                if (coFd < 0 || coFd >= FD_SETSIZE) {
                    continue;
                }
                if (verbose)
                    logmsg(LOGMSG_INFO,"select read %d write %d except %d\n",
                        FD_ISSET(coFd, &readFds),
                        FD_ISSET(coFd, &writeFds),
                        FD_ISSET(coFd, &exceptFds) );
                if (FD_ISSET(coFd, &readFds) || FD_ISSET(coFd, &writeFds)) {
                    if (verbose) logmsg(LOGMSG_INFO,"dispatch\n");
                    result = cpg_dispatch(sh->handle[i], CS_DISPATCH_ALL);
                    if (CS_OK != result) {
                        logmsg(LOGMSG_ERROR,"error cpg dispatch %d\n", result);
                        return result;
                    }
                }
                if (FD_ISSET(coFd, &readFds)) {
                    moreToRead = 1;
                }
                if (FD_ISSET(coFd, &exceptFds)) {
                    logmsg(LOGMSG_ERROR,"error exceptfd %d\n", coFd);
                    /* XXX remove fd from list to consider */
                }
            } /* rof */
        }

        /* work out how much of the budget is left for the next pass */
        gettimeofday(&tvnow, NULL);
        timersub(&tvnow, &tvbeg, &tvdelta);
        if (tvdelta.tv_sec < 0) {
            elapsed = 0; /* clock stepped backwards */
        } else {
            elapsed = (unsigned long long)tvdelta.tv_sec * usecPerSec + (unsigned long long)tvdelta.tv_usec;
        }
        remaining = (elapsed > timeOut) ? 0 : (timeOut - elapsed);
    } /* elihw */
    return CS_OK;
}
/* coInit()
 * @brief (re)initialize one connection to the corosync daemon and join a
 *        group through it
 * @param[in,out] handle connection slot; when it holds an open handle that
 *                connection is first left, removed from sh and finalized.
 *                On return it holds the new handle, or NULL_HANDLE when
 *                cpg_initialize() failed
 * @param[in] groupNameStr the name of the group; a NULL name terminates the
 *            process with exit status 3
 * @param[in] ctx context pointer stored with cpg_context_set()
 * @param[in] sh handle to list of all connections (may be NULL)
 * @return CS_OK on success, otherwise the error of the first failing call
 *
 * The new handle is NOT added to sh; the caller does that with addFd().
 * Each library call is timed and reported by deltatime() when slow.
 */
static cs_error_t
coInit(cpg_handle_t *handle, char *groupNameStr, void *ctx, SetHandle *sh)
{
    cs_error_t result;         /**< result code returned from cpg library calls */
    struct cpg_name groupName; /**< group name/length structure */
    struct timeval tvbeg;      /**< timing begin */
    uint32_t nodeid;           /**< nodeid defined by library */
    int cofd = -1;             /**< file descriptor in use by library for interprocess comms */
    int cnt;                   /**< count of trys to join group */
    int maxCnt;                /**< maximum number of try to join */
    int value;                 /**< ioctl value */
#ifdef NO_DELAY
    int setarg;                /**< set argument value to setsockopt */
#endif

    if (groupNameStr == NULL) {
        logmsg(LOGMSG_ERROR,"error - null group name not allowed\n");
        _exit(3);
    }
    /* snprintf always terminates; the length is taken from what was
     * actually stored so it can never exceed the value buffer */
    snprintf(groupName.value, sizeof(groupName.value), "%s", groupNameStr);
    groupName.length = strlen(groupName.value)+1;

    if (NULL_HANDLE != *handle) {
        if (verbose)
            logmsg(LOGMSG_INFO,"%s leave handle %llx\n", groupNameStr, (unsigned long long)*handle);
        gettimeofday(&tvbeg, NULL);
        result = cpg_leave(*handle, &groupName);
        deltatime(&tvbeg, "cpg_leave");
        if (CS_OK != result) {
            logmsg(LOGMSG_WARNING,"error %s cpg leave %d\n", groupNameStr, result);
        }
        result = cpg_fd_get(*handle, &cofd);
        if (CS_OK != result) {
            logmsg(LOGMSG_WARNING, "error %s Could not get old cpg fd %d\n", groupNameStr, result);
            cofd = -1;
        }
        if (cofd >= 0) {
            value = 0;
            if (ioctl(cofd, SIOCOUTQ, &value) < 0) {
                logmsg(LOGMSG_WARNING,"error ioctl(SIOCOUTQ) %s\n", strerror(errno));
            } else {
                if (verbose)
                    logmsg(LOGMSG_INFO,"found ioctl(SIOCOUTQ) %d in output queue\n", value);
            }
        }
        /* drop the connection from the poll set while the handle is still
         * valid: delFd() looks the descriptor up through the handle */
        if (sh != NULL) {
            delFd(sh, *handle);
        }
        gettimeofday(&tvbeg, NULL);
        result = cpg_finalize (*handle);
        deltatime(&tvbeg, "cpg_finalize");
        if (CS_OK != result) {
            logmsg(LOGMSG_WARNING,"error %s finalize failed %d\n", groupNameStr, result);
        }
        *handle = NULL_HANDLE;
    }

    gettimeofday(&tvbeg, NULL);
    result = cpg_initialize (handle, &callbacks);
    deltatime(&tvbeg, "cpg_initialize");
    if (CS_OK != result) {
        logmsg(LOGMSG_ERROR, "error %s Could not initialize Cluster Process Group API instance %d\n", groupNameStr, result);
        *handle = NULL_HANDLE;
        return result;
    }
    if (verbose)
        logmsg(LOGMSG_INFO, "%s handle %llx\n", groupNameStr, (unsigned long long)*handle);

    /* propose enhancement of a context per group! */
    gettimeofday(&tvbeg, NULL);
    result = cpg_context_set(*handle, ctx);
    deltatime(&tvbeg, "context_set");
    if (CS_OK != result) {
        logmsg(LOGMSG_ERROR, "error %s Could not set cpg context %d\n", groupNameStr, result);
        return result;
    }

    gettimeofday(&tvbeg, NULL);
    result = cpg_fd_get(*handle, &cofd);
    deltatime(&tvbeg, "cpg_fd_get");
    if (CS_OK != result) {
        logmsg(LOGMSG_WARNING, "error %s Could not get cpg fd %d\n", groupNameStr, result);
        return result;
    }
#ifdef NO_DELAY
    setarg = 1;
    if (setsockopt(cofd, IPPROTO_TCP, TCP_NODELAY, &setarg,
            (socklen_t) sizeof(setarg)) < 0) {
        logmsg(LOGMSG_WARNING,"error setsockopt(NODELAY) %s\n", strerror(errno));
    }
#endif

    gettimeofday(&tvbeg, NULL);
    result = cpg_local_get (*handle, &nodeid);
    deltatime(&tvbeg, "cpg_local_get");
    if (CS_OK != result) {
        logmsg(LOGMSG_WARNING, "error %s Could not get local node id %d\n", groupNameStr, result);
        return result;
    }

    /* NOTE(review): with maxCnt == 1 the join is attempted exactly once and
     * the back-off sleep is never reached; raise maxCnt to enable retries */
    cnt = 0;
    maxCnt = 1;
    do {
        snprintf(groupName.value, sizeof(groupName.value)-1, "%s", groupNameStr);
        groupName.length = strlen(groupName.value)+1;
        gettimeofday(&tvbeg, NULL);
        result = cpg_join(*handle, &groupName);
        deltatime(&tvbeg, "cpg_join");
        if (CS_OK != result) {
            logmsg(LOGMSG_ERROR, "Error Join %s handle %llx Could not join process group %d\n", groupName.value , (unsigned long long)*handle, result);
            if (cnt >= 1) {
                usleep(500000);
            }
        }
    } while (CS_ERR_TRY_AGAIN == result && ++cnt < maxCnt);
    if (CS_OK != result) {
        return result;
    }

    if (verbose)
        logmsg(LOGMSG_INFO, "%s handle %llx fd %d nodeid %x\n", groupNameStr, (unsigned long long)*handle, cofd, nodeid);
    return CS_OK;
}
/* sigintr_handler()
* @brief trap signals report statistics and exit to force coverage
* @param[in] signum signal number that was delivered (only printed)
*
* Prints the signal number to stdout, dumps the latency histograms to the
* log, closes the log and ends the process with status 0; it never returns.
*
* NOTE(review): printf(), the formatting done inside hvalDump()/logmsg() and
* exit() are not async-signal-safe. A stricter design would only set a
* volatile sig_atomic_t flag here and let main() do the reporting.
*/
static void
sigintr_handler (int signum) {
printf("sigintr_handler signum %d\n", signum);
/* report what was measured so far before the log is closed */
hvalDump();
logwrap();
exit (0);
}
/* usage()
* @brief print usage to stdout
* @param[in] name of the executable program
*/
const char *options = "c:i:j:l:p:st:u:v"; /**< char listing of options */
static void
usage(const char *pgm)
{
printf("%s [groupname]\n", pgm);
printf(" [-c <msgcnt:%d>] maximum message count\n", msgcnt);
printf(" [-i <maxIterations:%ld>] maximum iterations\n", maxIterations);
printf(" [-j <handleCnt:%d>] maximum handles for process\n", handleCnt);
printf(" [-l <loglevel:0x%x>] log level mask\n", loglevel);
printf(" [-s] %d toggle sending mode (0=>off)\n", doSend);
printf(" [-t <maxUsecTolerance:%ld>] maximum reporting micro seconds\n", maxUsecTolerance);
printf(" [-u <asleep:%d>] microseconds to usleep between iterations\n", asleep);
printf(" [-p <psleep:%d>] microseconds to usleep post iterations\n", asleep);
printf(" [-v] %d verbose output (0=>off)\n", verbose);
_exit(0);
}
/* main()
* @brief test main for stress test and timing of messages
* @param[in] argc count of arguments
* @param[in] argv array of arguments, followed by optional group name
* @return 0 => success
*/
#define MAX_HANDLE (1024)
int
main (int argc, char *argv[])
{
cpg_handle_t handle[MAX_HANDLE]; /**< opaque handle to corosync communications */
struct appContext ctx[MAX_HANDLE];
char groupNameBaseStr[CPG_MAX_NAME_LENGTH]; /**< arbitrary group name */
cs_error_t result; /**< return value of library calls */
struct timeval tvbeg; /**< timing begin */
unsigned long i; /**< iteration counter (simple context tracker) */
struct iovec iov; /**< output vector */
size_t iovLen; /**< length of vector */
char mbuf[MAX_BUF]; /**< buffer for a message */
int opt; /**< option currently parsed */
int j; /**< iteration of different handles to corosync */
int cofd; /**< file descriptor in use by library for interprocess comms */
int prevfd[MAX_HANDLE]; /**< previous fd for index */
int k; /**< iteration on message send */
int value; /**< ioctl value */
sighandler_t prevHandler; /**< signal handler registration return */
SetHandle *sh;
prevHandler = signal (SIGINT, sigintr_handler);
if(SIG_ERR == prevHandler) {
perror("error signal handler failed\n");
}
loginit();
hvalClear();
sh = newSetHandle();
asleep = (useconds_t)DEFAULT_USEC;
psleep = (useconds_t)DEFAULT_USEC * 10;
maxIterations = DEFAULT_ITERATIONS;
handleCnt = DEFAULT_HANDLE_COUNT;
msgcnt = DEFAULT_MESSAGE_COUNT;
while ( (opt = getopt(argc, argv, options)) != -1 ) {
switch (opt) {
case 'c':
msgcnt = strtoul(optarg, NULL, 0);
break;
case 'j':
handleCnt = strtoul(optarg, NULL, 0);
if(handleCnt > MAX_HANDLE) {
logmsg(LOGMSG_WARNING,"warning...maximum handles %d\n", MAX_HANDLE);
handleCnt = MAX_HANDLE;
}
break;
case 'i':
maxIterations = strtoul(optarg, NULL, 0);
break;
case 'l':
loglevel = strtoul(optarg, NULL, 0);
break;
case 'p':
psleep = strtoul(optarg, NULL, 0);
break;
case 's':
doSend = (doSend+1)%2;
break;
case 't':
maxUsecTolerance = strtoul(optarg, NULL, 0);
break;
case 'u':
asleep = strtoul(optarg, NULL, 0);
break;
case 'v':
verbose = (verbose+1)%2;
break;
default:
usage(argv[0]);
break;
}
}
if (argc > optind) {
snprintf(groupNameBaseStr, sizeof(groupNameBaseStr), "%s", argv[optind]);
} else {
snprintf(groupNameBaseStr, sizeof(groupNameBaseStr), "%s_groupName", argv[0]);
}
/* clear out data structures for test */
for(j = 0; j < handleCnt; j++) {
handle[j] = NULL_HANDLE;
prevfd[j] = -1;
snprintf(ctx[j].groupNameStr, sizeof(ctx[j].groupNameStr), "%s-%d", groupNameBaseStr, j);
}
gettimeofday(&tvbeg, NULL);
/* iterate through test */
for(i = 0; i < maxIterations; i++) {
/* create corosync handles (destroying prior handles) */
for(j = 0; j < handleCnt; j++) {
result = coInit(&handle[j], ctx[j].groupNameStr, (void *)ctx, sh);
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"%s failed to init iteration %lu/%lu\n",ctx[j].groupNameStr, i, maxIterations);
ctx[j].coInitOk = 0;
continue;
}
ctx[j].coInitOk = 1;
result = cpg_fd_get(handle[j], &cofd);
if (CS_OK != result) {
logmsg(LOGMSG_WARNING, "error %s Could not get cpg fd %d\n",ctx[j].groupNameStr, result);
continue;
}
if(prevfd[j] != -1 && cofd != prevfd[j]) {
logmsg(LOGMSG_WARNING,"failed %s %lu/%d/%d leaking fd now %d previously %d\n",ctx[j].groupNameStr, i, j, k, cofd, prevfd[j]);
}
prevfd[j] = cofd;
addFd(sh, handle[j]);
#ifdef EXTRA_DISPATCH
result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"error %s cpg dispatch %d\n",ctx[j].groupNameStr, result);
ctx[j].coInitOk = 0;
}
#endif
}
/* receiver simply waits some specified time for messages */
if(!doSend) {
result = checkFdAll(sh, handle, asleep);
continue;
}
/* sender creates and sends messages across connections */
for(k = 0; k < msgcnt; k++) {
for(j = 0; j < handleCnt; j++) {
gettimeofday(&tvbeg, NULL);
snprintf(mbuf, sizeof(mbuf), "%s=%lu.%lu %s=%lu %s=%d %s=%d %s=%d",
cmds[CmdTime].key, tvbeg.tv_sec, tvbeg.tv_usec,
cmds[CmdGeneration].key, i,
cmds[CmdConnection].key, j,
cmds[CmdMessageCount].key, k,
cmds[CmdAck].key, CmdAckSet);
iov.iov_base = mbuf;
iov.iov_len = sizeof(mbuf);
iovLen = sizeof(iov)/sizeof(struct iovec);
msgQueued++;
result = cpg_mcast_joined(handle[j], CPG_TYPE_AGREED, &iov, iovLen);
if(CS_OK != result) {
logmsg(LOGMSG_ERROR,"error %s %lu/%d/%d cpg mcast joined %d\n", ctx[j].groupNameStr, i, j, k, result);
result = coInit(&handle[j],ctx[j].groupNameStr, (void *)ctx, sh);
if(CS_OK != result) {
ctx[j].coInitOk = 0;
logmsg(LOGMSG_ERROR,"%s failed to init iteration %lu/%lu\n",ctx[j].groupNameStr, i, maxIterations);
}
continue;
} else {
char tbuf[128];
ctime_r(&tvbeg.tv_sec, tbuf);
tbuf[strlen(tbuf)-1] = 0; /* remove carriage return */
if(verbose)
logmsg(LOGMSG_INFO,"queued msg %d %d stime %ld.%ld %s.%ld\n", j, k, tvbeg.tv_sec, tvbeg.tv_usec, tbuf, tvbeg.tv_usec);
}
if(verbose) logmsg(LOGMSG_INFO,"dispatch messages %d\n", j);
/* send a message */
result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"error %s cpg main dispatch %d\n",ctx[j].groupNameStr, result);
ctx[j].coInitOk = 0;
result = coInit(&handle[j], ctx[j].groupNameStr, (void *)ctx, sh);
if(CS_OK != result) {
logmsg(LOGMSG_ERROR,"%s failed to init iteration %lu/%lu\n",ctx[j].groupNameStr, i, maxIterations);
}
ctx[j].coInitOk = 1;
continue;
}
}
}
/* now that all the messages were sent in the first round */
result = checkFdAll(sh, handle, asleep);
}
deltatime(&tvbeg, "post all iterations");
result = checkFdAll(sh, handle, psleep);
deltatime(&tvbeg, "post final sleep");
for(j = 0; j < handleCnt; j++) {
struct cpg_name groupName; /**< group name/length structure */
strncpy(groupName.value,ctx[j].groupNameStr, sizeof(groupName.value));
groupName.length = strlen(ctx[j].groupNameStr)+1;
gettimeofday(&tvbeg, NULL);
result = cpg_leave(handle[j], &groupName);
deltatime(&tvbeg, "cpg_leave");
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"error %s cpg leave %d\n",ctx[j].groupNameStr, result);
}
result = cpg_dispatch(handle[j], CS_DISPATCH_ALL);
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"error %s cpg dispatch %d\n",ctx[j].groupNameStr, result);
}
value = 0;
if(ioctl(cofd, SIOCOUTQ, &value) < 0) {
logmsg(LOGMSG_WARNING,"error ioctl(SIOCOUTQ) %s\n", strerror(errno));
} else {
if(verbose)
logmsg(LOGMSG_INFO,"ioctl(SIOCOUTQ) %d\n", value);
}
result = cpg_finalize (handle[j]);
if(CS_OK != result) {
logmsg(LOGMSG_WARNING,"finalize failed %d\n", result);
continue;
}
}
hvalDump();
logwrap();
delSetHandle(sh);
return (0);
}
Makefile.co
Description: Binary data
_______________________________________________ Openais mailing list [email protected] https://lists.linux-foundation.org/mailman/listinfo/openais
