I'm trying to create a fairly low-overhead repeater process which will accept N connections from other processes (where N is going to be relatively small, in the area of 5-12) and which will forward incoming packets from one connection to all of the others.
conn1 --'hello'--> repeater --'hello'--> conn{2,3,..N} Copy elimination FTW, but presumably there will be at least one copy from the source buffers to the output buffers for each subsequent connection. Right now, I have the following, but I'm wondering if there's any huge "d'oh" in it that I could reduce the amount of copying; not sure if there's some way to point /multiple/ out buffers at a single in buffer (well, without chaos as they all randomly drain it). static void _onEventBufferRead(struct bufferevent* bev, void* arg) { const int srcSock = (int)arg; struct evbuffer* const input = bufferevent_get_input(bev); ABORT_IF(input == nullptr); struct evbuffer_iovec ebiov; for ( ;; ) { /// consume received data in contiguous blocks so we don't/ /// expend cycles shaping them; incoming data is kinda/ /// likely to be in ready-to-go fashion./ const size_t contigBytes = evbuffer_get_contiguous_space(input); if ( contigBytes == 0 ) break; /// get the internal details of the buffer./ const int numBufs = evbuffer_peek(input, -1, nullptr, &ebiov, 1); if ( numBufs <= 0 || ebiov.iov_base == nullptr || ebiov.iov_len == 0 ) break; /// although, this seems like some kind of fail condition./ /// now transfer that buffer to each additional connected client./ for ( auto it = clients.begin(); it != clients.end(); ++it ) { auto destSock = it->first; if ( destSock == srcSock ) continue; /// skip self./ /// find their output buf and add this buffer to it./ auto output = bufferevent_get_output(it->second); ABORT_IF(output == nullptr); evbuffer_add(output, ebiov.iov_base, ebiov.iov_len); } /// consume the received data./ evbuffer_drain(input, ebiov.iov_len); } } static void _onEventBufferError(struct bufferevent* bev, short error, void* arg) { const int cliSock = (int)(arg); if (error & BEV_EVENT_EOF) printf("sock:%d disconnected.\n", cliSock); else if (error & BEV_EVENT_ERROR) printf("sock:%d error:%d. 
disconnecting.\n", cliSock, dbugGetErrno()); else if (error & BEV_EVENT_TIMEOUT) printf("sock:%d timeout. disconnecting.\n", cliSock); else printf("sock:%d error %d.\n", cliSock, error); bufferevent_free(bev); clients.erase(cliSock); } static void _onAccept(evutil_socket_t listener, short event, void* arg) { auto base = static_cast<event_base*>(arg); struct sockaddr_in cliAddr; socklen_t caLen = sizeof(cliAddr); int cliSock = EAGAIN; while ( (cliSock == EAGAIN) | (cliSock == EINTR) ) { cliSock = accept(listener, (struct sockaddr*)&cliAddr, &caLen); }; if ( cliSock < 0 ) { printf("accept error: %d\n", cliSock); return; } evutil_make_socket_nonblocking(cliSock); struct bufferevent* bev = bufferevent_socket_new(base, cliSock, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(bev, _onEventBufferRead, nullptr, _onEventBufferError, (void*)cliSock); bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST); const bool socketAlreadyRegistered = (clients.find(cliSock) != clients.end()); ABORT_IF( socketAlreadyRegistered ); clients[cliSock] = bev; printf("sock:%d connected from %s:%u\n", cliSock, teulGetAddress(ntohl(cliAddr.sin_addr.s_addr)), ntohs(cliAddr.sin_port)); }