Here's a revised version that should be pretty close to done, except for needing libasyncns and less logging. Blocking on getaddrinfo doesn't concern me too much for the initial release, given that primary use of this should be with localhost IPs.
If this gets a thumbs-up, I'll send in a proper patch set. The full sabridge branch is here [1], including the sd-event.c fix for EEXIST. [1] https://github.com/davidstrauss/systemd/tree/sabridge
/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ /*** This file is part of systemd. Copyright 2013 David Strauss systemd is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. systemd is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with systemd; If not, see <http://www.gnu.org/licenses/>. ***/ #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <netdb.h> #include <sys/fcntl.h> #include <sys/socket.h> #include <sys/un.h> #include <unistd.h> #include "log.h" #include "util.h" #include "sd-daemon.h" #include "sd-event.h" #define BUFFER_SIZE 1024 #define _cleanup_freeaddrinfo_ _cleanup_(freeaddrinfop) unsigned int total_clients = 0; struct proxy { int listen_fd; bool remote_is_inet; const char *remote_host; const char *remote_service; }; struct connection { int fd; char buffer[BUFFER_SIZE]; ssize_t buffer_len; sd_event_source *w_recv; sd_event_source *w_send; struct connection *c_destination; }; static inline void freeaddrinfop(struct addrinfo **ai) { if (*ai) freeaddrinfo(*ai); } static void free_connection(struct connection *c) { sd_event_source_unref(c->w_recv); sd_event_source_unref(c->w_send); close(c->fd); free(c); } static int transfer_data_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) { struct connection *c = (struct connection *) userdata; int r = 0; assert(revents & (EPOLLIN | EPOLLOUT)); if (revents & EPOLLIN) { log_info("About to recv up to %u bytes from fd=%d.", BUFFER_SIZE, fd); assert(s == c->w_recv); c->buffer_len = recv(fd, &c->buffer, BUFFER_SIZE, 0); if (c->buffer_len == 0) { log_info("Clean disconnection."); free_connection(c->c_destination); free_connection(c); goto finish; } else if (c->buffer_len == -1) { log_error("Error %d in recv from fd=%d: %s", errno, fd, strerror(errno)); r = -1; goto finish; } /* Try sending the data immediately. */ r = send(c->c_destination->fd, &c->buffer, c->buffer_len, 0); if (r < 0 && errno != EWOULDBLOCK && errno != EAGAIN) { log_error("Error %d in send to fd=%d: %s", errno, fd, strerror(errno)); goto finish; } else if (errno == EWOULDBLOCK || errno == EAGAIN) { /* Switch to listening for a sendable state in destination. */ r = sd_event_source_set_mute(c->w_recv, SD_EVENT_MUTED); if (r < 0) { log_error("Error %d muting recv for fd=%d: %s", r, fd, strerror(r)); goto finish; } r = sd_event_source_set_mute(c->c_destination->w_send, SD_EVENT_UNMUTED); if (r < 0) { log_error("Error %d unmuting send for fd=%d: %s", r, c->c_destination->fd, strerror(-r)); goto finish; } log_info("Done with recv for fd=%d. Waiting on send for fd=%d.", fd, c->c_destination->fd); goto finish; } log_info("Done with recv for fd=%d and send for fd=%d.", fd, c->c_destination->fd); } else { log_info("About to send up to %u bytes to fd=%d.", BUFFER_SIZE, fd); assert(s == c->w_send); assert(c->buffer != NULL); r = send(fd, &c->buffer, c->buffer_len, 0); if (r < 0) { log_error("Error %d in send to fd=%d: %s", errno, fd, strerror(errno)); goto finish; } /* Switch to listening for a receivable state in destination. */ r = sd_event_source_set_mute(c->w_send, SD_EVENT_MUTED); if (r < 0) { log_error("Error %d muting send for fd=%d: %s", r, fd, strerror(-r)); goto finish; } r = sd_event_source_set_mute(c->c_destination->w_recv, SD_EVENT_UNMUTED); if (r < 0) { log_error("Error %d unmuting recv for fd=%d: %s", r, c->c_destination->fd, strerror(-r)); goto finish; } log_info("Done with send for fd=%d. Waiting on recv for fd=%d.", fd, c->c_destination->fd); } finish: return r; } /* Once sending to the server is unblocked, set up the real watchers. */ static int connected_to_server_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) { struct sd_event *e = NULL; struct connection *c_server_to_client = (struct connection *) userdata; struct connection *c_client_to_server = c_server_to_client->c_destination; int r; assert(revents & EPOLLOUT); e = sd_event_get(s); /* Cancel the initial write watcher for the server. */ sd_event_source_unref(s); log_info("Connected to server. Initializing watchers for receiving data."); /* A muted send watcher for the server. */ r = sd_event_add_io(e, c_server_to_client->fd, EPOLLOUT, transfer_data_cb, c_server_to_client, &c_server_to_client->w_send); if (r < 0) { log_error("Error %d creating send watcher for fd=%d: %s", r, c_server_to_client->fd, strerror(-r)); goto fail; } r = sd_event_source_set_mute(c_server_to_client->w_send, SD_EVENT_MUTED); if (r < 0) { log_error("Error %d muting send for fd=%d: %s", r, c_server_to_client->fd, strerror(-r)); goto finish; } /* A recv watcher for the server. */ r = sd_event_add_io(e, c_server_to_client->fd, EPOLLIN, transfer_data_cb, c_server_to_client, &c_server_to_client->w_recv); if (r < 0) { log_error("Error %d creating recv watcher for fd=%d: %s", r, c_server_to_client->fd, strerror(-r)); goto fail; } /* A muted send watcher for the client. */ r = sd_event_add_io(e, c_client_to_server->fd, EPOLLOUT, transfer_data_cb, c_client_to_server, &c_client_to_server->w_send); if (r < 0) { log_error("Error %d creating send watcher for fd=%d: %s", r, c_client_to_server->fd, strerror(-r)); goto fail; } r = sd_event_source_set_mute(c_client_to_server->w_send, SD_EVENT_MUTED); if (r < 0) { log_error("Error %d muting send for fd=%d: %s", r, c_client_to_server->fd, strerror(-r)); goto finish; } /* A recv watcher for the client. */ r = sd_event_add_io(e, c_client_to_server->fd, EPOLLIN, transfer_data_cb, c_client_to_server, &c_client_to_server->w_recv); if (r < 0) { log_error("Error %d creating recv watcher for fd=%d: %s", r, c_client_to_server->fd, strerror(-r)); goto fail; } goto finish; fail: free_connection(c_client_to_server); free_connection(c_server_to_client); finish: return r; } static int get_server_connection_fd(const struct proxy *proxy) { int server_fd; int r = -EBADF; int len; if (proxy->remote_is_inet) { struct addrinfo hints = {}; _cleanup_freeaddrinfo_ struct addrinfo *result = NULL; int s; //memset(&hints, 0, sizeof(struct addrinfo)); hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* TCP */ hints.ai_flags = AI_PASSIVE; /* Any IP address */ //log_error("Looking up address info for %s:%s", proxy->remote_host, proxy->remote_service); s = getaddrinfo(proxy->remote_host, proxy->remote_service, &hints, &result); if (s != 0) { log_error("getaddrinfo error (%d): %s", s, gai_strerror(s)); goto finish; } if (result == NULL) { log_error("getaddrinfo: no result"); goto finish; } /* @TODO: Try connecting to all results instead of just the first. */ server_fd = socket(result->ai_family, result->ai_socktype | SOCK_NONBLOCK, result->ai_protocol); if (server_fd < 0) { log_error("Error %d creating socket: %s", errno, strerror(errno)); goto finish; } r = connect(server_fd, result->ai_addr, result->ai_addrlen); /* Ignore EINPROGRESS errors because they're expected for a non-blocking socket. */ if (r < 0 && errno != EINPROGRESS) { log_error("Error %d while connecting to socket %s:%s: %s", errno, proxy->remote_host, proxy->remote_service, strerror(errno)); goto finish; } } else { struct sockaddr_un remote; server_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); if (server_fd < 0) { log_error("Error %d creating socket: %s", errno, strerror(errno)); r = -EBADFD; goto finish; } remote.sun_family = AF_UNIX; strncpy(remote.sun_path, proxy->remote_host, sizeof(remote.sun_path)); len = strlen(remote.sun_path) + sizeof(remote.sun_family); r = connect(server_fd, (struct sockaddr *) &remote, len); if (r < 0 && errno != EINPROGRESS) { log_error("Error %d while connecting to Unix domain socket %s: %s", errno, proxy->remote_host, strerror(errno)); r = -EBADFD; goto finish; } } log_info("Server connection is fd=%d", server_fd); r = server_fd; finish: return r; } static int accept_cb(sd_event_source *s, int fd, uint32_t revents, void *userdata) { struct proxy *proxy = (struct proxy *) userdata; struct connection *c_server_to_client = malloc(sizeof(struct connection)); struct connection *c_client_to_server = malloc(sizeof(struct connection)); int r = 0; /* @TODO: Remove assumption of IPv4. */ struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); assert(revents & EPOLLIN); c_server_to_client->fd = get_server_connection_fd(proxy); if (c_server_to_client->fd < 0) { log_error("Error initiating server connection."); goto fail; } c_client_to_server->fd = accept(fd, (struct sockaddr *) &client_addr, &client_len); if (c_client_to_server->fd < 0) { log_error("Error accepting client connection."); goto fail; } log_info("Client connection accepted with fd=%d", c_client_to_server->fd); total_clients++; log_info("Client successfully connected. Total clients: %u", total_clients); /* Initialize watcher for send to server; this shows connectivity. */ r = sd_event_add_io(sd_event_get(s), c_server_to_client->fd, EPOLLOUT, connected_to_server_cb, c_server_to_client, &c_server_to_client->w_send); if (r < 0) { log_error("Error %d creating connectivity watcher for fd=%d: %s", r, c_server_to_client->fd, strerror(r)); goto fail; } /* Allow lookups of the opposite connection. */ c_server_to_client->c_destination = c_client_to_server; c_client_to_server->c_destination = c_server_to_client; goto finish; fail: log_warning("Accepting a client connection or connecting to the server failed."); free_connection(c_client_to_server); free_connection(c_server_to_client); finish: /* Preserve the main loop even if a single proxy setup fails. */ return 0; } static int run_main_loop(struct proxy *proxy) { int r = EXIT_SUCCESS; struct sd_event *e = NULL; sd_event_source *w_accept = NULL; r = sd_event_new(&e); if (r < 0) goto finish; r = fd_nonblock(proxy->listen_fd, true); if (r < 0) goto finish; log_info("Initializing main listener fd=%d", proxy->listen_fd); sd_event_add_io(e, proxy->listen_fd, EPOLLIN, accept_cb, proxy, &w_accept); log_info("Initialized main listener. Entering loop."); sd_event_loop(e); finish: sd_event_source_unref(w_accept); sd_event_unref(e); return r; } int main(int argc, char *argv[]) { struct proxy proxy; int n, r; log_info("Starting up."); if (argc != 3) { fprintf(stderr, "usage: %s hostname service-or-port\n", argv[0]); return EXIT_FAILURE; } proxy.listen_fd = SD_LISTEN_FDS_START; proxy.remote_host = argv[1]; proxy.remote_service = argv[2]; proxy.remote_is_inet = true; /* @TODO: Support Unix domain sockets. */ assert(proxy.remote_host); assert(proxy.remote_service); n = sd_listen_fds(1); if (n < 0) { log_error("Failed to determine passed sockets: %s", strerror(-n)); return EXIT_FAILURE; } else if (n > 1) { log_error("Can't listen on more than one socket."); return EXIT_FAILURE; } log_info("Initializing main loop."); r = run_main_loop(&proxy); if (r < 0) { log_error("Error %d from main loop.", r); return 1; } return 0; }
_______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel