Hello, My colleague has written a partial integration of libwebsockets client support into Proton; the source is attached to this message. The implementation is currently a child class of messaging_handler, but we would like to integrate it fully into Proton and contribute it to the project if possible.
Before we get too deep into this, we were hoping to solicit advice. Even better, if anyone more familiar with the connection driver could help us complete this, that would be fantastic! Thanks and Regards, Al Straub SW Dev, NetApp
// // Copyright (C) 2020 by NetApp Inc. All Rights Reserved. // #include <stdio.h> #include <stdlib.h> #include <iostream> #include <vector> #include <map> #include <condition_variable> #include <mutex> #include <atomic> #include <proton/connection.hpp> #include <proton/container.hpp> #include <proton/message.hpp> #include <proton/delivery.hpp> #include <proton/connection_options.hpp> #include <proton/error_condition.hpp> #include <proton/message_id.hpp> #include <proton/messaging_handler.hpp> #include <proton/sender.hpp> #include <proton/io/connection_driver.hpp> #include <libwebsockets/libwebsockets.h> #include "ws_amqp_client.h" #include "ws_amqp_client_private.h" ws_amqp_handler::ws_amqp_handler(const std::string & url) { pimpl_ = new ws_amqp_handler_impl(url, this); } ws_amqp_handler::~ws_amqp_handler() { delete pimpl_; } void ws_amqp_handler::disconnect() { pimpl_->disconnect(); } void ws_amqp_handler::on_container_start(proton::container& container) { pimpl_->set_container_id(container.id()); // connect with ws and let the connection driver handle the communication if (pimpl_->ws_connect()) { lwsl_err("ws_amqp_handler::on_container_start failed to connect\n"); } // since the connection driver concluded the session, stop the container container.stop(); } void ws_amqp_handler::set_connection_options(const proton::connection_options& connopts) { pimpl_->set_connection_options(connopts); } void ws_amqp_handler::add_work(const std::function<void()> & work) { pimpl_->add_work(work); } ws_amqp_handler_impl::ws_amqp_handler_impl(const std::string & url, ws_amqp_handler * base_handler): base_handler_(base_handler), lws_context_(0), connection_(NULL), state_(connecting) { // expect a url of the form host:port/topic where topic is ignored port_ = 80; // TODO 443 for amqpwss std::size_t pos_of_col = url.find(":"); std::size_t pos_of_sl = url.find("/"); if (pos_of_col == std::string::npos) { host_ = url; } else if (pos_of_sl == std::string::npos) { host_ = url.substr(0, 
pos_of_col); port_ = std::stoi(url.substr(pos_of_col+1)); } else { host_ = url.substr(0, pos_of_col); port_ = std::stoi(url.substr(pos_of_col+1, pos_of_sl)); } }; ws_amqp_handler_impl::~ws_amqp_handler_impl() { std::lock_guard<std::mutex> l(s_impls_lock_); std::map<struct lws_context *, ws_amqp_handler_impl *>::iterator itr = s_impls_.find(lws_context_); if (itr != s_impls_.end()) { s_impls_.erase(itr); } } void ws_amqp_handler_impl::set_container_id(const std::string & id) { container_id_ = id; } void ws_amqp_handler_impl::set_connection_options(const proton::connection_options& connopts) { connopts_ = connopts; } void ws_amqp_handler_impl::add_work(const std::function<void()> & work) { { std::lock_guard<std::mutex> l(work_items_lock_); work_items_.push_back(work); } // trigger an lws event lws_cancel_service(lws_context_); } int ws_amqp_handler_impl::ws_connect() { if (state_ == connected) { return 0; } return service_ws(); } void ws_amqp_handler_impl::disconnect() { if (state_ == connected) { // connection object has been created by the driver. // use the connection object to close. 
connection_->driver->connection().close(); } } void ws_amqp_handler_impl::change_state(handler_state_t new_state) { state_ = new_state; } std::map<struct lws_context *, ws_amqp_handler_impl *> ws_amqp_handler_impl::s_impls_; std::mutex ws_amqp_handler_impl::s_impls_lock_; int ws_amqp_handler_impl::s_callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { ws_amqp_handler_impl *impl = nullptr; { std::lock_guard<std::mutex> l(s_impls_lock_); std::map<struct lws_context *, ws_amqp_handler_impl *>::iterator itr = s_impls_.find(lws_get_context(wsi)); if (itr != s_impls_.end()) { impl = itr->second; } } if (impl) { return impl->callback_amqpws(wsi, reason, user, in, len); } return lws_callback_http_dummy(wsi, reason, user, in, len); } int ws_amqp_handler_impl::callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { struct per_vhost_data__minimal *vhd = (struct per_vhost_data__minimal *) lws_protocol_vh_priv_get(lws_get_vhost(wsi), lws_get_protocol(wsi)); switch (reason) { case LWS_CALLBACK_PROTOCOL_INIT: { vhd = (struct per_vhost_data__minimal *) lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi), sizeof(struct per_vhost_data__minimal)); vhd->context = lws_get_context(wsi); vhd->protocol = lws_get_protocol(wsi); vhd->vhost = lws_get_vhost(wsi); if (connect_client(vhd)) lws_timed_callback_vh_protocol(vhd->vhost, vhd->protocol, LWS_CALLBACK_USER, 1); break; } case LWS_CALLBACK_PROTOCOL_DESTROY: { change_state(disconnecting); return 0; } // TODO This callback should be in protocol 0, test separately case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: { lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? 
(char *)in : "(null)"); vhd->client_wsi = NULL; lws_timed_callback_vh_protocol(vhd->vhost, vhd->protocol, LWS_CALLBACK_USER, 1); change_state(connect_failed); break; } case LWS_CALLBACK_CLIENT_ESTABLISHED: { // Upgrade HTTP connection to AMQPWS connection_ = (ws_amqp_connection_t *)user; memset(connection_, 0, sizeof(ws_amqp_connection_t)); connection_->wsi = vhd->client_wsi; // create connection driver connection_->driver = new proton::io::connection_driver(container_id_); connopts_.handler(*base_handler_); connection_->driver->connect(connopts_); change_state(connected); return handle_events(); } case LWS_CALLBACK_CLIENT_WRITEABLE: { if (handle_events() == -1) { return -1; } proton::io::const_buffer dbuf = connection_->driver->write_buffer(); if (dbuf.size) { /* lws_write() demands LWS_PRE bytes of free space before the data, * so we must copy from the driver's buffer to larger temporary wbuf */ buffer_set_size(&connection_->wbuf, LWS_PRE + dbuf.size); if (connection_->wbuf.start == NULL) { return unexpected_close(connection_->wsi, "out-of-memory"); } unsigned char* buf = (unsigned char*)connection_->wbuf.start + LWS_PRE; memcpy(buf, dbuf.data, dbuf.size); ssize_t wrote = lws_write(wsi, buf, dbuf.size, LWS_WRITE_BINARY); if (wrote < 0) { connection_->driver->write_close(); return unexpected_close(connection_->wsi, "write-error"); } else { connection_->driver->write_done(wrote); } } return handle_events(); } case LWS_CALLBACK_CLIENT_RECEIVE: { while (len > 0) { if (handle_events() == -1) { return unexpected_close(connection_->wsi, "handle_events"); } proton::io::mutable_buffer dbuf = connection_->driver->read_buffer(); if (dbuf.size == 0) { return unexpected_close(connection_->wsi, "unexpected-data"); } size_t copy = (len < dbuf.size) ? 
len : dbuf.size; memcpy(dbuf.data, in, copy); connection_->driver->read_done(copy); len -= copy; in = (char*)in + copy; } return handle_events(); } case LWS_CALLBACK_EVENT_WAIT_CANCELLED: { run_all_work(); if (state_ == connected) { lws_callback_on_writable(connection_->wsi); } break; } case LWS_CALLBACK_CLIENT_CLOSED: { vhd->client_wsi = NULL; lws_timed_callback_vh_protocol(vhd->vhost, vhd->protocol, LWS_CALLBACK_USER, 1); connection_->driver->disconnected(proton::error_condition("terminate")); handle_events(); delete connection_->driver; connection_->driver = NULL; free(connection_->wbuf.start); return -1; } case LWS_CALLBACK_WSI_DESTROY: case LWS_CALLBACK_USER: { change_state(disconnecting); return -1; } default: { break; } } return lws_callback_http_dummy(wsi, reason, user, in, len); } int ws_amqp_handler_impl::unexpected_close(struct lws *wsi, const char *msg) { lws_close_reason(wsi, LWS_CLOSE_STATUS_UNEXPECTED_CONDITION, (unsigned char*)msg, strlen(msg)); return -1; } int ws_amqp_handler_impl::service_ws() { struct lws_context_creation_info info; int n = 0; struct lws_protocols protocols[] = { // http-only protocol comes first { "http-only", lws_callback_http_dummy, 0, }, /* "amqp" is the official oasis AMQP over WebSocket protocol name */ { "amqp", s_callback_amqpws, sizeof(ws_amqp_connection_t), }, { NULL, NULL, 0, 0 } }; memset(&info, 0, sizeof info); /* otherwise uninitialized garbage */ info.port = CONTEXT_PORT_NO_LISTEN; /* we do not run any server */ info.protocols = protocols; /* * since we know this lws context is only ever going to be used with * one client wsis / fds / sockets at a time, let lws know it doesn't * have to use the default allocations for fd tables up to ulimit -n. * It will just allocate for 1 internal and 1 (+ 1 http2 nwsi) that we * will use. 
*/ info.fd_limit_per_thread = 1 + 1 + 1; lws_context_ = lws_create_context(&info); if (!lws_context_) { lwsl_err("lws init failed\n"); return -1; } s_impls_.insert(std::make_pair(lws_context_, this)); while (n >= 0) { if (state_ == disconnecting || state_ == connect_failed) { break; } n = lws_service(lws_context_, 0); } lws_context_destroy(lws_context_); change_state(disconnected); return 0; } int ws_amqp_handler_impl::connect_client(struct per_vhost_data__minimal *vhd) { vhd->i.context = lws_context_; vhd->i.port = port_; vhd->i.address = host_.c_str(); vhd->i.path = "/"; vhd->i.host = vhd->i.address; vhd->i.origin = vhd->i.address; vhd->i.ssl_connection = 0; vhd->i.protocol = "amqp"; vhd->client_wsi = lws_client_connect_via_info(&vhd->i); if (!vhd->client_wsi) { lwsl_err("connect_client failed\n"); } return !vhd->client_wsi; } int ws_amqp_handler_impl::handle_events() { if (!connection_->driver->dispatch()) { lws_close_reason(connection_->wsi, LWS_CLOSE_STATUS_NORMAL, NULL, 0); return -1; } if (connection_->driver->write_buffer().size) { lws_callback_on_writable(connection_->wsi); } return 0; } void ws_amqp_handler_impl::run_all_work() { std::vector<std::function<void()> > works; { std::lock_guard<std::mutex> l(work_items_lock_); if (work_items_.empty()) { return; } std::swap(works, work_items_); } for (const auto & work : works) try { // note: all exceptions are ignored work(); } catch (const std::exception& e) { lwsl_err("Exception occurred while executing work item: %s\n", e.what()); }; } /* Ensure size bytes in buffer, make buf empty if alloc fails */ void ws_amqp_handler_impl::buffer_set_size(buffer_t *buf, size_t size) { if (size > buf->cap) { buf->cap = (size > buf->cap * 2) ? size : buf->cap * 2; buf->start = (char *)realloc(buf->start, buf->cap); } if (buf->start) { buf->size = size; } else { buf->size = buf->cap = 0; } }
// // Copyright (C) 2020 by NetApp Inc. All Rights Reserved. // #pragma once #include <string> #include <functional> #include <proton/messaging_handler.hpp> class ws_amqp_handler_impl; class ws_amqp_handler : public proton::messaging_handler { public: ws_amqp_handler(const std::string & url); virtual ~ws_amqp_handler(); void on_container_start(proton::container& container) override; void set_connection_options(const proton::connection_options& connopt); void disconnect(); void add_work(const std::function<void()> & work); // TODO send, receive from another thread if needed // TODO on_transport_error and other on_error for logging private: ws_amqp_handler_impl *pimpl_; friend class WS_AMQP_Handler_TestSuite; };
class ws_amqp_handler_impl { public: ws_amqp_handler_impl(const std::string & url, ws_amqp_handler * base_handler); virtual ~ws_amqp_handler_impl(); void set_container_id(const std::string & id); void set_connection_options(const proton::connection_options& connopt); void add_work(const std::function<void()> & work); int ws_connect(); void disconnect(); private: /* Intermediate write buffer: LWS needs extra header space on write. */ typedef struct buffer_t { char *start; size_t size, cap; } buffer_t; /* AMQPWS connection: set as lws user data and qd_conn->context */ typedef struct ws_amqp_connection_t { proton::io::connection_driver *driver; buffer_t wbuf; /* LWS requires allocated header space at start of buffer */ struct lws *wsi; } ws_amqp_connection_t; struct per_vhost_data__minimal { struct lws_context *context; struct lws_vhost *vhost; const struct lws_protocols *protocol; struct lws_client_connect_info i; struct lws *client_wsi; }; typedef enum { connecting, connected, connect_failed, disconnecting, disconnected } handler_state_t; std::string host_; int port_; std::string container_id_; proton::connection_options connopts_; ws_amqp_handler *base_handler_; struct lws_context *lws_context_; ws_amqp_connection_t *connection_; std::atomic<handler_state_t> state_; std::vector<std::function<void()> > work_items_; std::mutex work_items_lock_; static std::map<struct lws_context *, ws_amqp_handler_impl *> s_impls_; static std::mutex s_impls_lock_; int connect_client(struct per_vhost_data__minimal *vhd); int handle_events(); int callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); void change_state(handler_state_t new_state); int service_ws(); void run_all_work(); static inline int unexpected_close(struct lws *wsi, const char *msg); static void buffer_set_size(buffer_t *buf, size_t size); static int s_callback_amqpws(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len); friend class 
WS_AMQP_Handler_TestSuite; };
--------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org For additional commands, e-mail: users-h...@qpid.apache.org