This patch optimizes how totem operates. In my testing, the
improvement is significant: 20-50% more throughput, depending on the
workload.
The optimization reduces the iovec handling ("iovectorizing") that occurs
on the send path from totemsrp to totemnet, and also on the delivery path
of messages from totemnet through totempg to main.c. Finally, totemsrp
malloc()s a message's contents only once and makes a replica of the iovec
information passed from totempg to totemsrp_mcast.
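Note that this requires changing the signature of the deliver_fn callback
passed to the coroapi tpg_init function (it now receives a message pointer
and length instead of an iovec array). This should have minimal impact,
since I don't expect many people to use this API.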
Tested with an 8-hour run of 3 cpgverify instances per node in a
3-node configuration.
Regards
-steve
Index: services/votequorum.c
===================================================================
--- services/votequorum.c (revision 2171)
+++ services/votequorum.c (working copy)
@@ -181,8 +181,9 @@
const struct memb_ring_id *ring_id);
static void quorum_deliver_fn(unsigned int nodeid,
- const struct iovec *iovec, unsigned int iov_len,
- int endian_conversion_required);
+ const void *msg,
+ unsigned int msg_len,
+ int endian_conversion_required);
static int votequorum_exec_init_fn (struct corosync_api_v1 *corosync_api);
@@ -970,10 +971,14 @@
}
static void quorum_deliver_fn(unsigned int nodeid,
- const struct iovec *iovec, unsigned int iov_len,
- int endian_conversion_required)
+ const void *msg,
+ unsigned int msg_len,
+ int endian_conversion_required)
{
- struct q_protheader *header = iovec->iov_base;
+/*
+ * TODO this violates the const rules applied to delivered messages
+ */
+ struct q_protheader *header = (struct q_protheader *)msg;
char *buf;
ENTER();
@@ -988,7 +993,7 @@
if (header->tgtport == 0 &&
(header->tgtid == us->node_id ||
header->tgtid == 0)) {
- buf = (char *)(iovec->iov_base) + sizeof(struct q_protheader);
+ buf = (char *)(msg) + sizeof(struct q_protheader);
switch (*buf) {
case VOTEQUORUM_MSG_NODEINFO:
Index: include/corosync/totem/totempg.h
===================================================================
--- include/corosync/totem/totempg.h (revision 2171)
+++ include/corosync/totem/totempg.h (working copy)
@@ -84,8 +84,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
Index: include/corosync/engine/coroapi.h
===================================================================
--- include/corosync/engine/coroapi.h (revision 2171)
+++ include/corosync/engine/coroapi.h (working copy)
@@ -462,8 +462,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
Index: exec/totemnet.c
===================================================================
--- exec/totemnet.c (revision 2171)
+++ exec/totemnet.c (working copy)
@@ -131,7 +131,7 @@
void (*totemnet_deliver_fn) (
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
void (*totemnet_iface_change_fn) (
void *context,
@@ -201,8 +201,8 @@
};
struct work_item {
- struct iovec iovec[20];
- unsigned int iov_len;
+ const void *msg;
+ unsigned int msg_len;
struct totemnet_instance *instance;
};
@@ -884,26 +884,26 @@
static inline void ucast_sendmsg (
struct totemnet_instance *instance,
struct totem_ip_address *system_to,
- const struct iovec *iovec_in,
- size_t iov_len_in)
+ const void *msg,
+ unsigned int msg_len)
{
struct msghdr msg_ucast;
int res = 0;
size_t buf_len;
unsigned char sheader[sizeof (struct security_header)];
unsigned char encrypt_data[FRAME_SIZE_MAX];
- struct iovec iovec_encrypt[20];
+ struct iovec iovec_encrypt[2];
const struct iovec *iovec_sendmsg;
struct sockaddr_storage sockaddr;
+ struct iovec iovec;
unsigned int iov_len;
int addrlen;
if (instance->totem_config->secauth == 1) {
-
iovec_encrypt[0].iov_base = sheader;
iovec_encrypt[0].iov_len = sizeof (struct security_header);
- memcpy (&iovec_encrypt[1], &iovec_in[0],
- sizeof (struct iovec) * iov_len_in);
+ iovec_encrypt[1].iov_base = (void *)msg;
+ iovec_encrypt[1].iov_len = msg_len;
/*
* Encrypt and digest the message
@@ -913,7 +913,7 @@
encrypt_data,
&buf_len,
iovec_encrypt,
- iov_len_in + 1);
+ 2);
if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_NEW) {
encrypt_data[buf_len++] = instance->totem_config->crypto_type;
@@ -927,8 +927,10 @@
iovec_sendmsg = &iovec_encrypt[0];
iov_len = 1;
} else {
- iovec_sendmsg = iovec_in;
- iov_len = iov_len_in;
+ iovec.iov_base = (void *)msg;
+ iovec.iov_len = msg_len;
+ iovec_sendmsg = &iovec;
+ iov_len = 1;
}
/*
@@ -954,15 +956,16 @@
static inline void mcast_sendmsg (
struct totemnet_instance *instance,
- const struct iovec *iovec_in,
- size_t iov_len_in)
+ const void *msg,
+ unsigned int msg_len)
{
struct msghdr msg_mcast;
int res = 0;
size_t buf_len;
unsigned char sheader[sizeof (struct security_header)];
unsigned char encrypt_data[FRAME_SIZE_MAX];
- struct iovec iovec_encrypt[20];
+ struct iovec iovec_encrypt[2];
+ struct iovec iovec;
const struct iovec *iovec_sendmsg;
struct sockaddr_storage sockaddr;
unsigned int iov_len;
@@ -972,8 +975,8 @@
iovec_encrypt[0].iov_base = sheader;
iovec_encrypt[0].iov_len = sizeof (struct security_header);
- memcpy (&iovec_encrypt[1], &iovec_in[0],
- sizeof (struct iovec) * iov_len_in);
+ iovec_encrypt[1].iov_base = (void *)msg;
+ iovec_encrypt[1].iov_len = msg_len;
/*
* Encrypt and digest the message
@@ -983,7 +986,7 @@
encrypt_data,
&buf_len,
iovec_encrypt,
- iov_len_in + 1);
+ 2);
if (instance->totem_config->crypto_accept == TOTEM_CRYPTO_ACCEPT_NEW) {
encrypt_data[buf_len++] = instance->totem_config->crypto_type;
@@ -997,8 +1000,11 @@
iovec_sendmsg = &iovec_encrypt[0];
iov_len = 1;
} else {
- iovec_sendmsg = iovec_in;
- iov_len = iov_len_in;
+ iovec.iov_base = (void *)msg;
+ iovec.iov_len = msg_len;
+
+ iovec_sendmsg = &iovec;
+ iov_len = 1;
}
/*
@@ -1045,33 +1051,31 @@
unsigned char sheader[sizeof (struct security_header)];
int res = 0;
size_t buf_len;
- struct iovec iovec_encrypted;
- struct iovec *iovec_sendmsg;
+ struct iovec iovec_enc[2];
+ struct iovec iovec;
struct sockaddr_storage sockaddr;
- unsigned int iovs;
int addrlen;
if (instance->totem_config->secauth == 1) {
- memmove (&work_item->iovec[1], &work_item->iovec[0],
- work_item->iov_len * sizeof (struct iovec));
- work_item->iovec[0].iov_base = sheader;
- work_item->iovec[0].iov_len = sizeof (struct security_header);
+ iovec_enc[0].iov_base = sheader;
+ iovec_enc[0].iov_len = sizeof (struct security_header);
+ iovec_enc[1].iov_base = (void *)work_item->msg;
+ iovec_enc[1].iov_len = work_item->msg_len;
/*
* Encrypt and digest the message
*/
encrypt_and_sign_worker (
instance,
- totemnet_mcast_thread_state->iobuf, &buf_len,
- work_item->iovec, work_item->iov_len + 1);
+ totemnet_mcast_thread_state->iobuf,
+ &buf_len,
+ iovec_enc, 2);
- iovec_sendmsg = &iovec_encrypted;
- iovec_sendmsg->iov_base = totemnet_mcast_thread_state->iobuf;
- iovec_sendmsg->iov_len = buf_len;
- iovs = 1;
+ iovec.iov_base = totemnet_mcast_thread_state->iobuf;
+ iovec.iov_len = buf_len;
} else {
- iovec_sendmsg = work_item->iovec;
- iovs = work_item->iov_len;
+ iovec.iov_base = (void *)work_item->msg;
+ iovec.iov_len = work_item->msg_len;
}
totemip_totemip_to_sockaddr_convert(&instance->mcast_address,
@@ -1079,8 +1083,8 @@
msg_mcast.msg_name = &sockaddr;
msg_mcast.msg_namelen = addrlen;
- msg_mcast.msg_iov = iovec_sendmsg;
- msg_mcast.msg_iovlen = iovs;
+ msg_mcast.msg_iov = &iovec;
+ msg_mcast.msg_iovlen = 1;
msg_mcast.msg_control = 0;
msg_mcast.msg_controllen = 0;
msg_mcast.msg_flags = 0;
@@ -1091,9 +1095,6 @@
*/
res = sendmsg (instance->totemnet_sockets.mcast_send, &msg_mcast,
MSG_NOSIGNAL);
- if (res > 0) {
- instance->stats_sent += res;
- }
}
int totemnet_finalize (
@@ -1849,8 +1850,8 @@
int totemnet_token_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemnet_instance *instance;
int res = 0;
@@ -1862,7 +1863,7 @@
goto error_exit;
}
- ucast_sendmsg (instance, &instance->token_target, iovec, iov_len);
+ ucast_sendmsg (instance, &instance->token_target, msg, msg_len);
hdb_handle_put (&totemnet_instance_database, handle);
@@ -1871,8 +1872,8 @@
}
int totemnet_mcast_flush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemnet_instance *instance;
int res = 0;
@@ -1884,7 +1885,7 @@
goto error_exit;
}
- mcast_sendmsg (instance, iovec, iov_len);
+ mcast_sendmsg (instance, msg, msg_len);
hdb_handle_put (&totemnet_instance_database, handle);
@@ -1894,8 +1895,8 @@
int totemnet_mcast_noflush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemnet_instance *instance;
struct work_item work_item;
@@ -1909,14 +1910,14 @@
}
if (instance->totem_config->threads) {
- memcpy (&work_item.iovec[0], iovec, iov_len * sizeof (struct iovec));
- work_item.iov_len = iov_len;
+ work_item.msg = msg;
+ work_item.msg_len = msg_len;
work_item.instance = instance;
worker_thread_group_work_add (&instance->worker_thread_group,
&work_item);
} else {
- mcast_sendmsg (instance, iovec, iov_len);
+ mcast_sendmsg (instance, msg, msg_len);
}
hdb_handle_put (&totemnet_instance_database, handle);
Index: exec/totemnet.h
===================================================================
--- exec/totemnet.h (revision 2171)
+++ exec/totemnet.h (working copy)
@@ -60,7 +60,7 @@
void (*deliver_fn) (
void *context,
const void *msg,
- size_t msg_len),
+ unsigned int msg_len),
void (*iface_change_fn) (
void *context,
@@ -72,18 +72,18 @@
extern int totemnet_token_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemnet_mcast_flush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemnet_mcast_noflush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemnet_recv_flush (hdb_handle_t handle);
Index: exec/totempg.c
===================================================================
--- exec/totempg.c (revision 2171)
+++ exec/totempg.c (working copy)
@@ -210,8 +210,8 @@
struct totempg_group_instance {
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required);
void (*confchg_fn) (
@@ -339,33 +339,31 @@
}
static inline void group_endian_convert (
- struct iovec *iovec)
+ void *msg,
+ int msg_len)
{
unsigned short *group_len;
int i;
- struct iovec iovec_aligned = { NULL, 0 };
- struct iovec *iovec_swab;
+ char *aligned_msg;
/*
* Align data structure for sparc and ia64
*/
- if ((size_t)iovec->iov_base % 4 != 0) {
- iovec_aligned.iov_base = alloca(iovec->iov_len);
- memcpy(iovec_aligned.iov_base, iovec->iov_base, iovec->iov_len);
- iovec_aligned.iov_len = iovec->iov_len;
- iovec_swab = &iovec_aligned;
+ if ((size_t)msg % 4 != 0) {
+ aligned_msg = alloca(msg_len);
+ memcpy(aligned_msg, msg, msg_len);
} else {
- iovec_swab = iovec;
+ aligned_msg = msg;
}
- group_len = (unsigned short *)iovec_swab->iov_base;
+ group_len = (unsigned short *)aligned_msg;
group_len[0] = swab16(group_len[0]);
for (i = 1; i < group_len[0] + 1; i++) {
group_len[i] = swab16(group_len[i]);
}
- if (iovec_swab == &iovec_aligned) {
- memcpy(iovec->iov_base, iovec_aligned.iov_base, iovec->iov_len);
+ if (aligned_msg != msg) {
+ memcpy(msg, aligned_msg, msg_len);
}
}
@@ -426,8 +424,8 @@
static inline void app_deliver_fn (
unsigned int nodeid,
- struct iovec *iovec,
- unsigned int iov_len,
+ void *msg,
+ unsigned int msg_len,
int endian_conversion_required)
{
int i;
@@ -435,18 +433,23 @@
struct iovec stripped_iovec;
unsigned int adjust_iovec;
unsigned int res;
+ struct iovec *iovec;
+
struct iovec aligned_iovec = { NULL, 0 };
if (endian_conversion_required) {
- group_endian_convert (iovec);
+ group_endian_convert (msg, msg_len);
}
+/*
+ * TODO This function needs to be rewritten for proper alignment to avoid 3+ memory copies
+ */
/*
* Align data structure for sparc and ia64
*/
- aligned_iovec.iov_base = alloca(iovec->iov_len);
- aligned_iovec.iov_len = iovec->iov_len;
- memcpy(aligned_iovec.iov_base, iovec->iov_base, iovec->iov_len);
+ aligned_iovec.iov_base = alloca(msg_len);
+ aligned_iovec.iov_len = msg_len;
+ memcpy(aligned_iovec.iov_base, msg, msg_len);
iovec = &aligned_iovec;
for (i = 0; i <= totempg_max_handle; i++) {
@@ -454,10 +457,9 @@
hdb_nocheck_convert (i), (void *)&instance);
if (res == 0) {
- assert (iov_len == 1);
- if (group_matches (iovec, iov_len, instance->groups, instance->groups_cnt, &adjust_iovec)) {
+ if (group_matches (iovec, 1, instance->groups, instance->groups_cnt, &adjust_iovec)) {
stripped_iovec.iov_len = iovec->iov_len - adjust_iovec;
-// stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
+ stripped_iovec.iov_base = (char *)iovec->iov_base + adjust_iovec;
/*
* Align data structure for sparc and ia64
@@ -475,8 +477,8 @@
instance->deliver_fn (
nodeid,
- &stripped_iovec,
- iov_len,
+ stripped_iovec.iov_base,
+ stripped_iovec.iov_len,
endian_conversion_required);
}
@@ -502,8 +504,8 @@
static void totempg_deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required)
{
struct totempg_mcast *mcast;
@@ -511,11 +513,11 @@
int i;
struct assembly *assembly;
char header[FRAME_SIZE_MAX];
- int h_index;
- int a_i = 0;
int msg_count;
int continuation;
int start;
+ const char *data;
+ int datasize;
assembly = assembly_ref (nodeid);
assert (assembly);
@@ -524,61 +526,29 @@
* Assemble the header into one block of data and
* assemble the packet contents into one block of data to simplify delivery
*/
- if (iov_len == 1) {
- /*
- * This message originated from external processor
- * because there is only one iovec for the full msg.
- */
- char *data;
- int datasize;
- mcast = (struct totempg_mcast *)iovec[0].iov_base;
- if (endian_conversion_required) {
- mcast->msg_count = swab16 (mcast->msg_count);
- }
+ mcast = (struct totempg_mcast *)msg;
+ if (endian_conversion_required) {
+ mcast->msg_count = swab16 (mcast->msg_count);
+ }
- msg_count = mcast->msg_count;
- datasize = sizeof (struct totempg_mcast) +
- msg_count * sizeof (unsigned short);
+ msg_count = mcast->msg_count;
+ datasize = sizeof (struct totempg_mcast) +
+ msg_count * sizeof (unsigned short);
- memcpy (header, iovec[0].iov_base, datasize);
- assert(iovec);
- data = iovec[0].iov_base;
+ memcpy (header, msg, datasize);
+ data = msg;
- msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
- if (endian_conversion_required) {
- for (i = 0; i < mcast->msg_count; i++) {
- msg_lens[i] = swab16 (msg_lens[i]);
- }
+ msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
+ if (endian_conversion_required) {
+ for (i = 0; i < mcast->msg_count; i++) {
+ msg_lens[i] = swab16 (msg_lens[i]);
}
-
- memcpy (&assembly->data[assembly->index], &data[datasize],
- iovec[0].iov_len - datasize);
- } else {
- /*
- * The message originated from local processor
- * becasue there is greater than one iovec for then full msg.
- */
- h_index = 0;
- for (i = 0; i < 2; i++) {
- memcpy (&header[h_index], iovec[i].iov_base, iovec[i].iov_len);
- h_index += iovec[i].iov_len;
- }
-
- mcast = (struct totempg_mcast *)header;
-// TODO make sure we are using a copy of mcast not the actual data itself
-
- msg_lens = (unsigned short *) (header + sizeof (struct totempg_mcast));
-
- for (i = 2; i < iov_len; i++) {
- a_i = assembly->index;
- assert (iovec[i].iov_len + a_i <= MESSAGE_SIZE_MAX);
- memcpy (&assembly->data[a_i], iovec[i].iov_base, iovec[i].iov_len);
- a_i += msg_lens[i - 2];
- }
- iov_len -= 2;
}
+ memcpy (&assembly->data[assembly->index], &data[datasize],
+ msg_len - datasize);
+
/*
* If the last message in the buffer is a fragment, then we
* can't deliver it. We'll first deliver the full messages
@@ -617,7 +587,7 @@
if (continuation == assembly->last_frag_num) {
assembly->last_frag_num = mcast->fragmented;
for (i = start; i < msg_count; i++) {
- app_deliver_fn(nodeid, &iov_delv, 1,
+ app_deliver_fn(nodeid, iov_delv.iov_base, iov_delv.iov_len,
endian_conversion_required);
assembly->index += msg_lens[i];
iov_delv.iov_base = &assembly->data[assembly->index];
@@ -990,8 +960,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
Index: exec/totemmrp.c
===================================================================
--- exec/totemmrp.c (revision 2171)
+++ exec/totemmrp.c (working copy)
@@ -68,8 +68,8 @@
void totemmrp_deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required);
void totemmrp_confchg_fn (
@@ -81,8 +81,8 @@
void (*pg_deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required) = 0;
void (*pg_confchg_fn) (
@@ -94,11 +94,11 @@
void totemmrp_deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required)
{
- pg_deliver_fn (nodeid, iovec, iov_len, endian_conversion_required);
+ pg_deliver_fn (nodeid, msg, msg_len, endian_conversion_required);
}
void totemmrp_confchg_fn (
@@ -124,8 +124,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
Index: exec/totemrrp.c
===================================================================
--- exec/totemrrp.c (revision 2171)
+++ exec/totemrrp.c (working copy)
@@ -71,7 +71,7 @@
void rrp_deliver_fn (
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
void rrp_iface_change_fn (
void *context,
@@ -117,30 +117,30 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
void (*mcast_noflush_send) (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
void (*mcast_flush_send) (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
void (*token_recv) (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seqid);
void (*token_send) (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
void (*recv_flush) (
struct totemrrp_instance *instance);
@@ -178,7 +178,7 @@
void (*totemrrp_deliver_fn) (
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
void (*totemrrp_iface_change_fn) (
void *context,
@@ -234,30 +234,30 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
static void none_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void none_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void none_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seqid);
static void none_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void none_recv_flush (
struct totemrrp_instance *instance);
@@ -292,30 +292,30 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
static void passive_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void passive_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void passive_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seqid);
static void passive_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void passive_recv_flush (
struct totemrrp_instance *instance);
@@ -350,30 +350,30 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
static void active_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void active_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void active_token_recv (
struct totemrrp_instance *instance,
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seqid);
static void active_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
static void active_recv_flush (
struct totemrrp_instance *instance);
@@ -486,7 +486,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len)
+ unsigned int msg_len)
{
rrp_instance->totemrrp_deliver_fn (
context,
@@ -496,18 +496,18 @@
static void none_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
- totemnet_mcast_flush_send (instance->net_handles[0], iovec, iov_len);
+ totemnet_mcast_flush_send (instance->net_handles[0], msg, msg_len);
}
static void none_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
- totemnet_mcast_noflush_send (instance->net_handles[0], iovec, iov_len);
+ totemnet_mcast_noflush_send (instance->net_handles[0], msg, msg_len);
}
static void none_token_recv (
@@ -515,7 +515,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seq)
{
rrp_instance->totemrrp_deliver_fn (
@@ -526,12 +526,12 @@
static void none_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
totemnet_token_send (
instance->net_handles[0],
- iovec, iov_len);
+ msg, msg_len);
}
static void none_recv_flush (struct totemrrp_instance *instance)
@@ -686,7 +686,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len)
+ unsigned int msg_len)
{
struct passive_instance *passive_instance = (struct passive_instance *)rrp_instance->rrp_algo_instance;
unsigned int max;
@@ -740,8 +740,8 @@
static void passive_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
@@ -749,13 +749,13 @@
passive_instance->msg_xmit_iface = (passive_instance->msg_xmit_iface + 1) % instance->interface_count;
} while (passive_instance->faulty[passive_instance->msg_xmit_iface] == 1);
- totemnet_mcast_flush_send (instance->net_handles[passive_instance->msg_xmit_iface], iovec, iov_len);
+ totemnet_mcast_flush_send (instance->net_handles[passive_instance->msg_xmit_iface], msg, msg_len);
}
static void passive_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
@@ -764,7 +764,7 @@
} while (passive_instance->faulty[passive_instance->msg_xmit_iface] == 1);
- totemnet_mcast_noflush_send (instance->net_handles[passive_instance->msg_xmit_iface], iovec, iov_len);
+ totemnet_mcast_noflush_send (instance->net_handles[passive_instance->msg_xmit_iface], msg, msg_len);
}
static void passive_token_recv (
@@ -772,7 +772,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seq)
{
struct passive_instance *passive_instance = (struct passive_instance *)rrp_instance->rrp_algo_instance;
@@ -824,8 +824,8 @@
static void passive_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct passive_instance *passive_instance = (struct passive_instance *)instance->rrp_algo_instance;
@@ -835,7 +835,7 @@
totemnet_token_send (
instance->net_handles[passive_instance->token_xmit_iface],
- iovec, iov_len);
+ msg, msg_len);
}
@@ -1095,7 +1095,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len)
+ unsigned int msg_len)
{
instance->totemrrp_deliver_fn (
context,
@@ -1105,30 +1105,30 @@
static void active_mcast_flush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
int i;
struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
for (i = 0; i < instance->interface_count; i++) {
if (rrp_algo_instance->faulty[i] == 0) {
- totemnet_mcast_flush_send (instance->net_handles[i], iovec, iov_len);
+ totemnet_mcast_flush_send (instance->net_handles[i], msg, msg_len);
}
}
}
static void active_mcast_noflush_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
int i;
struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
for (i = 0; i < instance->interface_count; i++) {
if (rrp_algo_instance->faulty[i] == 0) {
- totemnet_mcast_noflush_send (instance->net_handles[i], iovec, iov_len);
+ totemnet_mcast_noflush_send (instance->net_handles[i], msg, msg_len);
}
}
}
@@ -1138,7 +1138,7 @@
unsigned int iface_no,
void *context,
const void *msg,
- size_t msg_len,
+ unsigned int msg_len,
unsigned int token_seq)
{
int i;
@@ -1177,8 +1177,8 @@
static void active_token_send (
struct totemrrp_instance *instance,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct active_instance *rrp_algo_instance = (struct active_instance *)instance->rrp_algo_instance;
int i;
@@ -1187,7 +1187,7 @@
if (rrp_algo_instance->faulty[i] == 0) {
totemnet_token_send (
instance->net_handles[i],
- iovec, iov_len);
+ msg, msg_len);
}
}
@@ -1309,7 +1309,7 @@
void rrp_deliver_fn (
void *context,
const void *msg,
- size_t msg_len)
+ unsigned int msg_len)
{
unsigned int token_seqid;
unsigned int token_is;
@@ -1398,7 +1398,7 @@
void (*deliver_fn) (
void *context,
const void *msg,
- size_t msg_len),
+ unsigned int msg_len),
void (*iface_change_fn) (
void *context,
@@ -1589,8 +1589,8 @@
int totemrrp_token_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemrrp_instance *instance;
int res = 0;
@@ -1602,7 +1602,7 @@
goto error_exit;
}
- instance->rrp_algo->token_send (instance, iovec, iov_len);
+ instance->rrp_algo->token_send (instance, msg, msg_len);
hdb_handle_put (&totemrrp_instance_database, handle);
@@ -1612,8 +1612,8 @@
int totemrrp_mcast_flush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemrrp_instance *instance;
int res = 0;
@@ -1626,7 +1626,7 @@
}
// TODO this needs to return the result
- instance->rrp_algo->mcast_flush_send (instance, iovec, iov_len);
+ instance->rrp_algo->mcast_flush_send (instance, msg, msg_len);
hdb_handle_put (&totemrrp_instance_database, handle);
error_exit:
@@ -1635,8 +1635,8 @@
int totemrrp_mcast_noflush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len)
+ const void *msg,
+ unsigned int msg_len)
{
struct totemrrp_instance *instance;
int res = 0;
@@ -1656,7 +1656,7 @@
if (instance->processor_count > 1) {
// TODO this needs to return the result
- instance->rrp_algo->mcast_noflush_send (instance, iovec, iov_len);
+ instance->rrp_algo->mcast_noflush_send (instance, msg, msg_len);
}
hdb_handle_put (&totemrrp_instance_database, handle);
Index: exec/sync.c
===================================================================
--- exec/sync.c (revision 2171)
+++ exec/sync.c (working copy)
@@ -102,8 +102,8 @@
static void sync_deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required);
static void sync_confchg_fn (
@@ -327,12 +327,12 @@
static void sync_deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required)
{
struct req_exec_sync_barrier_start *req_exec_sync_barrier_start =
- (struct req_exec_sync_barrier_start *)iovec[0].iov_base;
+ (struct req_exec_sync_barrier_start *)msg;
unsigned int barrier_completed;
int i;
Index: exec/totemmrp.h
===================================================================
--- exec/totemmrp.h (revision 2171)
+++ exec/totemmrp.h (working copy)
@@ -61,8 +61,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
Index: exec/totemsrp.c
===================================================================
--- exec/totemsrp.c (revision 2171)
+++ exec/totemsrp.c (working copy)
@@ -277,13 +277,12 @@
struct message_item {
struct mcast *mcast;
- struct iovec iovec[MAXIOVS];
- unsigned int iov_len;
+ unsigned int msg_len;
};
struct sort_queue_item {
- struct iovec iovec[MAXIOVS];
- unsigned int iov_len;
+ struct mcast *mcast;
+ unsigned int msg_len;
};
struct orf_token_mcast_thread_state {
@@ -443,23 +442,14 @@
//TODO struct srp_addr next_memb;
- char iov_buffer[FRAME_SIZE_MAX];
-
- struct iovec totemsrp_iov_recv;
-
hdb_handle_t totemsrp_poll_handle;
- /*
- * Function called when new message received
- */
- int (*totemsrp_recv) (char *group, struct iovec *iovec, unsigned int iov_len);
-
struct totem_ip_address mcast_address;
void (*totemsrp_deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required);
void (*totemsrp_confchg_fn) (
@@ -603,7 +593,7 @@
void main_deliver_fn (
void *context,
const void *msg,
- size_t msg_len);
+ unsigned int msg_len);
void main_iface_change_fn (
void *context,
@@ -690,8 +680,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
@@ -750,8 +740,6 @@
*/
totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
- memset (instance->iov_buffer, 0, FRAME_SIZE_MAX);
-
/*
* Display totem configuration
*/
@@ -1577,35 +1565,23 @@
/*
* Convert recovery message into regular message
*/
- if (recovery_message_item->iov_len > 1) {
- mcast = recovery_message_item->iovec[1].iov_base;
- memcpy (&regular_message_item.iovec[0],
- &recovery_message_item->iovec[1],
- sizeof (struct iovec) * recovery_message_item->iov_len);
+ mcast = recovery_message_item->mcast;
+ if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
+ /*
+ * Recovery message encapsulated in a new ring message: copy the inner
+ * message out, as regular sort queue items are free()d individually
+ */
+ regular_message_item.msg_len = recovery_message_item->msg_len - sizeof (struct mcast);
+ regular_message_item.mcast = malloc (regular_message_item.msg_len);
+ assert (regular_message_item.mcast);
+ memcpy (regular_message_item.mcast,
+ ((char *)recovery_message_item->mcast) + sizeof (struct mcast), regular_message_item.msg_len);
+ mcast = regular_message_item.mcast;
} else {
- mcast = recovery_message_item->iovec[0].iov_base;
- if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
- /*
- * Message is a recovery message encapsulated
- * in a new ring message
- */
- regular_message_item.iovec[0].iov_base =
- (char *)recovery_message_item->iovec[0].iov_base + sizeof (struct mcast);
- regular_message_item.iovec[0].iov_len =
- recovery_message_item->iovec[0].iov_len - sizeof (struct mcast);
- regular_message_item.iov_len = 1;
- mcast = regular_message_item.iovec[0].iov_base;
- } else {
- continue; /* TODO this case shouldn't happen */
- /*
- * Message is originated on new ring and not
- * encapsulated
- */
- regular_message_item.iovec[0].iov_base =
- recovery_message_item->iovec[0].iov_base;
- regular_message_item.iovec[0].iov_len =
- recovery_message_item->iovec[0].iov_len;
- }
+ /*
+ * TODO this case shouldn't happen
+ */
+ continue;
}
log_printf (instance->totemsrp_log_level_debug,
@@ -1620,7 +1596,6 @@
if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
sizeof (struct memb_ring_id)) == 0) {
- regular_message_item.iov_len = recovery_message_item->iov_len;
res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
if (res == 0) {
sq_item_add (&instance->regular_sort_queue,
@@ -1980,28 +1955,27 @@
low_ring_aru + i, &ptr);
if (res != 0) {
strcat (not_originated, seqno_string_hex);
- continue;
- }
- strcat (is_originated, seqno_string_hex);
- sort_queue_item = ptr;
- assert (sort_queue_item->iov_len > 0);
- assert (sort_queue_item->iov_len <= MAXIOVS);
- messages_originated++;
- memset (&message_item, 0, sizeof (struct message_item));
-// TODO LEAK
- message_item.mcast = malloc (sizeof (struct mcast));
- assert (message_item.mcast);
- message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
- srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
- message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
- message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
- assert (message_item.mcast->header.nodeid);
- message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
- memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
- sizeof (struct memb_ring_id));
- message_item.iov_len = sort_queue_item->iov_len;
- memcpy (&message_item.iovec, &sort_queue_item->iovec,
- sizeof (struct iovec) * sort_queue_item->iov_len);
+ continue;
+ }
+ strcat (is_originated, seqno_string_hex);
+ sort_queue_item = ptr;
+ messages_originated++;
+ memset (&message_item, 0, sizeof (struct message_item));
+ // TODO LEAK; sized for the encapsulating mcast header plus the original message
+ message_item.mcast = malloc (sizeof (struct mcast) + sort_queue_item->msg_len);
+ assert (message_item.mcast);
+ message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
+ srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
+ message_item.mcast->header.encapsulated = MESSAGE_ENCAPSULATED;
+ message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
+ assert (message_item.mcast->header.nodeid);
+ message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
+ memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
+ sizeof (struct memb_ring_id));
+ message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
+ memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
+ sort_queue_item->mcast,
+ sort_queue_item->msg_len);
queue_item_add (&instance->retrans_message_queue, &message_item);
}
log_printf (instance->totemsrp_log_level_notice,
@@ -2057,9 +2031,10 @@
int guarantee)
{
int i;
- int j;
struct message_item message_item;
struct totemsrp_instance *instance;
+ char *addr;
+ unsigned int addr_idx;
unsigned int res;
res = hdb_handle_get (&totemsrp_instance_database, handle,
@@ -2072,17 +2047,13 @@
log_printf (instance->totemsrp_log_level_warning, "queue full\n");
return (-1);
}
- for (j = 0, i = 0; i < iov_len; i++) {
- j+= iovec[i].iov_len;
- }
memset (&message_item, 0, sizeof (struct message_item));
/*
* Allocate pending item
*/
-// TODO LEAK
- message_item.mcast = malloc (sizeof (struct mcast));
+ message_item.mcast = malloc (10000);
if (message_item.mcast == 0) {
goto error_mcast;
}
@@ -2099,21 +2070,14 @@
message_item.mcast->guarantee = guarantee;
srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
+ addr = (char *)message_item.mcast;
+ addr_idx = sizeof (struct mcast);
for (i = 0; i < iov_len; i++) {
-// TODO LEAK
- message_item.iovec[i].iov_base = malloc (iovec[i].iov_len);
-
- if (message_item.iovec[i].iov_base == 0) {
- goto error_iovec;
- }
-
- memcpy (message_item.iovec[i].iov_base, iovec[i].iov_base,
- iovec[i].iov_len);
-
- message_item.iovec[i].iov_len = iovec[i].iov_len;
+ memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
+ addr_idx += iovec[i].iov_len;
}
- message_item.iov_len = iov_len;
+ message_item.msg_len = addr_idx;
log_printf (instance->totemsrp_log_level_debug, "mcasted message added to pending queue\n");
queue_item_add (&instance->new_message_queue, &message_item);
@@ -2121,13 +2085,6 @@
hdb_handle_put (&totemsrp_instance_database, handle);
return (0);
-error_iovec:
- for (j = 0; j < i; j++) {
- free (message_item.iovec[j].iov_base);
- }
-
- free(message_item.mcast);
-
error_mcast:
hdb_handle_put (&totemsrp_instance_database, handle);
@@ -2198,9 +2155,10 @@
sort_queue_item = ptr;
- totemrrp_mcast_noflush_send (instance->totemrrp_handle,
- sort_queue_item->iovec,
- sort_queue_item->iov_len);
+ totemrrp_mcast_noflush_send (
+ instance->totemrrp_handle,
+ sort_queue_item->mcast,
+ sort_queue_item->msg_len);
return (0);
}
@@ -2214,7 +2172,7 @@
unsigned int token_aru)
{
struct sort_queue_item *regular_message;
- unsigned int i, j;
+ unsigned int i;
int res;
int log_release = 0;
unsigned int release_to;
@@ -2248,9 +2206,7 @@
instance->last_released + i, &ptr);
if (res == 0) {
regular_message = ptr;
- for (j = 0; j < regular_message->iov_len; j++) {
- free (regular_message->iovec[j].iov_base);
- }
+ free (regular_message->mcast);
}
sq_items_release (&instance->regular_sort_queue,
instance->last_released + i);
@@ -2349,29 +2305,23 @@
* Build IO vector
*/
memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
- sort_queue_item.iovec[0].iov_base = message_item->mcast;
- sort_queue_item.iovec[0].iov_len = sizeof (struct mcast);
+ sort_queue_item.mcast = message_item->mcast;
+ sort_queue_item.msg_len = message_item->msg_len;
- mcast = sort_queue_item.iovec[0].iov_base;
+ mcast = sort_queue_item.mcast;
- memcpy (&sort_queue_item.iovec[1], message_item->iovec,
- message_item->iov_len * sizeof (struct iovec));
-
memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
- sort_queue_item.iov_len = message_item->iov_len + 1;
-
- assert (sort_queue_item.iov_len < 16);
-
/*
* Add message to retransmit queue
*/
sort_queue_item_ptr = sq_item_add (sort_queue,
&sort_queue_item, message_item->mcast->seq);
- totemrrp_mcast_noflush_send (instance->totemrrp_handle,
- sort_queue_item_ptr->iovec,
- sort_queue_item_ptr->iov_len);
+ totemrrp_mcast_noflush_send (
+ instance->totemrrp_handle,
+ message_item->mcast,
+ message_item->msg_len);
/*
* Delete item from pending queue
@@ -2517,14 +2467,9 @@
static void token_retransmit (struct totemsrp_instance *instance)
{
- struct iovec iovec;
-
- iovec.iov_base = instance->orf_token_retransmit;
- iovec.iov_len = instance->orf_token_retransmit_size;
-
totemrrp_token_send (instance->totemrrp_handle,
- &iovec,
- 1);
+ instance->orf_token_retransmit,
+ instance->orf_token_retransmit_size);
}
/*
@@ -2591,13 +2536,14 @@
struct orf_token *orf_token,
int forward_token)
{
- struct iovec iovec;
int res = 0;
- unsigned int iov_len = sizeof (struct orf_token) +
+ unsigned int orf_token_size;
+
+ orf_token_size = sizeof (struct orf_token) +
(orf_token->rtr_list_entries * sizeof (struct rtr_item));
- memcpy (instance->orf_token_retransmit, orf_token, iov_len);
- instance->orf_token_retransmit_size = iov_len;
+ memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
+ instance->orf_token_retransmit_size = orf_token_size;
orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
assert (orf_token->header.nodeid);
@@ -2605,12 +2551,9 @@
return (0);
}
- iovec.iov_base = orf_token;
- iovec.iov_len = iov_len;
-
totemrrp_token_send (instance->totemrrp_handle,
- &iovec,
- 1);
+ orf_token,
+ orf_token_size);
return (res);
}
@@ -2618,7 +2561,6 @@
static int token_hold_cancel_send (struct totemsrp_instance *instance)
{
struct token_hold_cancel token_hold_cancel;
- struct iovec iovec[2];
/*
* Only cancel if the token is currently held
@@ -2634,19 +2576,15 @@
token_hold_cancel.header.type = MESSAGE_TYPE_TOKEN_HOLD_CANCEL;
token_hold_cancel.header.endian_detector = ENDIAN_LOCAL;
token_hold_cancel.header.nodeid = instance->my_id.addr[0].nodeid;
+ memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
+ sizeof (struct memb_ring_id));
assert (token_hold_cancel.header.nodeid);
- iovec[0].iov_base = &token_hold_cancel;
- iovec[0].iov_len = sizeof (struct token_hold_cancel) -
- sizeof (struct memb_ring_id);
- iovec[1].iov_base = &instance->my_ring_id;
- iovec[1].iov_len = sizeof (struct memb_ring_id);
+ totemrrp_mcast_flush_send (instance->totemrrp_handle, &token_hold_cancel,
+ sizeof (struct token_hold_cancel));
- totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
-
return (0);
}
-//AAA
static int orf_token_send_initial (struct totemsrp_instance *instance)
{
@@ -2777,27 +2715,26 @@
struct totemsrp_instance *instance,
struct memb_commit_token *commit_token)
{
- struct iovec iovec;
struct srp_addr *addr;
struct memb_commit_token_memb_entry *memb_list;
+ unsigned int commit_token_size;
addr = (struct srp_addr *)commit_token->end_of_commit_token;
memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
commit_token->token_seq++;
- iovec.iov_base = commit_token;
- iovec.iov_len = sizeof (struct memb_commit_token) +
+ commit_token_size = sizeof (struct memb_commit_token) +
((sizeof (struct srp_addr) +
sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
/*
* Make a copy for retransmission if necessary
*/
- memcpy (instance->orf_token_retransmit, commit_token, iovec.iov_len);
- instance->orf_token_retransmit_size = iovec.iov_len;
+ memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
+ instance->orf_token_retransmit_size = commit_token_size;
totemrrp_token_send (instance->totemrrp_handle,
- &iovec,
- 1);
+ commit_token,
+ commit_token_size);
/*
* Request retransmission of the commit token in case it is lost
@@ -2887,65 +2824,71 @@
static void memb_join_message_send (struct totemsrp_instance *instance)
{
- struct memb_join memb_join;
- struct iovec iovec[3];
- unsigned int iovs;
+ char memb_join_data[10000];
+ struct memb_join *memb_join = (struct memb_join *)memb_join_data;
+ char *addr;
+ unsigned int addr_idx;
- memb_join.header.type = MESSAGE_TYPE_MEMB_JOIN;
- memb_join.header.endian_detector = ENDIAN_LOCAL;
- memb_join.header.encapsulated = 0;
- memb_join.header.nodeid = instance->my_id.addr[0].nodeid;
- assert (memb_join.header.nodeid);
+ memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
+ memb_join->header.endian_detector = ENDIAN_LOCAL;
+ memb_join->header.encapsulated = 0;
+ memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
+ assert (memb_join->header.nodeid);
assert (srp_addr_equal (&instance->my_proc_list[0], &instance->my_proc_list[1]) == 0);
- memb_join.ring_seq = instance->my_ring_id.seq;
- memb_join.proc_list_entries = instance->my_proc_list_entries;
- memb_join.failed_list_entries = instance->my_failed_list_entries;
- srp_addr_copy (&memb_join.system_from, &instance->my_id);
+ memb_join->ring_seq = instance->my_ring_id.seq;
+ memb_join->proc_list_entries = instance->my_proc_list_entries;
+ memb_join->failed_list_entries = instance->my_failed_list_entries;
+ srp_addr_copy (&memb_join->system_from, &instance->my_id);
- iovec[0].iov_base = &memb_join;
- iovec[0].iov_len = sizeof (struct memb_join);
- iovec[1].iov_base = &instance->my_proc_list;
- iovec[1].iov_len = instance->my_proc_list_entries *
+ /*
+ * This mess adds the joined and failed processor lists into the join
+ * message
+ */
+ addr = (char *)memb_join;
+ addr_idx = sizeof (struct memb_join);
+ memcpy (&addr[addr_idx],
+ instance->my_proc_list,
+ instance->my_proc_list_entries *
+ sizeof (struct srp_addr));
+ addr_idx +=
+ instance->my_proc_list_entries *
sizeof (struct srp_addr);
- if (instance->my_failed_list_entries == 0) {
- iovs = 2;
- } else {
- iovs = 3;
- iovec[2].iov_base = instance->my_failed_list;
- iovec[2].iov_len = instance->my_failed_list_entries *
- sizeof (struct srp_addr);
- }
+ memcpy (&addr[addr_idx],
+ instance->my_failed_list,
+ instance->my_failed_list_entries *
+ sizeof (struct srp_addr));
+ addr_idx +=
+ instance->my_failed_list_entries *
+ sizeof (struct srp_addr);
+
if (instance->totem_config->send_join_timeout) {
usleep (random() % (instance->totem_config->send_join_timeout * 1000));
}
totemrrp_mcast_flush_send (
instance->totemrrp_handle,
- iovec,
- iovs);
+ memb_join,
+ addr_idx);
}
static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
{
struct memb_merge_detect memb_merge_detect;
- struct iovec iovec[2];
memb_merge_detect.header.type = MESSAGE_TYPE_MEMB_MERGE_DETECT;
memb_merge_detect.header.endian_detector = ENDIAN_LOCAL;
memb_merge_detect.header.encapsulated = 0;
memb_merge_detect.header.nodeid = instance->my_id.addr[0].nodeid;
srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
+ memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
+ sizeof (struct memb_ring_id));
assert (memb_merge_detect.header.nodeid);
- iovec[0].iov_base = &memb_merge_detect;
- iovec[0].iov_len = sizeof (struct memb_merge_detect) -
- sizeof (struct memb_ring_id);
- iovec[1].iov_base = &instance->my_ring_id;
- iovec[1].iov_len = sizeof (struct memb_ring_id);
-
- totemrrp_mcast_flush_send (instance->totemrrp_handle, iovec, 2);
+ totemrrp_mcast_flush_send (instance->totemrrp_handle,
+ &memb_merge_detect,
+ sizeof (struct memb_merge_detect));
}
static void memb_ring_id_create_or_load (
@@ -3569,7 +3512,7 @@
sort_queue_item_p = ptr;
- mcast_in = sort_queue_item_p->iovec[0].iov_base;
+ mcast_in = sort_queue_item_p->mcast;
assert (mcast_in != (struct mcast *)0xdeadbeef);
endian_conversion_required = 0;
@@ -3604,27 +3547,11 @@
/*
* Message is locally originated multicast
*/
- if (sort_queue_item_p->iov_len > 1 &&
- sort_queue_item_p->iovec[0].iov_len == sizeof (struct mcast)) {
- instance->totemsrp_deliver_fn (
- mcast_header.header.nodeid,
- &sort_queue_item_p->iovec[1],
- sort_queue_item_p->iov_len - 1,
- endian_conversion_required);
- } else {
- sort_queue_item_p->iovec[0].iov_len -= sizeof (struct mcast);
- sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base + sizeof (struct mcast);
-
- instance->totemsrp_deliver_fn (
- mcast_header.header.nodeid,
- sort_queue_item_p->iovec,
- sort_queue_item_p->iov_len,
- endian_conversion_required);
-
- sort_queue_item_p->iovec[0].iov_len += sizeof (struct mcast);
- sort_queue_item_p->iovec[0].iov_base = (char *)sort_queue_item_p->iovec[0].iov_base - sizeof (struct mcast);
- }
-//TODO instance->stats_delv += 1;
+ instance->totemsrp_deliver_fn (
+ mcast_header.header.nodeid,
+ ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
+ sort_queue_item_p->msg_len - sizeof (struct mcast),
+ endian_conversion_required);
}
}
@@ -3726,15 +3653,12 @@
* Allocate new multicast memory block
*/
// TODO LEAK
- sort_queue_item.iovec[0].iov_base = malloc (msg_len);
- if (sort_queue_item.iovec[0].iov_base == 0) {
+ sort_queue_item.mcast = malloc (msg_len);
+ if (sort_queue_item.mcast == NULL) {
return (-1); /* error here is corrected by the algorithm */
}
- memcpy (sort_queue_item.iovec[0].iov_base, msg, msg_len);
- sort_queue_item.iovec[0].iov_len = msg_len;
- assert (sort_queue_item.iovec[0].iov_len > 0);
- assert (sort_queue_item.iovec[0].iov_len < FRAME_SIZE_MAX);
- sort_queue_item.iov_len = 1;
+ memcpy (sort_queue_item.mcast, msg, msg_len);
+ sort_queue_item.msg_len = msg_len;
if (sq_lt_compare (instance->my_high_seq_received,
mcast_header.seq)) {
@@ -4175,7 +4099,7 @@
void main_deliver_fn (
void *context,
const void *msg,
- size_t msg_len)
+ unsigned int msg_len)
{
struct totemsrp_instance *instance = context;
const struct message_header *message_header = msg;
Index: exec/totemrrp.h
===================================================================
--- exec/totemrrp.h (revision 2171)
+++ exec/totemrrp.h (working copy)
@@ -60,7 +60,7 @@
void (*deliver_fn) (
void *context,
const void *msg,
- size_t msg_len),
+ unsigned int msg_len),
void (*iface_change_fn) (
void *context,
@@ -81,18 +81,18 @@
extern int totemrrp_token_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemrrp_mcast_noflush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemrrp_mcast_flush_send (
hdb_handle_t handle,
- const struct iovec *iovec,
- unsigned int iov_len);
+ const void *msg,
+ unsigned int msg_len);
extern int totemrrp_recv_flush (hdb_handle_t handle);
Index: exec/totemsrp.h
===================================================================
--- exec/totemsrp.h (revision 2171)
+++ exec/totemsrp.h (working copy)
@@ -53,8 +53,8 @@
void (*deliver_fn) (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required),
void (*confchg_fn) (
enum totem_configuration_type configuration_type,
Index: exec/main.c
===================================================================
--- exec/main.c (revision 2171)
+++ exec/main.c (working copy)
@@ -111,8 +111,6 @@
static struct totem_logging_configuration totem_logging_configuration;
-static char delivery_data[MESSAGE_SIZE_MAX];
-
static int num_config_modules;
static struct config_iface_ver0 *config_modules[MAX_DYNAMIC_SERVICES];
@@ -382,43 +380,30 @@
static void deliver_fn (
unsigned int nodeid,
- const struct iovec *iovec,
- unsigned int iov_len,
+ const void *msg,
+ unsigned int msg_len,
int endian_conversion_required)
{
- coroipc_request_header_t *header;
- int pos = 0;
- int i;
+ const coroipc_request_header_t *header;
int service;
int fn_id;
+ unsigned int id;
+ unsigned int size;
- /*
- * Build buffer without iovecs to make processing easier
- * This is only used for messages which are multicast with iovecs
- * and self-delivered. All other mechanisms avoid the copy.
- */
- if (iov_len > 1) {
- for (i = 0; i < iov_len; i++) {
- memcpy (&delivery_data[pos], iovec[i].iov_base, iovec[i].iov_len);
- pos += iovec[i].iov_len;
- assert (pos < MESSAGE_SIZE_MAX);
- }
- header = (coroipc_request_header_t *)delivery_data;
+ header = msg;
+ if (endian_conversion_required) {
+ id = swab32 (header->id);
+ size = swab32 (header->size);
} else {
- header = (coroipc_request_header_t *)iovec[0].iov_base;
+ id = header->id;
+ size = header->size;
}
- if (endian_conversion_required) {
- header->id = swab32 (header->id);
- header->size = swab32 (header->size);
- }
-// assert(iovec->iov_len == header->size);
-
/*
* Call the proper executive handler
*/
- service = header->id >> 16;
- fn_id = header->id & 0xffff;
+ service = id >> 16;
+ fn_id = id & 0xffff;
if (!ais_service[service])
return;
@@ -427,11 +412,11 @@
if (endian_conversion_required) {
assert(ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn != NULL);
ais_service[service]->exec_engine[fn_id].exec_endian_convert_fn
- (header);
+ ((void *)msg);
}
ais_service[service]->exec_engine[fn_id].exec_handler_fn
- (header, nodeid);
+ (msg, nodeid);
serialize_unlock();
}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais