This is to be used later for totem patch.

-Angus


Signed-off-by: Angus Salkeld <[email protected]>
---
 exec/coroipcs.c                |   54 ++++++++++++++++++++++++++++++++-------
 exec/main.c                    |   10 +++++++
 include/corosync/coroipc_ipc.h |    5 +++
 lib/coroipcc.c                 |    6 ++--
 4 files changed, 62 insertions(+), 13 deletions(-)

diff --git a/exec/coroipcs.c b/exec/coroipcs.c
index 065a355..2e816e6 100644
--- a/exec/coroipcs.c
+++ b/exec/coroipcs.c
@@ -139,6 +139,7 @@ struct conn_info {
        unsigned int service;
        enum conn_state state;
        int notify_flow_control_enabled;
+       int flow_control_state;
        int refcount;
        hdb_handle_t stats_handle;
 #if _POSIX_THREAD_PROCESS_SHARED < 1
@@ -1188,6 +1189,36 @@ static void memcpy_dwrap (struct conn_info *conn_info, 
void *msg, unsigned int l
        conn_info->control_buffer->write = (write_idx + len) % 
conn_info->dispatch_size;
 }
 
+/**
+ * simulate the behaviour in coroipcc.c
+ */
+static int send_flow_control_event(struct conn_info *conn_info, char event)
+{
+       int new_fc = 0;
+
+       if (event == MESSAGE_RES_OUTQ_NOT_EMPTY ||
+               event == MESSAGE_RES_ENABLE_FLOWCONTROL) {
+               new_fc = 1;
+       }
+
+       if (conn_info->flow_control_state != new_fc) {
+               /* these log should be info, not err */
+               if (new_fc == 1)
+                       api->log_printf ("Enabling flow control for %d, event 
%d\n",
+                                                        conn_info->client_pid, 
event);
+               else
+                       api->log_printf ("Disabling flow control for %d, event 
%d\n",
+                                                        conn_info->client_pid, 
event);
+               conn_info->flow_control_state = new_fc;
+               stats_api->stats_update_value (conn_info->stats_handle, 
"flow_control",
+                                                                          
&conn_info->flow_control_state,
+                                                                          
sizeof(conn_info->flow_control_state));
+               stats_api->stats_increment_value (conn_info->stats_handle, 
"flow_control_count");
+       }
+
+       return send (conn_info->fd, &event, 1, MSG_NOSIGNAL);
+}
+
 static void msg_send (void *conn, const struct iovec *iov, unsigned int 
iov_len,
                      int locked)
 {
@@ -1197,14 +1228,16 @@ static void msg_send (void *conn, const struct iovec 
*iov, unsigned int iov_len,
 #endif
        int res;
        int i;
-       char buf;
 
        for (i = 0; i < iov_len; i++) {
                memcpy_dwrap (conn_info, iov[i].iov_base, iov[i].iov_len);
        }
 
-       buf = !list_empty (&conn_info->outq_head);
-       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+       if (list_empty (&conn_info->outq_head))
+               res = send_flow_control_event (conn_info, 
MESSAGE_RES_OUTQ_EMPTY);
+       else
+               res = send_flow_control_event (conn_info, 
MESSAGE_RES_OUTQ_NOT_EMPTY);
+
        if (res == -1 && errno == EAGAIN) {
                if (locked == 0) {
                        pthread_mutex_lock (&conn_info->mutex);
@@ -1244,13 +1277,11 @@ static void outq_flush (struct conn_info *conn_info) {
        struct outq_item *outq_item;
        unsigned int bytes_left;
        struct iovec iov;
-       char buf;
        int res;
 
        pthread_mutex_lock (&conn_info->mutex);
        if (list_empty (&conn_info->outq_head)) {
-               buf = 3;
-               res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+               res = send_flow_control_event (conn_info, 
MESSAGE_RES_OUTQ_FLUSH_NR);
                pthread_mutex_unlock (&conn_info->mutex);
                return;
        }
@@ -1692,9 +1723,13 @@ int coroipcs_handler_dispatch (
        coroipcs_refcount_inc (conn_info);
        pthread_mutex_lock (&conn_info->mutex);
        if ((conn_info->state == CONN_STATE_THREAD_ACTIVE) && (revent & 
POLLOUT)) {
-               buf = !list_empty (&conn_info->outq_head);
+               if (list_empty (&conn_info->outq_head))
+                       buf = MESSAGE_RES_OUTQ_EMPTY;
+               else
+                       buf = MESSAGE_RES_OUTQ_NOT_EMPTY;
+
                for (; conn_info->pending_semops;) {
-                       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+                       res = send_flow_control_event (conn_info, buf);
                        if (res == 1) {
                                conn_info->pending_semops--;
                        } else {
@@ -1702,8 +1737,7 @@ int coroipcs_handler_dispatch (
                        }
                }
                if (conn_info->notify_flow_control_enabled) {
-                       buf = 2;
-                       res = send (conn_info->fd, &buf, 1, MSG_NOSIGNAL);
+                       res = send_flow_control_event (conn_info, 
MESSAGE_RES_ENABLE_FLOWCONTROL);
                        if (res == 1) {
                                conn_info->notify_flow_control_enabled = 0;
                        }
diff --git a/exec/main.c b/exec/main.c
index 1cb12dd..def3013 100644
--- a/exec/main.c
+++ b/exec/main.c
@@ -994,6 +994,16 @@ static hdb_handle_t corosync_stats_create_connection 
(const char* name,
                                                                        
&zero_64, sizeof (zero_64),
                                                                        
OBJDB_VALUETYPE_UINT64);
 
+       objdb->object_key_create_typed (object_handle,
+               "flow_control",
+               &zero_32, sizeof (zero_32),
+               OBJDB_VALUETYPE_UINT32);
+
+       objdb->object_key_create_typed (object_handle,
+               "flow_control_count",
+               &zero_64, sizeof (zero_64),
+               OBJDB_VALUETYPE_UINT64);
+
        return object_handle;
 }
 
diff --git a/include/corosync/coroipc_ipc.h b/include/corosync/coroipc_ipc.h
index de25e48..5126faa 100644
--- a/include/corosync/coroipc_ipc.h
+++ b/include/corosync/coroipc_ipc.h
@@ -62,6 +62,11 @@ enum req_init_types {
 #define MESSAGE_REQ_CHANGE_EUID                1
 #define MESSAGE_REQ_OUTQ_FLUSH         2
 
+#define MESSAGE_RES_OUTQ_EMPTY         0
+#define MESSAGE_RES_OUTQ_NOT_EMPTY     1
+#define MESSAGE_RES_ENABLE_FLOWCONTROL 2
+#define MESSAGE_RES_OUTQ_FLUSH_NR      3
+
 struct control_buffer {
        unsigned int read;
        unsigned int write;
diff --git a/lib/coroipcc.c b/lib/coroipcc.c
index c0861a5..aa9546c 100644
--- a/lib/coroipcc.c
+++ b/lib/coroipcc.c
@@ -849,7 +849,7 @@ coroipcc_dispatch_get (
                goto error_put;
        }
        ipc_instance->flow_control_state = 0;
-       if (buf == 1 || buf == 2) {
+       if (buf == MESSAGE_RES_OUTQ_NOT_EMPTY || buf == 
MESSAGE_RES_ENABLE_FLOWCONTROL) {
                ipc_instance->flow_control_state = 1;
        }
        /*
@@ -864,11 +864,11 @@ coroipcc_dispatch_get (
         * This is just a notification of flow control starting at the addition
         * of a new pending message, not a message to dispatch
         */
-       if (buf == 2) {
+       if (buf == MESSAGE_RES_ENABLE_FLOWCONTROL) {
                error = CS_ERR_TRY_AGAIN;
                goto error_put;
        }
-       if (buf == 3) {
+       if (buf == MESSAGE_RES_OUTQ_FLUSH_NR) {
                error = CS_ERR_TRY_AGAIN;
                goto error_put;
        }
-- 
1.6.3.4

_______________________________________________
Openais mailing list
[email protected]
https://lists.linux-foundation.org/mailman/listinfo/openais

Reply via email to