Module: kamailio
Branch: master
Commit: 78c0275b9081d4ee18a89f702f3931e3c3f83489
URL: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489

Author: joelbax <[email protected]>
Committer: GitHub <[email protected]>
Date: 2023-07-17T17:21:36+02:00

rabbitmq: Adding amqps support (#3511)

* rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

* rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

rabbitmq: Adding amqps support

Adding support for secure AMQP connections over TLS (amqps).

rabbitmq: Format fixes

Some style format fixes

* rabbitmq: Format fixes

Fixing some missing spaces

* rabbitmq: Typo fix

Fixing inilialized by initialized

---

Modified: src/modules/rabbitmq/rabbitmq.c
Modified: src/modules/rabbitmq/rabbitmq.h

---

Diff: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.diff
Patch: https://github.com/kamailio/kamailio/commit/78c0275b9081d4ee18a89f702f3931e3c3f83489.patch

---

diff --git a/src/modules/rabbitmq/rabbitmq.c b/src/modules/rabbitmq/rabbitmq.c
index f1c44ff113b..94341070508 100644
--- a/src/modules/rabbitmq/rabbitmq.c
+++ b/src/modules/rabbitmq/rabbitmq.c
@@ -52,6 +52,7 @@
 
 #include <stdint.h>
 #include <amqp_tcp_socket.h>
+#include <amqp_ssl_socket.h>
 #include <amqp.h>
 #include <amqp_framing.h>
 
@@ -73,10 +74,12 @@ static amqp_connection_state_t amqp_conn = NULL;
 /* module parameters */
 static struct amqp_connection_info amqp_info;
 static char *amqp_url = RABBITMQ_DEFAULT_AMQP_URL;
+static char *rmq_amqps_ca_file = NULL;
 static int max_reconnect_attempts = 1;
 static int timeout_sec = 1;
 static int timeout_usec = 0;
 static int direct_reply_to = 0;
+static int amqp_ssl_init_called = 0;
 
 /* module helper functions */
 static int rabbitmq_connect(amqp_connection_state_t *conn);
@@ -124,6 +127,7 @@ static cmd_export_t cmds[] = {
 
 /* module parameters */
 static param_export_t params[] = {{"url", PARAM_STRING, &amqp_url},
+               {"amqps_ca_file", PARAM_STRING, &rmq_amqps_ca_file},
                {"timeout_sec", PARAM_INT, &timeout_sec},
                {"timeout_usec", PARAM_INT, &timeout_usec},
                {"direct_reply_to", PARAM_INT, &direct_reply_to}, {0, 0, 0}};
@@ -557,25 +561,52 @@ static int rabbitmq_connect(amqp_connection_state_t *conn)
        int ret;
        int log_ret;
        //      amqp_rpc_reply_t reply;
+       
+       // amqp_ssl_init_called should only be called once
+       if(amqp_info.ssl && !amqp_ssl_init_called) {
+               amqp_set_initialize_ssl_library(1);
+               amqp_ssl_init_called = 1;
+               LM_DBG("AMQP SSL library initialized\n");
+       }
 
        // establish a new connection to RabbitMQ server
        *conn = amqp_new_connection();
+       if(!conn) {
+               LM_ERR("FAIL: create AMQP connection\n");
+               return RABBITMQ_ERR_CREATE;
+       }
        log_ret = log_on_amqp_error(
                        amqp_get_rpc_reply(*conn), "amqp_new_connection()");
        if(log_ret != AMQP_RESPONSE_NORMAL && log_ret != AMQP_RESPONSE_NONE) {
                return RABBITMQ_ERR_CONNECT;
        }
 
-       amqp_sock = amqp_tcp_socket_new(*conn);
+       amqp_sock = (amqp_info.ssl) ? amqp_ssl_socket_new(*conn)
+                                                               : amqp_tcp_socket_new(*conn);
        if(!amqp_sock) {
                LM_ERR("FAIL: create TCP amqp_sock");
                amqp_destroy_connection(*conn);
                return RABBITMQ_ERR_SOCK;
        }
 
+       if(rmq_amqps_ca_file) {
+               if(amqp_ssl_socket_set_cacert(amqp_sock, rmq_amqps_ca_file)) {
+                       LM_ERR("Failed to set CA certificate for amqps connection\n");
+                       return RABBITMQ_ERR_SSL_CACERT;
+               }
+       }
+
+#if AMQP_VERSION_MAJOR == 0 && AMQP_VERSION_MINOR < 8
+       amqp_ssl_socket_set_verify(amqp_sock, 1);
+#else
+       amqp_ssl_socket_set_verify_peer(amqp_sock, 1);
+       amqp_ssl_socket_set_verify_hostname(amqp_sock, 1);
+#endif
+
        ret = amqp_socket_open(amqp_sock, amqp_info.host, amqp_info.port);
        if(ret != AMQP_STATUS_OK) {
-               LM_ERR("FAIL: open TCP sock, amqp_status=%d", ret);
+               LM_ERR("FAIL: open %s sock, amqp_status=%d",
+                               (amqp_info.ssl) ? "SSL" : "TCP", ret);
                // amqp_destroy_connection(*conn);
                return RABBITMQ_ERR_SOCK;
        }
diff --git a/src/modules/rabbitmq/rabbitmq.h b/src/modules/rabbitmq/rabbitmq.h
index 30f39ca7fd5..b37c84674f9 100644
--- a/src/modules/rabbitmq/rabbitmq.h
+++ b/src/modules/rabbitmq/rabbitmq.h
@@ -49,8 +49,10 @@ typedef enum
        RABBITMQ_ERR_CHANNEL,
        RABBITMQ_ERR_QUEUE,
        RABBITMQ_ERR_PUBLISH,
+       RABBITMQ_ERR_CREATE,
        RABBITMQ_ERR_SOCK,
        RABBITMQ_ERR_CONSUME,
+       RABBITMQ_ERR_SSL_CACERT,
        RABBITMQ_ERR_NULL,
 } RABBITMQ_ENUM;
 

_______________________________________________
Kamailio (SER) - Development Mailing List
To unsubscribe send an email to [email protected]

Reply via email to