I've attached the initial implementation (not yet ready to merge) of an
event-oriented socket-activation bridge. It performs well under load. I
haven't closed all potential leaks yet, but the normal execution paths
seem to be clean. I also still need to add proper command-line option
management.

The bridge adds about 0.569 ms to an average request (1.918 ms vs. 1.349 ms
mean time per request in the runs below), which is the same overhead I see
from a normal local-network Ethernet hop.

Here is the bridge wrapping nginx, using Fedora's default nginx
configuration and default homepage:

[straussd@olympian straussd]# ab -n1000 -c10 http://localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests


Server Software:        nginx/1.4.2
Server Hostname:        localhost
Server Port:            8080

Document Path:          /
Document Length:        3700 bytes

Concurrency Level:      10
Time taken for tests:   0.192 seconds
Complete requests:      1000
Failed requests:        0
Write errors:           0
Total transferred:      3933000 bytes
HTML transferred:       3700000 bytes
Requests per second:    5214.39 [#/sec] (mean)
Time per request:       1.918 [ms] (mean)
Time per request:       0.192 [ms] (mean, across all concurrent requests)
Transfer rate:          20027.53 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       2
Processing:     0    2   0.4      2       4
Waiting:        0    1   0.3      1       3
Total:          1    2   0.4      2       4

Percentage of the requests served within a certain time (ms)
  50%      2
  66%      2
  75%      2
  80%      2
  90%      2
  95%      2
  98%      3
  99%      3
 100%      4 (longest request)

For comparison, here is nginx accessed directly over TCP:

[straussd@olympian straussd]# ab -n1000 -c10 http://localhost:80/
This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 100 requests
Completed 200 requests
Completed 300 requests
Completed 400 requests
Completed 500 requests
Completed 600 requests
Completed 700 requests
Completed 800 requests
Completed 900 requests
Completed 1000 requests
Finished 1000 requests


Server Software:        nginx/1.4.2
Server Hostname:        localhost
Server Port:            80

Document Path:          /
Document Length:        3700 bytes

Concurrency Level:      10
Time taken for tests:   0.135 seconds
Complete requests:      1000
Failed requests:        0
Write errors:           0
Total transferred:      3933000 bytes
HTML transferred:       3700000 bytes
Requests per second:    7411.52 [#/sec] (mean)
Time per request:       1.349 [ms] (mean)
Time per request:       0.135 [ms] (mean, across all concurrent requests)
Transfer rate:          28466.34 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.1      1       1
Processing:     0    1   0.2      1       2
Waiting:        0    1   0.2      1       2
Total:          1    1   0.2      1       3

Percentage of the requests served within a certain time (ms)
  50%      1
  66%      1
  75%      1
  80%      1
  90%      2
  95%      2
  98%      2
  99%      2
 100%      3 (longest request)
/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/

/***
  This file is part of systemd.

  Copyright 2013 David Strauss <da...@davidstrauss.net>

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
 ***/

#define __STDC_FORMAT_MACROS
#include <errno.h>
#include <ev.h>
#include <fcntl.h>
#include <netdb.h>
#include <poll.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include "log.h"
#include "sd-daemon.h"

/* Maximum number of bytes moved per read event in transfer_data_cb(). */
#define BUFFER_SIZE 1024

/* Number of client connections accepted so far (incremented in accept_cb()).
 * File-local: nothing outside this program needs to see it. */
static unsigned int total_clients = 0;

/*
 * Static configuration shared by every accepted connection.
 *
 * listen_fd      - socket-activated listening socket handed over by systemd.
 * remote_is_inet - true: resolve remote_host/remote_service with getaddrinfo();
 *                  false: treat remote_host as a Unix domain socket path.
 */
struct proxy_t {
    int listen_fd;
    bool remote_is_inet;
    const char *remote_host, *remote_service;
};

/*
 * State for one direction of a proxied connection. accept_cb() allocates two
 * of these per client (client->server and server->client); each points at the
 * other through c_destination.
 */
struct connection_t {
    int origin_fd, destination_fd;      /* bytes are read from origin_fd and sent to destination_fd */
    ev_io *w_destination;               /* watcher that reads the opposite direction (from destination_fd) */
    struct connection_t *c_destination; /* state of the opposite direction */
};

/*
 * Tear down both directions of a proxied connection.
 *
 * `watcher` reads from connection->origin_fd; connection->w_destination is the
 * watcher of the opposite direction and connection->c_destination its state.
 * Both watchers are stopped, both sockets closed, and the four heap objects
 * allocated in accept_cb() are freed. Neither pointer may be used afterwards.
 */
static void close_connection_pair(EV_P_ ev_io *watcher, struct connection_t *connection) {
    ev_io_stop(EV_A_ connection->w_destination);
    ev_io_stop(EV_A_ watcher);
    close(connection->origin_fd);
    close(connection->destination_fd);
    free(connection->w_destination);
    free(connection->c_destination);
    free(connection);
    free(watcher);
}

/*
 * Write all `len` bytes of `buf` to socket `fd`.
 *
 * send() on a non-blocking socket (server sockets are made non-blocking in
 * get_server_connection_fd()) may accept only part of the data or fail with
 * EAGAIN; a single send() call would silently drop the remainder. When the
 * socket buffer is full we wait for POLLOUT. NOTE: that wait blocks the event
 * loop until the peer drains its buffer; a fully asynchronous version would
 * queue the data and use an EV_WRITE watcher instead.
 *
 * MSG_NOSIGNAL keeps a peer that already closed from raising SIGPIPE and
 * killing the whole proxy.
 *
 * Returns 0 on success, -1 with errno set on failure.
 */
static int send_all(int fd, const char *buf, size_t len) {
    while (len > 0) {
        ssize_t n = send(fd, buf, len, MSG_NOSIGNAL);

        if (n >= 0) {
            buf += n;
            len -= (size_t) n;
            continue;
        }

        if (errno == EINTR)
            continue;

        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            struct pollfd pfd = { .fd = fd, .events = POLLOUT };

            if (poll(&pfd, 1, -1) < 0 && errno != EINTR)
                return -1;
            continue;
        }

        return -1;
    }

    return 0;
}

/*
 * libev read callback: move up to BUFFER_SIZE bytes from connection->origin_fd
 * to connection->destination_fd.
 *
 * End of stream or an error on either socket tears down only this client's
 * connection pair; the proxy itself keeps running for other clients.
 */
static void transfer_data_cb(EV_P_ ev_io *watcher, int revents) {
    struct connection_t *connection = (struct connection_t *) watcher->data;
    char buffer[BUFFER_SIZE];
    ssize_t buffer_len;

    assert(revents & EV_READ);
    assert(watcher->fd == connection->origin_fd);

    buffer_len = recv(connection->origin_fd, buffer, sizeof(buffer), 0);
    if (buffer_len < 0) {
        /* Spurious wakeup or interrupted call: libev will invoke us again. */
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
            return;

        log_error("Error %d in recv from fd=%d: %s", errno, connection->origin_fd, strerror(errno));
        close_connection_pair(EV_A_ watcher, connection);
        return;
    }

    if (buffer_len == 0) {
        /* Orderly shutdown by the peer: close both directions. */
        close_connection_pair(EV_A_ watcher, connection);
        return;
    }

    if (send_all(connection->destination_fd, buffer, (size_t) buffer_len) < 0) {
        log_error("Error %d in send to fd=%d: %s", errno, connection->destination_fd, strerror(errno));
        close_connection_pair(EV_A_ watcher, connection);
    }
}

/*
 * libev write callback fired once the non-blocking connect() to the backend
 * (started in get_server_connection_fd()) has completed, successfully or not.
 *
 * On success, replaces the one-shot EV_WRITE watcher with read watchers for
 * both directions. On failure (reported through SO_ERROR), closes the client
 * and server sockets and frees this client's state instead of leaving a dead
 * pair whose reads would later fail.
 */
static void connected_to_server_cb(EV_P_ ev_io *watcher, int revents) {
    struct connection_t *c_server_to_client = (struct connection_t *) watcher->data;
    int error = 0;
    socklen_t error_len = sizeof(error);

    /* Writability is only the "connect finished" signal; stop it either way. */
    ev_io_stop(EV_A_ watcher);

    /* A completed non-blocking connect reports its outcome via SO_ERROR. */
    if (getsockopt(c_server_to_client->origin_fd, SOL_SOCKET, SO_ERROR, &error, &error_len) < 0)
        error = errno;

    if ((revents & EV_ERROR) || error != 0) {
        log_error("Could not connect to server on fd=%d: %s",
                  c_server_to_client->origin_fd, strerror(error != 0 ? error : EIO));
        close(c_server_to_client->origin_fd);
        close(c_server_to_client->destination_fd);
        /* The opposite-direction watcher was never started, so freeing it is safe. */
        free(c_server_to_client->w_destination);
        free(c_server_to_client->c_destination);
        free(c_server_to_client);
        free(watcher);
        return;
    }

    /* Start listening for data sent by the client. */
    ev_io_init(c_server_to_client->w_destination, transfer_data_cb, c_server_to_client->destination_fd, EV_READ);
    ev_io_start(EV_A_ c_server_to_client->w_destination);

    /* Reuse this watcher to listen for data sent by the server. */
    ev_io_init(watcher, transfer_data_cb, c_server_to_client->origin_fd, EV_READ);
    ev_io_start(EV_A_ watcher);
}


/*
 * Add O_NONBLOCK to the file status flags of `fd`, preserving the others.
 *
 * Returns 0 on success (including when the flag is already set) and -1 with
 * errno set on failure. A failing F_GETFL is reported instead of being OR-ed
 * into a garbage flag set.
 */
static int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);

    if (flags < 0)
        return -1;

    if (flags & O_NONBLOCK)
        return 0;

    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Open a non-blocking stream socket to the backend and start connecting.
 *
 * For inet backends every address returned by getaddrinfo() is tried in turn
 * until one connect() succeeds immediately or is in progress (EINPROGRESS).
 * For Unix backends proxy->remote_host is the socket path. The final outcome
 * of an in-progress connect is checked later by connected_to_server_cb().
 *
 * On failure this logs and exits the process.
 *
 * Returns the connecting socket fd.
 */
static int get_server_connection_fd(const struct proxy_t *proxy) {
    int server_fd = -1;

    if (proxy->remote_is_inet) {
        struct addrinfo hints;
        struct addrinfo *result, *ai;
        int s;

        memset(&hints, 0, sizeof(hints));
        hints.ai_family = AF_UNSPEC;      /* IPv4 or IPv6 */
        hints.ai_socktype = SOCK_STREAM;  /* TCP */
        /* No AI_PASSIVE: it selects wildcard addresses for bind(), not
         * addresses to connect() to. */

        s = getaddrinfo(proxy->remote_host, proxy->remote_service, &hints, &result);
        if (s != 0) {
            log_error("getaddrinfo error (%d): %s", s, gai_strerror(s));
            exit(EXIT_FAILURE);
        }

        if (result == NULL) {
            log_error("getaddrinfo: no result");
            exit(EXIT_FAILURE);
        }

        for (ai = result; ai != NULL; ai = ai->ai_next) {
            server_fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
            if (server_fd < 0)
                continue;

            if (set_nonblock(server_fd) < 0) {
                log_error("Error setting socket to non-blocking.");
                close(server_fd);
                server_fd = -1;
                continue;
            }

            /* A non-blocking connect normally returns -1/EINPROGRESS;
             * 0 means it completed immediately. Both are success here. */
            if (connect(server_fd, ai->ai_addr, ai->ai_addrlen) == 0 || errno == EINPROGRESS)
                break;

            close(server_fd);
            server_fd = -1;
        }

        freeaddrinfo(result);

        if (server_fd < 0) {
            log_error("Could not connect to socket: %s:%s", proxy->remote_host, proxy->remote_service);
            exit(EXIT_FAILURE);
        }
    } else {
        struct sockaddr_un remote;
        size_t path_len = strlen(proxy->remote_host);
        socklen_t len;

        /* sun_path must hold the path plus its terminating NUL. */
        if (path_len >= sizeof(remote.sun_path)) {
            log_error("Unix domain socket path too long: %s", proxy->remote_host);
            exit(EXIT_FAILURE);
        }

        server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (server_fd < 0) {
            log_error("Error %d while initializing socket: %s", errno, strerror(errno));
            exit(EXIT_FAILURE);
        }

        if (set_nonblock(server_fd) < 0) {
            log_error("Error setting socket to non-blocking.");
            close(server_fd);
            exit(EXIT_FAILURE);
        }

        memset(&remote, 0, sizeof(remote));
        remote.sun_family = AF_UNIX;
        memcpy(remote.sun_path, proxy->remote_host, path_len + 1);
        len = (socklen_t) (offsetof(struct sockaddr_un, sun_path) + path_len);

        if (connect(server_fd, (struct sockaddr *) &remote, len) < 0 && errno != EINPROGRESS) {
            log_error("Could not connect to Unix domain socket: %s", proxy->remote_host);
            close(server_fd);
            exit(EXIT_FAILURE);
        }
    }

    return server_fd;
}

/*
 * libev read callback on the listening socket: accept one client, open a
 * backend connection for it, and wire up the two connection_t halves and
 * their watchers.
 *
 * Data transfer does not start here. The server watcher first waits for the
 * backend socket to become writable (connect finished) and then
 * connected_to_server_cb() takes over.
 *
 * Every resource acquired here is released if a later step fails.
 */
static void accept_cb(EV_P_ ev_io *watcher, int revents) {
    struct proxy_t *proxy = (struct proxy_t *) watcher->data;
    struct connection_t *c_client_to_server = NULL;
    struct connection_t *c_server_to_client = NULL;
    struct ev_io *w_server = NULL;
    struct ev_io *w_client = NULL;
    int client_fd, server_fd;

    if (EV_ERROR & revents) {
        log_error("Invalid event.");
        return;
    }

    /* Accept before contacting the backend, so that a spurious wakeup or a
     * client that already went away does not leave an orphaned backend
     * connection. The peer address is unused, so none is requested (a
     * sockaddr_in buffer would be too small for IPv6 or Unix peers). */
    client_fd = accept(watcher->fd, NULL, NULL);
    if (client_fd < 0) {
        if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
            log_error("Error accepting client connection.");
        return;
    }

    c_client_to_server = malloc(sizeof(*c_client_to_server));
    c_server_to_client = malloc(sizeof(*c_server_to_client));
    w_server = malloc(sizeof(*w_server));
    w_client = malloc(sizeof(*w_client));
    if (!c_client_to_server || !c_server_to_client || !w_server || !w_client) {
        log_error("Out of memory while setting up client connection.");
        goto fail;
    }

    server_fd = get_server_connection_fd(proxy);
    if (server_fd < 0) {
        log_error("Error initiating server connection.");
        goto fail;
    }

    c_server_to_client->origin_fd = server_fd;
    c_server_to_client->destination_fd = client_fd;
    c_server_to_client->w_destination = w_client;
    c_server_to_client->c_destination = c_client_to_server;
    w_server->data = c_server_to_client;

    c_client_to_server->origin_fd = client_fd;
    c_client_to_server->destination_fd = server_fd;
    c_client_to_server->w_destination = w_server;
    c_client_to_server->c_destination = c_server_to_client;
    w_client->data = c_client_to_server;

    total_clients++;

    /* Wait for the server socket to be writable before initializing
     * read events for the client socket. */
    ev_io_init(w_server, connected_to_server_cb, server_fd, EV_WRITE);
    ev_io_start(EV_A_ w_server);
    return;

fail:
    close(client_fd);
    free(c_client_to_server);
    free(c_server_to_client);
    free(w_server);
    free(w_client);
}

/*
 * Register the socket-activated listener with libev's default loop and run
 * the loop; accept_cb() handles each incoming connection.
 *
 * Exits the process if the loop cannot be created or the listener cannot be
 * made non-blocking. Otherwise it only returns when ev_loop() returns.
 */
static void run_main_loop(struct proxy_t *proxy) {
    struct ev_loop *loop = ev_default_loop(0);
    struct ev_io w_listen;

    if (!loop) {
        log_error("Could not initialize event loop.");
        exit(EXIT_FAILURE);
    }

    if (set_nonblock(proxy->listen_fd) < 0) {
        log_error("Error %d setting listener fd=%d to non-blocking: %s",
                  errno, proxy->listen_fd, strerror(errno));
        exit(EXIT_FAILURE);
    }

    ev_io_init(&w_listen, accept_cb, proxy->listen_fd, EV_READ);
    /* accept_cb() reads the proxy configuration through watcher->data. */
    w_listen.data = proxy;
    ev_io_start(loop, &w_listen);

    log_info("Initialized main listener fd=%d", proxy->listen_fd);

    ev_loop(loop, 0);
}

/*
 * Entry point: proxy connections from the single socket-activated listener
 * (fd SD_LISTEN_FDS_START) to argv[1]:argv[2].
 *
 * Exactly one listening socket must be passed in by the service manager;
 * zero or more than one is a fatal configuration error.
 */
int main(int argc, char *argv[]) {
    struct proxy_t proxy;
    int n;

    if (argc != 3) {
        fprintf(stderr, "usage: %s hostname service-or-port\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    proxy.listen_fd = SD_LISTEN_FDS_START;
    proxy.remote_host = argv[1];
    proxy.remote_service = argv[2];
    proxy.remote_is_inet = true;  // @TODO: Support Unix domain sockets.

    assert(proxy.remote_host);
    assert(proxy.remote_service);

    n = sd_listen_fds(1);
    if (n < 0) {
        log_error("Failed to determine passed sockets: %s", strerror(-n));
        exit(EXIT_FAILURE);
    } else if (n == 0) {
        /* Without a passed socket, SD_LISTEN_FDS_START is not our listener. */
        log_error("No sockets were passed in via socket activation.");
        exit(EXIT_FAILURE);
    } else if (n > 1) {
        log_error("Can't listen on more than one socket.");
        exit(EXIT_FAILURE);
    }

    run_main_loop(&proxy);
    return 0;
}

Attachment: nginx.service
Description: Binary data

Attachment: nginx.socket
Description: Binary data

Attachment: sabridge.sh
Description: Bourne shell script

_______________________________________________
systemd-devel mailing list
systemd-devel@lists.freedesktop.org
http://lists.freedesktop.org/mailman/listinfo/systemd-devel

Reply via email to