I'm trying to create a fairly low overhead repeater process which will
accept N connections (where N is going to be relatively small, in the
area of 5-12) from other processes and which will forward incoming
packets from one connection to all of the others.

 conn1 --'hello'--> repeater --'hello'--> conn{2,3,..N}

Copy elimination FTW, but presumably there will be at least one copy
from the source buffers to the output buffers for each subsequent
connection.

Right now, I have the following, but I'm wondering if there's any huge
"d'oh" in it that would let me reduce the amount of copying; I'm not sure
if there's some way to point /multiple/ out buffers at a single in buffer
(well, without chaos as they all randomly drain it).

    static void _onEventBufferRead(struct bufferevent* bev, void* arg)
    {
        const int srcSock = (int)arg;
        struct evbuffer* const input = bufferevent_get_input(bev);
        ABORT_IF(input == nullptr);

        struct evbuffer_iovec ebiov;
        for ( ;; )
        {
            /// consume received data in contiguous blocks so we don't/
            /// expend cycles shaping them; incoming data is kinda/
            /// likely to be in ready-to-go fashion./
            const size_t contigBytes = evbuffer_get_contiguous_space(input);
            if ( contigBytes == 0 )
                break;

            /// get the internal details of the buffer./
            const int numBufs = evbuffer_peek(input, -1, nullptr, &ebiov, 1);
            if ( numBufs <= 0 || ebiov.iov_base == nullptr || ebiov.iov_len == 
0 )
                break;    /// although, this seems like some kind of fail 
condition./

            /// now transfer that buffer to each additional connected client./
            for ( auto it = clients.begin(); it != clients.end(); ++it )
            {
                auto destSock = it->first;
                if ( destSock == srcSock )
                    continue;    /// skip self./
                /// find their output buf and add this buffer to it./
                auto output = bufferevent_get_output(it->second);
                ABORT_IF(output == nullptr);
                evbuffer_add(output, ebiov.iov_base, ebiov.iov_len);
            }

            /// consume the received data./
            evbuffer_drain(input, ebiov.iov_len);
        }
    }

    static void _onEventBufferError(struct bufferevent* bev, short error, void* 
arg)
    {
        const int cliSock = (int)(arg);
        if (error & BEV_EVENT_EOF)
            printf("sock:%d disconnected.\n", cliSock);
        else if (error & BEV_EVENT_ERROR)
            printf("sock:%d error:%d. disconnecting.\n", cliSock, 
dbugGetErrno());
        else if (error & BEV_EVENT_TIMEOUT)
            printf("sock:%d timeout. disconnecting.\n", cliSock);
        else
            printf("sock:%d error %d.\n", cliSock, error);

        bufferevent_free(bev);
        clients.erase(cliSock);
    }

    /// Listener callback: accepts one pending connection, wraps it in a
    /// socket bufferevent wired to the read/event callbacks, and registers it
    /// in `clients` keyed by its socket descriptor.
    ///
    /// @param listener  listening socket that became readable.
    /// @param event     event flags that triggered the callback (unused).
    /// @param arg       the event_base the new bufferevent is attached to.
    static void _onAccept(evutil_socket_t listener, short event, void* arg)
    {
        auto base = static_cast<event_base*>(arg);
        struct sockaddr_in cliAddr;
        socklen_t caLen = sizeof(cliAddr);

        // accept() reports failure as -1 with the reason in errno; the reason
        // is never the return value. Comparing the returned descriptor with
        // EAGAIN/EINTR would discard (and leak) a valid socket whose number
        // happens to equal one of those constants.
        int cliSock = -1;
        do
        {
            caLen = sizeof(cliAddr);    // in/out parameter: reset before each call.
            cliSock = accept(listener, reinterpret_cast<struct sockaddr*>(&cliAddr), &caLen);
        } while ( cliSock < 0 && errno == EINTR );

        if ( cliSock < 0 )
        {
            const int acceptErr = errno;
            // EAGAIN/EWOULDBLOCK just means nothing is pending; stay quiet.
            if ( acceptErr != EAGAIN && acceptErr != EWOULDBLOCK )
                printf("accept error: %d\n", acceptErr);
            return;
        }

        const bool socketAlreadyRegistered = (clients.find(cliSock) != clients.end());
        ABORT_IF( socketAlreadyRegistered );

        evutil_make_socket_nonblocking(cliSock);
        struct bufferevent* bev = bufferevent_socket_new(base, cliSock, BEV_OPT_CLOSE_ON_FREE);
        if ( bev == nullptr )
        {
            // No bufferevent took ownership of the socket, so close it here.
            printf("sock:%d could not allocate bufferevent. disconnecting.\n", cliSock);
            evutil_closesocket(cliSock);
            return;
        }

        // The descriptor travels as the callback argument; widen it through
        // intptr_t so the int -> void* conversion is well-defined.
        // NOTE(review): intptr_t is expected to be visible through <stdint.h>,
        // which libevent's util.h normally includes -- confirm for your build.
        void* const cbArg = reinterpret_cast<void*>(static_cast<intptr_t>(cliSock));
        bufferevent_setcb(bev, _onEventBufferRead, nullptr, _onEventBufferError, cbArg);

        clients[cliSock] = bev;

        // bufferevent_enable only understands EV_READ / EV_WRITE; EV_PERSIST
        // is a plain-event flag and does not belong here.
        bufferevent_enable(bev, EV_READ|EV_WRITE);

        printf("sock:%d connected from %s:%u\n", cliSock,
               teulGetAddress(ntohl(cliAddr.sin_addr.s_addr)), ntohs(cliAddr.sin_port));
    }

Reply via email to