[qpid-dispatch] branch main updated: DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion

2021-07-13 Thread chug
This is an automated email from the ASF dual-hosted git repository.

chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
 new 269bfd4  DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion
269bfd4 is described below

commit 269bfd46f3ac43ef6d1199f8cb271d117fd1ed9a
Author: Chuck Rolke 
AuthorDate: Fri Jul 9 16:26:59 2021 -0400

DISPATCH-2194: Fix CONNECTOR - ENTITY_CACHE lock inversion

The two locks are taken in both orders.

 * entity_cache first, connector second is routinely used by management
   entity updates in a pattern shared by all entities.

 * connector first, entity_cache second is used only when a connector
   creates or deletes an associated connection. Allocating or disposing
   of the connection implicitly takes the entity_cache lock.

The fix is to avoid the connector - entity_cache lock order by allocating
the connection before taking the connector lock and by freeing the
connection after releasing the connector lock.
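
In rough outline, the corrected ordering looks like the following minimal sketch
(placeholder types and helpers only, not the router's own code):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdlib.h>

    /* Stand-ins for the connector and for an object whose allocation and
     * free implicitly touch the ENTITY_CACHE lock. */
    typedef struct { pthread_mutex_t lock; bool has_conn; } connector_sketch_t;
    typedef struct { int unused; } deferred_call_sketch_t;

    static void delete_connector_sketch(connector_sketch_t *ct)
    {
        /* Allocate BEFORE taking the connector lock, so the implicit
         * ENTITY_CACHE locking never happens under the connector lock. */
        deferred_call_sketch_t *dct = malloc(sizeof *dct);

        pthread_mutex_lock(&ct->lock);
        bool handed_off = ct->has_conn;   /* here the real code hands dct to a deferred close */
        pthread_mutex_unlock(&ct->lock);

        /* Free AFTER releasing the connector lock, for the same reason. */
        if (!handed_off)
            free(dct);
    }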

This patch also corrects an improper free call in a failure path.
---
 include/qpid/dispatch/server.h | 29 +++
 src/connection_manager.c   |  9 --
 src/server.c   | 66 +++---
 3 files changed, 90 insertions(+), 14 deletions(-)

diff --git a/include/qpid/dispatch/server.h b/include/qpid/dispatch/server.h
index faf8366..7f71912 100644
--- a/include/qpid/dispatch/server.h
+++ b/include/qpid/dispatch/server.h
@@ -569,6 +569,35 @@ void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, vo
 
 
 /**
+ * Schedule a call to be invoked on a thread that has ownership of this connection
+ * when it will be safe for the callback to perform operations related to this connection.
+ * A qd_deferred_call_t object has been allocated beforehand to avoid taking
+ * the ENTITY_CACHE lock.
+ *
+ * @param conn Connection object
+ * @param call The function to be invoked on the connection's thread
+ * @param context The context to be passed back in the callback
+ * @param dct Pointer to preallocated qd_deferred_call_t object
+ */
+void qd_connection_invoke_deferred_impl(qd_connection_t *conn, qd_deferred_t call, void *context, void *dct);
+
+
+/**
+ * Allocate a qd_deferred_call_t object
+ */
+void *qd_connection_new_qd_deferred_call_t();
+
+
+/**
+ * Deallocate a qd_deferred_call_t object
+ *
+ * @param dct Pointer to preallocated qd_deferred_call_t object
+ */
+void qd_connection_free_qd_deferred_call_t(void *dct);
+
+
+
+/**
  * Listen for incoming connections, return true if listening succeeded.
  */
 bool qd_listener_listen(qd_listener_t *l);
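
Taken together with the connection_manager.c change below, the intended call
pattern appears to be: allocate the deferred-call object before taking any
connector lock, hand it to qd_connection_invoke_deferred_impl() when there is a
connection to close, and free it otherwise. A rough sketch of that pattern
(sketch_deferred_cb is a placeholder and assumes qd_deferred_t's
(context, discard) signature):

    #include <qpid/dispatch/server.h>

    static void sketch_deferred_cb(void *context, bool discard) { (void) context; (void) discard; }

    static void close_connection_sketch(qd_connection_t *conn)
    {
        void *dct = qd_connection_new_qd_deferred_call_t();  /* may take the ENTITY_CACHE lock */

        /* ...take whatever connector or other locks are needed here... */

        if (conn) {
            /* dct is handed over; the connection's thread disposes of it */
            qd_connection_invoke_deferred_impl(conn, sketch_deferred_cb, conn, dct);
        } else {
            /* not handed over: the caller frees it, again outside the locks */
            qd_connection_free_qd_deferred_call_t(dct);
        }
    }
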
diff --git a/src/connection_manager.c b/src/connection_manager.c
index 6ef14cd..c77999b 100644
--- a/src/connection_manager.c
+++ b/src/connection_manager.c
@@ -1081,16 +1081,19 @@ void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl)
 // cannot free the timer while holding ct->lock since the
 // timer callback may be running during the call to qd_timer_free
 qd_timer_t *timer = 0;
+void *dct = qd_connection_new_qd_deferred_call_t();
 sys_mutex_lock(ct->lock);
 timer = ct->timer;
 ct->timer = 0;
 ct->state = CXTR_STATE_DELETED;
 qd_connection_t *conn = ct->qd_conn;
 if (conn && conn->pn_conn) {
-qd_connection_invoke_deferred(conn, deferred_close, conn->pn_conn);
+qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct);
+sys_mutex_unlock(ct->lock);
+} else {
+sys_mutex_unlock(ct->lock);
+qd_connection_free_qd_deferred_call_t(dct);
 }
-sys_mutex_unlock(ct->lock);
-
 qd_timer_free(timer);
 DEQ_REMOVE(qd->connection_manager->connectors, ct);
 qd_connector_decref(ct);
diff --git a/src/server.c b/src/server.c
index 4910f37..24e73ee 100644
--- a/src/server.c
+++ b/src/server.c
@@ -561,11 +561,13 @@ static void connection_wake(qd_connection_t *ctx) {
 if (ctx->pn_conn) pn_connection_wake(ctx->pn_conn);
 }
 
-/* Construct a new qd_connection. Thread safe. */
-qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *config)
+/* Construct a new qd_connection. Thread safe.
+ * Does not allocate any managed objects and therefore
+ * does not take ENTITY_CACHE lock.
+ */
+qd_connection_t *qd_server_connection_impl(qd_server_t *server, qd_server_config_t *config, qd_connection_t *ctx)
 {
-qd_connection_t *ctx = new_qd_connection_t();
-if (!ctx) return NULL;
+assert(ctx);
 ZERO(ctx);
 ctx->pn_conn   = pn_connection();
 ctx->deferred_call_lock = sys_mutex();
@@ -574,7 +576,7 @@ qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *c
 if (ctx->pn_conn) 

[qpid-dispatch] branch main updated: DISPATCH-2192: disable tcp window on terminal outcome or settlement

2021-07-13 Thread kgiusti
This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
 new 733c1df  DISPATCH-2192: disable tcp window on terminal outcome or settlement
733c1df is described below

commit 733c1dfc80edd0c3505d3dc62c9f6562381239f7
Author: Kenneth Giusti 
AuthorDate: Thu Jul 8 16:08:26 2021 -0400

DISPATCH-2192: disable tcp window on terminal outcome or settlement

This closes #1288
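
In outline, the adaptor now carries a window_disabled flag and its
read_window_full() check ignores the unacked-byte limit once that flag is set,
so reads are no longer throttled after the delivery is settled or reaches a
terminal outcome. A minimal standalone sketch of that gating (simplified types;
the TCP_MAX_CAPACITY value here is illustrative, not the adaptor's):

    #include <stdbool.h>
    #include <stdint.h>

    #define TCP_MAX_CAPACITY (128 * 1024)   /* illustrative threshold only */

    typedef struct {
        bool     window_disabled;   /* set on settlement or terminal outcome */
        uint64_t bytes_unacked;
    } tcp_conn_sketch_t;

    /* A disabled window never reports "full", so incoming reads keep flowing. */
    static bool read_window_full_sketch(const tcp_conn_sketch_t *c)
    {
        return !c->window_disabled && c->bytes_unacked >= TCP_MAX_CAPACITY;
    }

    /* Simplified delivery-update handling: once the delivery is settled or has
     * a terminal outcome, stop enforcing the byte window. */
    static void on_delivery_update_sketch(tcp_conn_sketch_t *c, bool settled, bool terminal)
    {
        if (settled || terminal)
            c->window_disabled = true;
    }
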
---
 src/adaptors/tcp_adaptor.c | 72 ++
 1 file changed, 41 insertions(+), 31 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index a5bd0d4..1de86aa 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -83,6 +83,7 @@ struct qdr_tcp_connection_t {
 sys_atomic_t  raw_closed_write;  // proton event seen or write_close called
 bool  raw_read_shutdown; // stream closed
 bool  read_eos_seen;
+bool  window_disabled;   // true: ignore unacked byte window
 qdr_delivery_t   *initial_delivery;
 qd_timer_t   *activate_timer;
 qd_tcp_bridge_t  *bridge; // config and stats
@@ -140,6 +141,15 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* conn);
 static void free_bridge_config(qd_tcp_bridge_t *config);
 static void qdr_tcp_open_server_side_connection(qdr_tcp_connection_t* tc);
 
+
+// is the incoming byte window full
+//
+inline static bool read_window_full(const qdr_tcp_connection_t* conn)
+{
+return !conn->window_disabled && conn->bytes_unacked >= TCP_MAX_CAPACITY;
+}
+
+
 static void allocate_tcp_buffer(pn_raw_buffer_t *buffer)
 {
 buffer->bytes = malloc(TCP_BUFFER_SIZE);
@@ -239,7 +249,7 @@ void qdr_tcp_q2_unblocked_handler(const qd_alloc_safe_ptr_t context)
 static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t *buffers)
 {
 pn_raw_buffer_t raw_buffer;
-if ( conn->bytes_unacked >= TCP_MAX_CAPACITY || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) {
+if (read_window_full(conn) || !pn_raw_connection_take_read_buffers(conn->pn_raw_conn, &raw_buffer, 1)) {
 return 0;
 }
 int result = raw_buffer.size;
@@ -263,7 +273,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t
 conn->bridge->bytes_in += result;
 UNLOCK(conn->bridge->stats_lock);
 conn->bytes_unacked += result;
-if (conn->bytes_unacked >= TCP_MAX_CAPACITY) {
+if (read_window_full(conn)) {
 qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
"[C%"PRIu64"] TCP RX window CLOSED: bytes in=%"PRIu64" 
unacked=%"PRIu64,
conn->conn_id, conn->bytes_in, conn->bytes_unacked);
@@ -1613,40 +1623,40 @@ static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t
 pn_raw_connection_close(tc->pn_raw_conn);
 }
 
-if (disp == PN_RECEIVED) {
-//
-// the consumer of this TCP flow has updated its tx_sequence:
-//
-bool window_opened = false;
-uint64_t ignore;
-qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+// handle read window updates
 
-if (!dstate) {
-qd_log(tcp_adaptor->log_source, QD_LOG_ERROR,
-   "[C%"PRIu64"] BAD PN_RECEIVED - missing 
delivery-state!!", tc->conn_id);
-} else {
-// note: the PN_RECEIVED is generated by the remote TCP
-// adaptor, for simplicity we ignore the section_number since
-// all we really need is a byte offset:
+const bool window_was_full = read_window_full(tc);
+tc->window_disabled = settled || tc->window_disabled;
+
+if (!tc->window_disabled) {
+
+if (disp == PN_RECEIVED) {
+//
+// the consumer of this TCP flow has updated its tx_sequence:
 //
-const bool was_closed = tc->bytes_unacked >= TCP_MAX_CAPACITY;
-tc->bytes_unacked = tc->bytes_in - dstate->section_offset;
-window_opened = tc->bytes_unacked < TCP_MAX_CAPACITY;
-if (was_closed && window_opened) {
-qd_log(tcp_adaptor->log_source, QD_LOG_TRACE,
-   "[C%"PRIu64"] TCP RX window OPEN: bytes in=%"PRIu64
-   " unacked=%"PRIu64" remote bytes out=%"PRIu64,
-   tc->conn_id, tc->bytes_in, tc->bytes_unacked,
-   dstate->section_offset);
+uint64_t ignore;
+qd_delivery_state_t *dstate = qdr_delivery_take_local_delivery_state(dlv, &ignore);
+
+if (!dstate)