Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 1f0aff947 -> 69f52f283


DISPATCH-577: Server logs show unexpected POLLNVAL errors

Fixed locking mistake - must mark connector closed *before* closing socket.
Simplified poll logic.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/978e8090
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/978e8090
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/978e8090

Branch: refs/heads/master
Commit: 978e80909b44dcdd11b3b519acc7f5a36bf779c1
Parents: 1f0aff9
Author: Alan Conway <acon...@redhat.com>
Authored: Thu Dec 1 09:44:45 2016 -0500
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Dec 1 11:12:14 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/driver.h |  2 +-
 src/aprintf.h                  |  2 +-
 src/http-libwebsockets.c       |  4 +--
 src/posix/driver.c             | 70 ++++++++++++++++++++-----------------
 4 files changed, 41 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/978e8090/include/qpid/dispatch/driver.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/driver.h b/include/qpid/dispatch/driver.h
index 28552f7..cf68776 100644
--- a/include/qpid/dispatch/driver.h
+++ b/include/qpid/dispatch/driver.h
@@ -338,7 +338,7 @@ void qdpn_connector_close(qdpn_connector_t *connector);
 
 /** Call when the socket is already closed, an the connector needs updating.
  *
- * @param[in] connector the connector whose socket will be closed
+ * @param[in] connector the connector whose socket has been closed
  */
 void qdpn_connector_after_close(qdpn_connector_t *connector);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/978e8090/src/aprintf.h
----------------------------------------------------------------------
diff --git a/src/aprintf.h b/src/aprintf.h
index f14c012..b73f7ff 100644
--- a/src/aprintf.h
+++ b/src/aprintf.h
@@ -49,7 +49,7 @@ static int vaprintf(char **begin, char *end, const char *format, va_list ap_in)
 /**
    Appending printf.
 
-   Print to buffer at *begin with null terminatr, do not go beyond end.
+   Print to buffer at *begin with null terminator, end points after end of buffer.
    Advance *begin to point to the null terminator.
 .  Return value:
    - 0 on success: advance *begin to the null terminator.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/978e8090/src/http-libwebsockets.c
----------------------------------------------------------------------
diff --git a/src/http-libwebsockets.c b/src/http-libwebsockets.c
index bbc3b15..c7c0043 100644
--- a/src/http-libwebsockets.c
+++ b/src/http-libwebsockets.c
@@ -157,7 +157,7 @@ static int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason,
 
     case LWS_CALLBACK_CLOSED: {
         qd_log(h->log, QD_LOG_DEBUG, "AMQP/WebSocket from %s closed", name);
-        qdpn_connector_after_close(c);
+        qdpn_connector_mark_closed(c);
         break;
     }
 
@@ -179,7 +179,7 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason, void
         break;
     }
     case LWS_CALLBACK_CLOSED: {
-        qdpn_connector_after_close(c);
+        qdpn_connector_mark_closed(c);
     }
     default:
         break;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/978e8090/src/posix/driver.c
----------------------------------------------------------------------
diff --git a/src/posix/driver.c b/src/posix/driver.c
index c4381b5..53ea809 100644
--- a/src/posix/driver.c
+++ b/src/posix/driver.c
@@ -45,13 +45,14 @@
 #include <qpid/dispatch/driver.h>
 #include <qpid/dispatch/error.h>
 #include <qpid/dispatch/threading.h>
-#include "alloc.h"
-#include "http.h"
 #include <proton/error.h>
 #include <proton/ssl.h>
 #include <proton/object.h>
 #include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/log.h>
+#include "alloc.h"
+#include "aprintf.h"
+#include "http.h"
+#include "log_private.h"
 
 /* Decls */
 
@@ -144,11 +145,17 @@ ALLOC_DEFINE(qdpn_connector_t);
 
 /* Impls */
 
-static void qdpn_log_errno(qdpn_driver_t *d, const char *msg)
+static void qdpn_log_errno(qdpn_driver_t *d, const char *fmt, ...)
 {
-    char ebuf[ERROR_MAX];
-    strerror_r(errno, ebuf, ERROR_MAX);
-    qd_log(d->log, QD_LOG_ERROR, "%s: %s", msg, ebuf);
+    char msg[QD_LOG_TEXT_MAX];
+    char *begin = msg, *end = msg+sizeof(msg);
+    va_list ap;
+    va_start(ap, fmt);
+    vaprintf(&begin, end, fmt, ap);
+    va_end(ap);
+    aprintf(&begin, end, ": ");
+    strerror_r(errno, begin, end - begin);
+    qd_log(d->log, QD_LOG_ERROR, "%s", msg);
 }
 
 
@@ -616,24 +623,25 @@ qdpn_listener_t *qdpn_connector_listener(qdpn_connector_t *ctor)
 }
 
 /* FD is already closed, update the connector state */
-void qdpn_connector_after_close(qdpn_connector_t *ctor)
+void qdpn_connector_mark_closed(qdpn_connector_t *ctor)
 {
     if (!ctor) return;
+    sys_mutex_lock(ctor->driver->lock);
     ctor->status = 0;
     if (!ctor->closed) {
-        sys_mutex_lock(ctor->driver->lock);
+        qd_log(ctor->driver->log, QD_LOG_TRACE, "closed %s", ctor->name);
         ctor->closed = true;
         ctor->driver->closed_count++;
-        sys_mutex_unlock(ctor->driver->lock);
     }
+    sys_mutex_unlock(ctor->driver->lock);
 }
 
 void qdpn_connector_close(qdpn_connector_t *ctor)
 {
     if (ctor && !ctor->closed) {
+        qdpn_connector_mark_closed(ctor);
         if (close(ctor->fd) == -1)
-            qdpn_log_errno(ctor->driver, "close");
-        qdpn_connector_after_close(ctor);
+            qdpn_log_errno(ctor->driver, "close %s", ctor->name);
     }
 }
 
@@ -734,7 +742,7 @@ static void connector_process(qdpn_connector_t *c)
             ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
             if (n < 0) {
                 if (errno != EAGAIN) {
-                    qdpn_log_errno(c->driver, "read");
+                    qdpn_log_errno(c->driver, "recv %s", c->name);
                     pn_transport_close_tail( transport );
                 }
             } else if (n == 0) { /* HUP */
@@ -763,9 +771,8 @@ static void connector_process(qdpn_connector_t *c)
             ssize_t n = send(c->fd, pn_transport_head(transport), pending, 0);
 #endif
             if (n < 0) {
-                // XXX
                 if (errno != EAGAIN) {
-                    qdpn_log_errno(c->driver, "send");
+                    qdpn_log_errno(c->driver, "send %s", c->name);
                     pn_transport_close_head( transport );
                 }
             } else if (n) {
@@ -776,7 +783,6 @@ static void connector_process(qdpn_connector_t *c)
 
     c->status = 0;
     if (pn_transport_closed(c->transport)) {
-        qd_log(c->driver->log, QD_LOG_TRACE, "Closed %s", c->name);
         qdpn_connector_close(c);
     } else {
         if (pn_transport_capacity(transport) > 0) c->status |= PN_SEL_RD;
@@ -931,27 +937,25 @@ int qdpn_driver_wait_3(qdpn_driver_t *d)
             c->pending_read = false;
             c->pending_write = false;
             c->pending_tick = false;
-        } else {
-            int idx = c->idx;
-            c->pending_read = (idx && d->fds[idx].revents & POLLIN);
-            c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
+        } else if (c->idx) {
+            short revents = d->fds[c->idx].revents;
+            c->pending_read = (revents & POLLIN);
+            c->pending_write = (revents & POLLOUT);
+            c->socket_error = (revents & POLLERR);
             c->pending_tick = (c->wakeup &&  c->wakeup <= now);
-            if (idx && d->fds[idx].revents & POLLERR) {
-                c->socket_error = true;
-            } else if (idx && (d->fds[idx].revents & POLLHUP)) {
-                qd_log(c->driver->log, QD_LOG_TRACE, "hangup on connector %s", c->name);
-                /* poll() is signalling POLLHUP. to see what happened we need
+            if (revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP)) {
+                qd_log(c->driver->log, QD_LOG_ERROR, "unexpected poll events %04x on %s",
+                       revents, c->name);
+            }
+            if (revents & POLLHUP) {
+                qd_log(c->driver->log, QD_LOG_TRACE, "hangup on %s", c->name);
+                /* poll() is signalling POLLHUP. To see what happened we need
                  * to do an actual recv() to get the error code. But we might
                  * be in a state where we're not interested in input, in that
                  * case try to get the error code via send() */
-                if (d->fds[idx].events & POLLIN)
-                    c->pending_read = true;
-                else if (d->fds[idx].events & POLLOUT)
-                    c->pending_write = true;
-            } else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) {
-                qd_log(c->driver->log, QD_LOG_ERROR, "Unexpected poll events: %04x on %s",
-                       d->fds[idx].revents, c->name);
-                c->socket_error = true;
+                short events = d->fds[c->idx].events;
+                if (events & POLLIN) c->pending_read = true;
+                else if (events & POLLOUT) c->pending_write = true;
             }
         }
         c = DEQ_NEXT(c);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to