Attached is an initial set of changes for the Message Service (B.03.01).
This patch includes changes to the library interface and the stubbed-in
functions for the exec interface. Note that this patch does not provide
a fully functional messaging service implementation. The exec handlers
are still being developed.
Comments welcome.
Ryan
Index: trunk/include/saAis.h
===================================================================
--- trunk/include/saAis.h (revision 1499)
+++ trunk/include/saAis.h (working copy)
@@ -132,6 +132,14 @@
SA_AIS_ERR_NO_SECTIONS = 27
} SaAisErrorT;
+typedef union {
+ SaInt64T int64Value;
+ SaUint64T uint64Value;
+ SaTimeT timeValue;
+ SaFloatT floatValue;
+ SaDoubleT doubleValue;
+} SaLimitValueT;
+
typedef SaUint64T SaSelectionObjectT;
typedef SaUint64T SaInvocationT;
Index: trunk/include/saMsg.h
===================================================================
--- trunk/include/saMsg.h (revision 1499)
+++ trunk/include/saMsg.h (working copy)
@@ -144,13 +144,40 @@
SaMsgMessageReceivedCallbackT saMsgMessageReceivedCallback;
} SaMsgCallbacksT;
+typedef enum {
+ SA_MSG_QUEUE_CAPACITY_REACHED = 1,
+ SA_MSG_QUEUE_CAPACITY_AVAILABLE = 2,
+ SA_MSG_QUEUE_GROUP_CAPACITY_REACHED = 3,
+ SA_MSG_QUEUE_GROUP_CAPACITY_AVAILABLE = 4
+} SaMsgMessageCapacityStatusT;
+
+typedef struct {
+ SaSizeT capacityReached[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
+ SaSizeT capacityAvailable[SA_MSG_MESSAGE_LOWEST_PRIORITY + 1];
+} SaMsgQueueThresholdsT;
+
+typedef enum {
+ SA_MSG_DEST_CAPACITY_STATUS = 1
+} SaMsgStateT;
+
+typedef enum {
+ SA_MSG_MAX_PRIORITY_AREA_SIZE_ID = 1,
+ SA_MSG_MAX_QUEUE_SIZE_ID = 2,
+ SA_MSG_MAX_NUM_QUEUES_ID = 3,
+ SA_MSG_MAX_NUM_QUEUE_GROUPS_ID = 4,
+ SA_MSG_MAX_NUM_QUEUES_PER_GROUP_ID = 5,
+ SA_MSG_MAX_MESSAGE_SIZE_ID = 6,
+ SA_MSG_MAX_REPLY_SIZE_ID = 7
+} SaMsgLimitIdT;
+
SaAisErrorT
saMsgInitialize (
SaMsgHandleT *msgHandle,
const SaMsgCallbacksT *msgCallbacks,
SaVersionT *version);
-SaAisErrorT saMsgSelectionObjectGet (
+SaAisErrorT
+saMsgSelectionObjectGet (
SaMsgHandleT msgHandle,
SaSelectionObjectT *selectionObject);
@@ -191,6 +218,11 @@
SaMsgQueueStatusT *queueStatus);
SaAisErrorT
+saMsgQueueRetentionTimeSet (
+ SaMsgQueueHandleT queueHandle,
+ SaTimeT *retentionTime);
+
+SaAisErrorT
saMsgQueueUnlink (
SaMsgQueueHandleT msgHandle,
const SaNameT *queueName);
@@ -231,6 +263,11 @@
const SaNameT *queueGroupName);
SaAisErrorT
+saMsgQueueGroupNotificationFree (
+ SaMsgHandleT msgHandle,
+ SaMsgQueueGroupNotificationT *notification);
+
+SaAisErrorT
saMsgMessageSend (
SaMsgHandleT msgHandle,
const SaNameT *destination,
@@ -254,6 +291,11 @@
SaTimeT timeout);
SaAisErrorT
+saMsgMessageDataFree (
+ SaMsgHandleT msgHandle,
+ void *data);
+
+SaAisErrorT
saMsgMessageCancel (
SaMsgQueueHandleT queueHandle);
@@ -273,12 +315,33 @@
const SaMsgSenderIdT *senderId,
SaTimeT timeout);
-SaAisErrorT saMsgMessageReplyAsync (
+SaAisErrorT
+saMsgMessageReplyAsync (
SaMsgHandleT msgHandle,
SaInvocationT invocation,
const SaMsgMessageT *replyMessage,
const SaMsgSenderIdT *senderId,
SaMsgAckFlagsT ackFlags);
+SaAisErrorT
+saMsgQueueCapacityThresholdsSet (
+ SaMsgQueueHandleT queueHandle,
+ const SaMsgQueueThresholdsT *thresholds);
+
+SaAisErrorT
+saMsgQueueCapacityThresholdsGet (
+ SaMsgQueueHandleT queueHandle,
+ SaMsgQueueThresholdsT *thresholds);
+
+SaAisErrorT
+saMsgMetadataSizeGet (
+ SaMsgHandleT msgHandle,
+ SaUint32T *metadataSize);
+
+SaAisErrorT
+saMsgLimitGet (
+ SaMsgHandleT msgHandle,
+ SaMsgLimitIdT limitId,
+ SaLimitValueT *limitValue);
+
#endif /* SAMSG_H_DEFINED */
-
Index: trunk/include/ipc_msg.h
===================================================================
--- trunk/include/ipc_msg.h (revision 1499)
+++ trunk/include/ipc_msg.h (working copy)
@@ -43,25 +43,33 @@
MESSAGE_REQ_MSG_QUEUEOPENASYNC = 1,
MESSAGE_REQ_MSG_QUEUECLOSE = 2,
MESSAGE_REQ_MSG_QUEUESTATUSGET = 3,
- MESSAGE_REQ_MSG_QUEUEUNLINK = 4,
- MESSAGE_REQ_MSG_QUEUEGROUPCREATE = 5,
- MESSAGE_REQ_MSG_QUEUEGROUPINSERT = 6,
- MESSAGE_REQ_MSG_QUEUEGROUPREMOVE = 7,
- MESSAGE_REQ_MSG_QUEUEGROUPDELETE = 8,
- MESSAGE_REQ_MSG_QUEUEGROUPTRACK = 9,
- MESSAGE_REQ_MSG_QUEUEGROUPTRACKSTOP = 10,
- MESSAGE_REQ_MSG_MESSAGESEND = 11,
- MESSAGE_REQ_MSG_MESSAGEGET = 12,
- MESSAGE_REQ_MSG_MESSAGECANCEL = 13,
- MESSAGE_REQ_MSG_MESSAGESENDRECEIVE = 14,
- MESSAGE_REQ_MSG_MESSAGEREPLY = 15
+ MESSAGE_REQ_MSG_QUEUERETENTIONTIMESET = 4,
+ MESSAGE_REQ_MSG_QUEUEUNLINK = 5,
+ MESSAGE_REQ_MSG_QUEUEGROUPCREATE = 6,
+ MESSAGE_REQ_MSG_QUEUEGROUPINSERT = 7,
+ MESSAGE_REQ_MSG_QUEUEGROUPREMOVE = 8,
+ MESSAGE_REQ_MSG_QUEUEGROUPDELETE = 9,
+ MESSAGE_REQ_MSG_QUEUEGROUPTRACK = 10,
+ MESSAGE_REQ_MSG_QUEUEGROUPTRACKSTOP = 11,
+ MESSAGE_REQ_MSG_MESSAGESEND = 12,
+ MESSAGE_REQ_MSG_MESSAGESENDASYNC = 13,
+ MESSAGE_REQ_MSG_MESSAGEGET = 14,
+ MESSAGE_REQ_MSG_MESSAGECANCEL = 15,
+ MESSAGE_REQ_MSG_MESSAGESENDRECEIVE = 16,
+ MESSAGE_REQ_MSG_MESSAGEREPLY = 17,
+ MESSAGE_REQ_MSG_MESSAGEREPLYASYNC = 18,
+ MESSAGE_REQ_MSG_QUEUECAPACITYTHRESHOLDSET = 19,
+ MESSAGE_REQ_MSG_QUEUECAPACITYTHRESHOLDGET = 20,
+ MESSAGE_REQ_MSG_METADATASIZEGET = 21,
+ MESSAGE_REQ_MSG_LIMITGET = 22
};
enum res_lib_msg_queue_types {
MESSAGE_RES_MSG_QUEUEOPEN = 0,
- MESSAGE_RES_MSG_QUEUEOPENASYNC = 2,
- MESSAGE_RES_MSG_QUEUECLOSE = 3,
- MESSAGE_RES_MSG_QUEUESTATUSGET = 4,
+ MESSAGE_RES_MSG_QUEUEOPENASYNC = 1,
+ MESSAGE_RES_MSG_QUEUECLOSE = 2,
+ MESSAGE_RES_MSG_QUEUESTATUSGET = 3,
+ MESSAGE_RES_MSG_QUEUERETENTIONTIMESET = 4,
MESSAGE_RES_MSG_QUEUEUNLINK = 5,
MESSAGE_RES_MSG_QUEUEGROUPCREATE = 6,
MESSAGE_RES_MSG_QUEUEGROUPINSERT = 7,
@@ -75,7 +83,11 @@
MESSAGE_RES_MSG_MESSAGECANCEL = 15,
MESSAGE_RES_MSG_MESSAGESENDRECEIVE = 16,
MESSAGE_RES_MSG_MESSAGEREPLY = 17,
- MESSAGE_RES_MSG_MESSAGEREPLYASYNC = 18
+ MESSAGE_RES_MSG_MESSAGEREPLYASYNC = 18,
+ MESSAGE_RES_MSG_QUEUECAPACITYTHRESHOLDSET = 19,
+ MESSAGE_RES_MSG_QUEUECAPACITYTHRESHOLDGET = 20,
+ MESSAGE_RES_MSG_METADATASIZEGET = 21,
+ MESSAGE_RES_MSG_LIMITGET = 22
};
struct req_lib_msg_queueopen {
@@ -123,6 +135,16 @@
SaMsgQueueStatusT queueStatus;
};
+struct req_lib_msg_queueretentiontimeset {
+ mar_res_header_t header;
+ SaNameT queueName;
+ SaTimeT retentionTime;
+};
+
+struct res_lib_msg_queueretentiontimeset {
+ mar_res_header_t header;
+};
+
struct req_lib_msg_queueunlink {
mar_req_header_t header;
SaNameT queueName;
@@ -263,4 +285,43 @@
mar_res_header_t header;
};
+struct req_lib_msg_queuecapacitythresholdset {
+ mar_res_header_t header;
+ SaNameT queueName;
+ SaMsgQueueThresholdsT thresholds;
+};
+
+struct res_lib_msg_queuecapacitythresholdset {
+ mar_res_header_t header;
+};
+
+struct req_lib_msg_queuecapacitythresholdget {
+ mar_res_header_t header;
+ SaNameT queueName;
+};
+
+struct res_lib_msg_queuecapacitythresholdget {
+ mar_res_header_t header;
+ SaMsgQueueThresholdsT thresholds;
+};
+
+struct req_lib_msg_metadatasizeget {
+ mar_res_header_t header;
+};
+
+struct res_lib_msg_metadatasizeget {
+ mar_res_header_t header;
+ SaUint32T metadataSize;
+};
+
+struct req_lib_msg_limitget {
+ mar_res_header_t header;
+ SaMsgLimitIdT limitId;
+};
+
+struct res_lib_msg_limitget {
+ mar_res_header_t header;
+ SaLimitValueT limitValue;
+};
+
#endif /* IPC_MSG_H_DEFINED */
Index: trunk/exec/msg.c
===================================================================
--- trunk/exec/msg.c (revision 1499)
+++ trunk/exec/msg.c (working copy)
@@ -70,18 +70,23 @@
MESSAGE_REQ_EXEC_MSG_QUEUEOPEN = 0,
MESSAGE_REQ_EXEC_MSG_QUEUECLOSE = 1,
MESSAGE_REQ_EXEC_MSG_QUEUESTATUSGET = 2,
- MESSAGE_REQ_EXEC_MSG_QUEUEUNLINK = 3,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPCREATE = 4,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPINSERT = 5,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPREMOVE = 6,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPDELETE = 7,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACK = 8,
- MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACKSTOP = 9,
- MESSAGE_REQ_EXEC_MSG_MESSAGESEND = 10,
- MESSAGE_REQ_EXEC_MSG_MESSAGEGET = 11,
- MESSAGE_REQ_EXEC_MSG_MESSAGECANCEL = 12,
- MESSAGE_REQ_EXEC_MSG_MESSAGESENDRECEIVE = 13,
- MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 14
+ MESSAGE_REQ_EXEC_MSG_QUEUERETENTIONTIMESET = 3,
+ MESSAGE_REQ_EXEC_MSG_QUEUEUNLINK = 4,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPCREATE = 5,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPINSERT = 6,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPREMOVE = 7,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPDELETE = 8,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACK = 9,
+ MESSAGE_REQ_EXEC_MSG_QUEUEGROUPTRACKSTOP = 10,
+ MESSAGE_REQ_EXEC_MSG_MESSAGESEND = 11,
+ MESSAGE_REQ_EXEC_MSG_MESSAGEGET = 12,
+ MESSAGE_REQ_EXEC_MSG_MESSAGECANCEL = 13,
+ MESSAGE_REQ_EXEC_MSG_MESSAGESENDRECEIVE = 14,
+ MESSAGE_REQ_EXEC_MSG_MESSAGEREPLY = 15,
+ MESSAGE_REQ_EXEC_MSG_QUEUECAPACITYTHRESHOLDSET = 16,
+ MESSAGE_REQ_EXEC_MSG_QUEUECAPACITYTHRESHOLDGET = 17,
+ MESSAGE_REQ_EXEC_MSG_METADATASIZEGET = 18,
+ MESSAGE_REQ_EXEC_MSG_LIMITGET = 19
};
struct message_queue {
@@ -132,6 +137,10 @@
void *message,
unsigned int nodeid);
+static void message_handler_req_exec_msg_queueretentiontimeset (
+ void *message,
+ unsigned int nodeid);
+
static void message_handler_req_exec_msg_queueunlink (
void *message,
unsigned int nodeid);
@@ -180,6 +189,22 @@
void *message,
unsigned int nodeid);
+static void message_handler_req_exec_msg_queuecapacitythresholdset (
+ void *message,
+ unsigned int nodeid);
+
+static void message_handler_req_exec_msg_queuecapacitythresholdget (
+ void *message,
+ unsigned int nodeid);
+
+static void message_handler_req_exec_msg_metadatasizeget (
+ void *message,
+ unsigned int nodeid);
+
+static void message_handler_req_exec_msg_limitget (
+ void *message,
+ unsigned int nodeit);
+
static void message_handler_req_lib_msg_queueopen (
void *conn,
void *msg);
@@ -196,6 +221,10 @@
void *conn,
void *msg);
+static void message_handler_req_lib_msg_queueretentiontimeset (
+ void *conn,
+ void *msg);
+
static void message_handler_req_lib_msg_queueunlink (
void *conn,
void *msg);
@@ -252,6 +281,22 @@
void *conn,
void *msg);
+static void message_handler_req_lib_msg_queuecapacitythresholdset (
+ void *conn,
+ void *msg);
+
+static void message_handler_req_lib_msg_queuecapacitythresholdget (
+ void *conn,
+ void *msg);
+
+static void message_handler_req_lib_msg_metadatasizeget (
+ void *conn,
+ void *msg);
+
+static void message_handler_req_lib_msg_limitget (
+ void *conn,
+ void *msg);
+
#ifdef TODO
static void msg_sync_init (void);
#endif
@@ -273,7 +318,6 @@
struct list_head queue_cleanup_list;
};
-
/*
* Executive Handler Definition
*/
@@ -304,137 +348,181 @@
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
{ /* 4 */
+ .lib_handler_fn = message_handler_req_lib_msg_queueretentiontimeset,
+ .response_size = sizeof (struct res_lib_msg_queueretentiontimeset),
+ .response_id = MESSAGE_RES_MSG_QUEUERETENTIONTIMESET,
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
+ },
+ { /* 5 */
.lib_handler_fn = message_handler_req_lib_msg_queueunlink,
.response_size = sizeof (struct res_lib_msg_queueunlink),
.response_id = MESSAGE_RES_MSG_QUEUEUNLINK,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 5 */
+ { /* 6 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupcreate,
.response_size = sizeof (struct res_lib_msg_queuegroupcreate),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPCREATE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 6 */
+ { /* 7 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupinsert,
.response_size = sizeof (struct res_lib_msg_queuegroupinsert),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPINSERT,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 7 */
+ { /* 8 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupremove,
.response_size = sizeof (struct res_lib_msg_queuegroupremove),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPREMOVE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 8 */
+ { /* 9 */
.lib_handler_fn = message_handler_req_lib_msg_queuegroupdelete,
.response_size = sizeof (struct res_lib_msg_queuegroupdelete),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPDELETE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 9 */
+ { /* 10 */
.lib_handler_fn = message_handler_req_lib_msg_queuegrouptrack,
.response_size = sizeof (struct res_lib_msg_queuegrouptrack),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPTRACK,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 10 */
+ { /* 11 */
.lib_handler_fn = message_handler_req_lib_msg_queuegrouptrackstop,
.response_size = sizeof (struct res_lib_msg_queuegrouptrackstop),
.response_id = MESSAGE_RES_MSG_QUEUEGROUPTRACKSTOP,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 11 */
+ { /* 12 */
.lib_handler_fn = message_handler_req_lib_msg_messagesend,
.response_size = sizeof (struct res_lib_msg_messagesend),
.response_id = MESSAGE_RES_MSG_MESSAGESEND,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 12 */
+ { /* 13 */
.lib_handler_fn = message_handler_req_lib_msg_messagesendasync,
.response_size = sizeof (struct res_lib_msg_messagesendasync),
.response_id = MESSAGE_RES_MSG_MESSAGESENDASYNC,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 13 */
+ { /* 14 */
.lib_handler_fn = message_handler_req_lib_msg_messageget,
.response_size = sizeof (struct res_lib_msg_messageget),
.response_id = MESSAGE_RES_MSG_MESSAGEGET,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 14 */
+ { /* 15 */
.lib_handler_fn = message_handler_req_lib_msg_messagecancel,
.response_size = sizeof (struct res_lib_msg_messagecancel),
.response_id = MESSAGE_RES_MSG_MESSAGECANCEL,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 15 */
+ { /* 16 */
.lib_handler_fn = message_handler_req_lib_msg_messagesendreceive,
.response_size = sizeof (struct res_lib_msg_messagesendreceive),
.response_id = MESSAGE_RES_MSG_MESSAGESENDRECEIVE,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 16 */
+ { /* 17 */
.lib_handler_fn = message_handler_req_lib_msg_messagereply,
.response_size = sizeof (struct res_lib_msg_messagereply),
.response_id = MESSAGE_RES_MSG_MESSAGEREPLY,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
- { /* 17 */
+ { /* 18 */
.lib_handler_fn = message_handler_req_lib_msg_messagereplyasync,
.response_size = sizeof (struct res_lib_msg_messagereplyasync),
.response_id = MESSAGE_RES_MSG_MESSAGEREPLYASYNC,
.flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
},
+ { /* 19 */
+ .lib_handler_fn = message_handler_req_lib_msg_queuecapacitythresholdset,
+ .response_size = sizeof (struct res_lib_msg_queuecapacitythresholdset),
+ .response_id = MESSAGE_RES_MSG_QUEUECAPACITYTHRESHOLDSET,
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
+ },
+ { /* 20 */
+ .lib_handler_fn = message_handler_req_lib_msg_queuecapacitythresholdget,
+ .response_size = sizeof (struct res_lib_msg_queuecapacitythresholdget),
+ .response_id = MESSAGE_RES_MSG_QUEUECAPACITYTHRESHOLDGET,
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
+ },
+ { /* 21 */
+ .lib_handler_fn = message_handler_req_lib_msg_metadatasizeget,
+ .response_size = sizeof (struct res_lib_msg_metadatasizeget),
+ .response_id = MESSAGE_RES_MSG_METADATASIZEGET,
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
+ },
+ { /* 22 */
+ .lib_handler_fn = message_handler_req_lib_msg_limitget,
+ .response_size = sizeof (struct res_lib_msg_limitget),
+ .response_id = MESSAGE_RES_MSG_LIMITGET,
+ .flow_control = OPENAIS_FLOW_CONTROL_REQUIRED
+ }
};
-
static struct openais_exec_handler msg_exec_service[] = {
- {
+ { /* 0 */
.exec_handler_fn = message_handler_req_exec_msg_queueopen,
},
- {
+ { /* 1 */
.exec_handler_fn = message_handler_req_exec_msg_queueclose,
},
- {
+ { /* 2 */
.exec_handler_fn = message_handler_req_exec_msg_queuestatusget,
},
- {
+ { /* 3 */
+ .exec_handler_fn = message_handler_req_exec_msg_queueretentiontimeset,
+ },
+ { /* 4 */
.exec_handler_fn = message_handler_req_exec_msg_queueunlink,
},
- {
+ { /* 5 */
.exec_handler_fn = message_handler_req_exec_msg_queuegroupcreate,
},
- {
+ { /* 6 */
.exec_handler_fn = message_handler_req_exec_msg_queuegroupinsert,
},
- {
+ { /* 7 */
.exec_handler_fn = message_handler_req_exec_msg_queuegroupremove,
},
- {
+ { /* 8 */
.exec_handler_fn = message_handler_req_exec_msg_queuegroupdelete,
},
- {
+ { /* 9 */
.exec_handler_fn = message_handler_req_exec_msg_queuegrouptrack,
},
- {
+ { /* 10 */
.exec_handler_fn = message_handler_req_exec_msg_queuegrouptrackstop,
},
- {
+ { /* 11 */
.exec_handler_fn = message_handler_req_exec_msg_messagesend,
},
- {
+ { /* 12 */
.exec_handler_fn = message_handler_req_exec_msg_messageget,
},
- {
+ { /* 13 */
.exec_handler_fn = message_handler_req_exec_msg_messagecancel,
},
- {
+ { /* 14 */
.exec_handler_fn = message_handler_req_exec_msg_messagesendreceive,
},
- {
- .exec_handler_fn = message_handler_req_exec_msg_messagereply
+ { /* 15 */
+ .exec_handler_fn = message_handler_req_exec_msg_messagereply,
+ },
+ { /* 16 */
+ .exec_handler_fn = message_handler_req_exec_msg_queuecapacitythresholdset,
+ },
+ { /* 17 */
+ .exec_handler_fn = message_handler_req_exec_msg_queuecapacitythresholdget,
+ },
+ { /* 18 */
+ .exec_handler_fn = message_handler_req_exec_msg_metadatasizeget,
+ },
+ { /* 19 */
+ .exec_handler_fn = message_handler_req_exec_msg_limitget,
}
};
@@ -521,6 +609,12 @@
SaNameT queue_name;
};
+struct req_exec_msg_queueretentiontimeset {
+ mar_req_header_t header;
+ mar_message_source_t source;
+ SaTimeT retentionTime;
+};
+
struct req_exec_msg_queueunlink {
mar_req_header_t header;
mar_message_source_t source;
@@ -532,54 +626,64 @@
mar_message_source_t source;
SaNameT queue_group_name;
};
+
struct req_exec_msg_queuegroupinsert {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
SaNameT queue_group_name;
};
+
struct req_exec_msg_queuegroupremove {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
SaNameT queue_group_name;
};
+
struct req_exec_msg_queuegroupdelete {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
+
struct req_exec_msg_queuegrouptrack {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
+
struct req_exec_msg_queuegrouptrackstop {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_group_name;
};
+
struct req_exec_msg_messagesend {
mar_req_header_t header;
mar_message_source_t source;
SaNameT destination;
int async_call;
};
+
struct req_exec_msg_messageget {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
+
struct req_exec_msg_messagecancel {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
+
struct req_exec_msg_messagesendreceive {
mar_req_header_t header;
mar_message_source_t source;
SaNameT queue_name;
};
+
struct req_exec_msg_messagereply {
mar_req_header_t header;
mar_message_source_t source;
@@ -587,6 +691,27 @@
int async_call;
};
+struct req_exec_msg_queuecapacitythresholdset {
+ mar_req_header_t header;
+ mar_message_source_t source;
+ SaMsgQueueThresholdsT thresholds;
+};
+
+struct req_exec_msg_queuecapacitythresholdget {
+ mar_req_header_t header;
+ mar_message_source_t source;
+};
+
+struct req_exec_msg_metadatasizeget {
+ mar_req_header_t header;
+ mar_message_source_t source;
+};
+
+struct req_exec_msg_limitget {
+ mar_req_header_t header;
+ mar_message_source_t source;
+};
+
#ifdef TODO
static void msg_sync_init (void)
{
@@ -884,10 +1009,21 @@
#if 0
struct req_exec_msg_queuestatusget *req_exec_msg_queuestatusget =
(struct req_exec_msg_queuestatusget *)message;
- struct res_lib_msg_queueclose res_lib_msg_queuestatusget;
+ struct res_lib_msg_queuestatusget res_lib_msg_queuestatusget;
#endif
}
+static void message_handler_req_exec_msg_queueretentiontimeset (
+ void *message,
+ unsigned int nodeid)
+{
+#if 0
+ struct req_exec_msg_queueretentiontimeset *req_exec_msg_queueretentiontimeset =
+ (struct req_exec_msg_queueretentiontimeset *)message;
+ struct res_lib_msg_queueretentiontimeset res_lib_msg_queueretentiontimeset;
+#endif
+}
+
static void message_handler_req_exec_msg_queueunlink (
void *message,
unsigned int nodeid)
@@ -895,7 +1031,7 @@
#if 0
struct req_exec_msg_queueunlink *req_exec_msg_queueunlink =
(struct req_exec_msg_queueunlink *)message;
- struct res_lib_msg_queueclose res_lib_msg_queueunlink;
+ struct res_lib_msg_queueunlink res_lib_msg_queueunlink;
#endif
}
@@ -1070,7 +1206,7 @@
#if 0
struct req_exec_msg_queuegrouptrack *req_exec_msg_queuegrouptrack =
(struct req_exec_msg_queuegrouptrack *)message;
- struct res_lib_msg_queueclose res_lib_msg_queuegrouptrack;
+ struct res_lib_msg_queuegrouptrack res_lib_msg_queuegrouptrack;
#endif
}
@@ -1081,7 +1217,7 @@
#if 0
struct req_exec_msg_queuegrouptrackstop *req_exec_msg_queuegrouptrackstop =
(struct req_exec_msg_queuegrouptrackstop *)message;
- struct res_lib_msg_queueclose res_lib_msg_queuegrouptrackstop;
+ struct res_lib_msg_queuegrouptrackstop res_lib_msg_queuegrouptrackstop;
#endif
}
@@ -1092,7 +1228,7 @@
#if 0
struct req_exec_msg_messagesend *req_exec_msg_messagesend =
(struct req_exec_msg_messagesend *)message;
- struct res_lib_msg_queueclose res_lib_msg_messagesend;
+ struct res_lib_msg_messagesend res_lib_msg_messagesend;
#endif
}
@@ -1103,7 +1239,7 @@
#if 0
struct req_exec_msg_messageget *req_exec_msg_messageget =
(struct req_exec_msg_messageget *)message;
- struct res_lib_msg_queueclose res_lib_msg_messageget;
+ struct res_lib_msg_messageget res_lib_msg_messageget;
#endif
}
@@ -1114,7 +1250,7 @@
#if 0
struct req_exec_msg_messagecancel *req_exec_msg_messagecancel =
(struct req_exec_msg_messagecancel *)message;
- struct res_lib_msg_queueclose res_lib_msg_messagecancel;
+ struct res_lib_msg_messagecancel res_lib_msg_messagecancel;
#endif
}
@@ -1136,11 +1272,54 @@
#if 0
struct req_exec_msg_messagereply *req_exec_msg_messagereply =
(struct req_exec_msg_messagereply *)message;
- struct res_lib_msg_queueclose res_lib_msg_messagereply;
+ struct res_lib_msg_messagereply res_lib_msg_messagereply;
#endif
}
+static void message_handler_req_exec_msg_queuecapacitythresholdset (
+ void *message,
+ unsigned int nodeid)
+{
+#if 0
+ struct req_exec_msg_queuecapacitythresholdset *req_exec_msg_queuecapacitythresholdset =
+ (struct req_exec_msg_queuecapacitythresholdset *)message;
+ struct res_lib_msg_queuecapacitythresholdset res_lib_msg_queuecapacitythresholdset;
+#endif
+}
+static void message_handler_req_exec_msg_queuecapacitythresholdget (
+ void *message,
+ unsigned int nodeid)
+{
+#if 0
+ struct req_exec_msg_queuecapacitythresholdget *req_exec_msg_queuecapacitythresholdget =
+ (struct req_exec_msg_queuecapacitythresholdget *)message;
+ struct res_lib_msg_queuecapacitythresholdget res_lib_msg_queuecapacitythresholdget;
+#endif
+}
+
+static void message_handler_req_exec_msg_metadatasizeget (
+ void *message,
+ unsigned int nodeid)
+{
+#if 0
+ struct req_exec_msg_metadatasizeget *req_exec_msg_metadatasizeget =
+ (struct req_exec_msg_metadatasizeget *)message;
+ struct res_lib_msg_metadatasizeget res_lib_msg_metadatasizeget;
+#endif
+}
+
+static void message_handler_req_exec_msg_limitget (
+ void *message,
+ unsigned int nodeid)
+{
+#if 0
+ struct req_exec_msg_limitget *req_exec_msg_limitget =
+ (struct req_exec_msg_limitget *)message;
+ struct res_lib_msg_limitget res_lib_msg_limitget;
+#endif
+}
+
static void message_handler_req_lib_msg_queueopen (
void *conn,
void *msg)
@@ -1273,6 +1452,19 @@
TOTEMPG_AGREED) == 0);
}
+static void message_handler_req_lib_msg_queueretentiontimeset (
+ void *conn,
+ void *msg)
+{
+ struct req_lib_msg_queueretentiontimeset *req_lib_msg_queueretentiontimeset =
+ (struct req_lib_msg_queueretentiontimeset *)msg;
+ struct req_exec_msg_queueretentiontimeset req_exec_msg_queueretentiontimeset;
+ struct iovec iovec;
+
+ log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueRetentiontimeSet %s\n",
+ getSaNameT (&req_lib_msg_queueretentiontimeset->queueName));
+}
+
static void message_handler_req_lib_msg_queueunlink (
void *conn,
void *msg)
@@ -1686,3 +1878,53 @@
assert (totempg_groups_mcast_joined (openais_group_handle, &iovec, 1,
TOTEMPG_AGREED) == 0);
}
+
+static void message_handler_req_lib_msg_queuecapacitythresholdset (
+ void *conn,
+ void *msg)
+{
+ struct req_lib_msg_queuecapacitythresholdset *req_lib_msg_queuecapacitythresholdset =
+ (struct req_lib_msg_queuecapacitythresholdset *)msg;
+ struct req_exec_msg_queuecapacitythresholdset req_exec_msg_queuecapacitythresholdset;
+ struct iovec iovec;
+
+ log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueCapacityThresholdSet %s\n",
+ getSaNameT (&req_lib_msg_queuecapacitythresholdset->queueName));
+}
+
+static void message_handler_req_lib_msg_queuecapacitythresholdget (
+ void *conn,
+ void *msg)
+{
+ struct req_lib_msg_queuecapacitythresholdget *req_lib_msg_queuecapacitythresholdget =
+ (struct req_lib_msg_queuecapacitythresholdget *)msg;
+ struct req_exec_msg_queuecapacitythresholdget req_exec_msg_queuecapacitythresholdget;
+ struct iovec iovec;
+
+ log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgQueueCapacityThresholdGet %s\n",
+ getSaNameT (&req_lib_msg_queuecapacitythresholdget->queueName));
+}
+
+static void message_handler_req_lib_msg_metadatasizeget (
+ void *conn,
+ void *msg)
+{
+ struct req_lib_msg_metadatasizeget *req_lib_msg_metadatasizeget =
+ (struct req_lib_msg_metadatasizeget *)msg;
+ struct req_exec_msg_metadatasizeget req_exec_msg_metadatasizeget;
+ struct iovec iovec;
+
+ log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgMetadataSizeGet\n");
+}
+
+static void message_handler_req_lib_msg_limitget (
+ void *conn,
+ void *msg)
+{
+ struct req_lib_msg_limitget *req_lib_msg_limitget =
+ (struct req_lib_msg_limitget *)msg;
+ struct req_exec_msg_limitget req_exec_msg_limitget;
+ struct iovec iovec;
+
+ log_printf (LOG_LEVEL_NOTICE, "LIB request: saMsgLimitGet\n");
+}
Index: trunk/lib/msg.c
===================================================================
--- trunk/lib/msg.c (revision 1499)
+++ trunk/lib/msg.c (working copy)
@@ -83,7 +83,6 @@
pthread_mutex_t *response_mutex;
};
-
void msgHandleInstanceDestructor (void *instance);
void queueHandleInstanceDestructor (void *instance);
@@ -124,7 +123,6 @@
unsigned char data[0];
};
-
/*
* Implementation
*/
@@ -750,16 +748,55 @@
saHandleInstancePut (&msgHandleDatabase, msgHandle);
- if (error == SA_AIS_OK)
+ if (error == SA_AIS_OK) {
error = res_lib_msg_queuestatusget.header.error;
+ }
if (error == SA_AIS_OK) {
memcpy (queueStatus, &res_lib_msg_queuestatusget.queueStatus,
sizeof (SaMsgQueueStatusT));
}
+
return (error);
}
+SaAisErrorT
+saMsgQueueRetentionTimeSet (
+ SaMsgQueueHandleT queueHandle,
+ SaTimeT *retentionTime)
+{
+ SaAisErrorT error;
+ struct msgQueueInstance *msgQueueInstance;
+ struct req_lib_msg_queueretentiontimeset req_lib_msg_queueretentiontimeset;
+ struct res_lib_msg_queueretentiontimeset res_lib_msg_queueretentiontimeset;
+ if (retentionTime == NULL) {
+ return (SA_AIS_ERR_INVALID_PARAM);
+ }
+
+ error = saHandleInstanceGet (&queueHandleDatabase, queueHandle, (void *)&msgQueueInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ req_lib_msg_queueretentiontimeset.header.size = sizeof (struct req_lib_msg_queueretentiontimeset);
+ req_lib_msg_queueretentiontimeset.header.id = MESSAGE_REQ_MSG_QUEUERETENTIONTIMESET;
+ memcpy (&req_lib_msg_queueretentiontimeset.retentionTime, retentionTime, sizeof (SaTimeT));
+
+ pthread_mutex_lock (msgQueueInstance->response_mutex);
+
+ error = saSendReceiveReply (msgQueueInstance->response_fd,
+ &req_lib_msg_queueretentiontimeset,
+ sizeof (struct req_lib_msg_queueretentiontimeset),
+ &res_lib_msg_queueretentiontimeset,
+ sizeof (struct res_lib_msg_queueretentiontimeset));
+
+ pthread_mutex_unlock (msgQueueInstance->response_mutex);
+
+ saHandleInstancePut (&queueHandleDatabase, queueHandle);
+
+ return (error == SA_AIS_OK ? res_lib_msg_queueretentiontimeset.header.error : error);
+}
+
SaAisErrorT
saMsgQueueUnlink (
SaMsgHandleT msgHandle,
@@ -1035,6 +1072,28 @@
}
SaAisErrorT
+saMsgQueueGroupNotificationFree (
+ SaMsgHandleT msgHandle,
+ SaMsgQueueGroupNotificationT *notification)
+{
+ SaAisErrorT error;
+ struct msgInstance *msgInstance;
+
+ error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ if (notification == NULL) {
+ return (SA_AIS_ERR_INVALID_PARAM);
+ }
+
+ free (notification);
+
+ return (SA_AIS_OK);
+}
+
+SaAisErrorT
saMsgMessageSend (
SaMsgHandleT msgHandle,
const SaNameT *destination,
@@ -1141,17 +1200,41 @@
saHandleInstancePut (&queueHandleDatabase, queueHandle);
- if (error == SA_AIS_OK)
+ if (error == SA_AIS_OK) {
error = res_lib_msg_messageget.header.error;
+ }
if (error == SA_AIS_OK) {
*sendTime = res_lib_msg_messageget.sendTime;
memcpy (senderId, &res_lib_msg_messageget.senderId,
sizeof (SaMsgSenderIdT));
}
+
return (error);
}
SaAisErrorT
+saMsgMessageDataFree (
+ SaMsgHandleT msgHandle,
+ void *data)
+{
+ SaAisErrorT error;
+ struct msgInstance *msgInstance;
+
+ error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ if (data == NULL) {
+ return (SA_AIS_ERR_INVALID_PARAM);
+ }
+
+ free (data);
+
+ return (SA_AIS_OK);
+}
+
+SaAisErrorT
saMsgMessageCancel (
SaMsgQueueHandleT queueHandle)
{
@@ -1220,15 +1303,16 @@
saHandleInstancePut (&msgHandleDatabase, msgHandle);
- if (error == SA_AIS_OK)
+ if (error == SA_AIS_OK) {
error = res_lib_msg_messagesendreceive.header.error;
+ }
if (error == SA_AIS_OK) {
*replySendTime = res_lib_msg_messagesendreceive.replySendTime;
}
+
return (error);
}
-
SaAisErrorT
saMsgMessageReply (
SaMsgHandleT msgHandle,
@@ -1265,7 +1349,8 @@
return (error == SA_AIS_OK ? res_lib_msg_messagereply.header.error : error);
}
-SaAisErrorT saMsgMessageReplyAsync (
+SaAisErrorT
+saMsgMessageReplyAsync (
SaMsgHandleT msgHandle,
SaInvocationT invocation,
const SaMsgMessageT *replyMessage,
@@ -1300,3 +1385,162 @@
return (error == SA_AIS_OK ? res_lib_msg_messagereplyasync.header.error : error);
}
+
+SaAisErrorT
+saMsgQueueCapacityThresholdSet (
+ SaMsgQueueHandleT queueHandle,
+ const SaMsgQueueThresholdsT *thresholds)
+{
+ SaAisErrorT error;
+ struct msgQueueInstance *msgQueueInstance;
+ struct req_lib_msg_queuecapacitythresholdset req_lib_msg_queuecapacitythresholdset;
+ struct res_lib_msg_queuecapacitythresholdset res_lib_msg_queuecapacitythresholdset;
+
+ error = saHandleInstanceGet (&queueHandleDatabase, queueHandle, (void *)&msgQueueInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ req_lib_msg_queuecapacitythresholdset.header.size = sizeof (struct req_lib_msg_queuecapacitythresholdset);
+ req_lib_msg_queuecapacitythresholdset.header.id = MESSAGE_REQ_MSG_QUEUECAPACITYTHRESHOLDSET;
+ memcpy (&req_lib_msg_queuecapacitythresholdset.thresholds, thresholds, sizeof (SaMsgQueueThresholdsT));
+
+ pthread_mutex_lock (msgQueueInstance->response_mutex);
+
+ error = saSendReceiveReply (msgQueueInstance->response_fd,
+ &req_lib_msg_queuecapacitythresholdset,
+ sizeof (struct req_lib_msg_queuecapacitythresholdset),
+ &res_lib_msg_queuecapacitythresholdset,
+ sizeof (struct res_lib_msg_queuecapacitythresholdset));
+
+ pthread_mutex_unlock (msgQueueInstance->response_mutex);
+
+ saHandleInstancePut (&queueHandleDatabase, queueHandle);
+
+ return (error = SA_AIS_OK ? res_lib_msg_queuecapacitythresholdset.header.error : error);
+}
+
+SaAisErrorT
+saMsgQueueCapacityThresholdsGet (
+ SaMsgQueueHandleT queueHandle,
+ SaMsgQueueThresholdsT *thresholds)
+{
+ SaAisErrorT error;
+ struct msgQueueInstance *msgQueueInstance;
+ struct req_lib_msg_queuecapacitythresholdget req_lib_msg_queuecapacitythresholdget;
+ struct res_lib_msg_queuecapacitythresholdget res_lib_msg_queuecapacitythresholdget;
+
+ error = saHandleInstanceGet (&queueHandleDatabase, queueHandle, (void *)&msgQueueInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ req_lib_msg_queuecapacitythresholdget.header.size = sizeof (struct req_lib_msg_queuecapacitythresholdget);
+ req_lib_msg_queuecapacitythresholdget.header.id = MESSAGE_REQ_MSG_QUEUECAPACITYTHRESHOLDGET;
+
+ pthread_mutex_lock (msgQueueInstance->response_mutex);
+
+ error = saSendReceiveReply (msgQueueInstance->response_fd,
+ &req_lib_msg_queuecapacitythresholdget,
+ sizeof (struct req_lib_msg_queuecapacitythresholdget),
+ &res_lib_msg_queuecapacitythresholdget,
+ sizeof (struct res_lib_msg_queuecapacitythresholdget));
+
+ pthread_mutex_unlock (msgQueueInstance->response_mutex);
+
+ saHandleInstancePut (&queueHandleDatabase, queueHandle);
+
+ if (error == SA_AIS_OK) {
+ error = res_lib_msg_queuecapacitythresholdget.header.error;
+ }
+ if (error == SA_AIS_OK) {
+ memcpy (thresholds, &res_lib_msg_queuecapacitythresholdget.thresholds,
+ sizeof (SaMsgQueueThresholdsT));
+ }
+
+ return (error);
+}
+
+SaAisErrorT
+saMsgMetadataSizeGet (
+ SaMsgHandleT msgHandle,
+ SaUint32T *metadataSize)
+{
+ SaAisErrorT error;
+ struct msgInstance *msgInstance;
+ struct req_lib_msg_metadatasizeget req_lib_msg_metadatasizeget;
+ struct res_lib_msg_metadatasizeget res_lib_msg_metadatasizeget;
+
+ error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ req_lib_msg_metadatasizeget.header.size = sizeof (struct req_lib_msg_metadatasizeget);
+ req_lib_msg_metadatasizeget.header.id = MESSAGE_REQ_MSG_METADATASIZEGET;
+
+ pthread_mutex_lock (&msgInstance->response_mutex);
+
+ error = saSendReceiveReply (msgInstance->response_fd,
+ &req_lib_msg_metadatasizeget,
+ sizeof (struct req_lib_msg_metadatasizeget),
+ &res_lib_msg_metadatasizeget,
+ sizeof (struct res_lib_msg_metadatasizeget));
+
+ pthread_mutex_unlock (&msgInstance->response_mutex);
+
+ saHandleInstancePut (&msgHandleDatabase, msgHandle);
+
+ if (error == SA_AIS_OK) {
+ error = res_lib_msg_metadatasizeget.header.error;
+ }
+ if (error == SA_AIS_OK) {
+ memcpy (metadataSize, &res_lib_msg_metadatasizeget.metadataSize,
+ sizeof (SaUint32T));
+ }
+
+ return (error);
+}
+
+SaAisErrorT
+saMsgLimitGet (
+ SaMsgHandleT msgHandle,
+ SaMsgLimitIdT limitId,
+ SaLimitValueT *limitvalue)
+{
+ SaAisErrorT error;
+ struct msgInstance *msgInstance;
+ struct req_lib_msg_limitget req_lib_msg_limitget;
+ struct res_lib_msg_limitget res_lib_msg_limitget;
+
+ error = saHandleInstanceGet (&msgHandleDatabase, msgHandle, (void *)&msgInstance);
+ if (error != SA_AIS_OK) {
+ return (error);
+ }
+
+ req_lib_msg_limitget.header.size = sizeof (struct req_lib_msg_limitget);
+ req_lib_msg_limitget.header.id = MESSAGE_REQ_MSG_LIMITGET;
+ req_lib_msg_limitget.limitId = limitId;
+
+ pthread_mutex_lock (&msgInstance->response_mutex);
+
+ error = saSendReceiveReply (msgInstance->response_fd,
+ &req_lib_msg_limitget,
+ sizeof (struct req_lib_msg_limitget),
+ &res_lib_msg_limitget,
+ sizeof (struct res_lib_msg_limitget));
+
+ pthread_mutex_unlock (&msgInstance->response_mutex);
+
+ saHandleInstancePut (&msgHandleDatabase, msgHandle);
+
+ if (error == SA_AIS_OK) {
+ error = res_lib_msg_limitget.header.error;
+ }
+ if (error == SA_AIS_OK) {
+ memcpy (limitvalue, &res_lib_msg_limitget.limitValue,
+ sizeof (SaLimitValueT));
+ }
+
+ return (error);
+}
_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais