On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote: > v3: make each worker its own service > v4: be less intrusive Hi Shawn, unfortunately this doesn't apply cleanly. Can you rebase?
> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml > index 7c10c58..92a9275 100644 > --- a/man/systemd.socket.xml > +++ b/man/systemd.socket.xml > @@ -519,6 +519,15 @@ > </varlistentry> > > <varlistentry> > + <term><varname>Distribute=</varname></term> > + <listitem><para>Takes an integer > + value. If greater than one, systemd will > spawn > + given number of instances of service each > + listening to the same socket. This option > implies > + <varname>Reuseport=</varname> > above.</para></listitem> > + </varlistentry> > + > + <varlistentry> > <term><varname>SmackLabel=</varname></term> > > <term><varname>SmackLabelIPIn=</varname></term> > > <term><varname>SmackLabelIPOut=</varname></term> > diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c > index 60a8d05..4644007 100644 > --- a/src/core/dbus-socket.c > +++ b/src/core/dbus-socket.c > @@ -68,6 +68,7 @@ > " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \ > " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \ > " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \ > + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \ > " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \ > " <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" > \ > " <property name=\"SmackLabelIPOut\" type=\"s\" > access=\"read\"/>\n" \ > @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = { > { "MessageQueueMessageSize", bus_property_append_long, "x", > offsetof(Socket, mq_msgsize) }, > { "Result", bus_socket_append_socket_result, "s", > offsetof(Socket, result) }, > { "ReusePort", bus_property_append_bool, "b", > offsetof(Socket, reuseport) }, > + { "Distribute", bus_property_append_unsigned, "u", > offsetof(Socket, distribute) }, > { "SmackLabel", bus_property_append_string, "s", > offsetof(Socket, smack), true }, > { "SmackLabelIPIn", bus_property_append_string, "s", > offsetof(Socket, smack_ip_in), true }, > { 
"SmackLabelIPOut",bus_property_append_string, "s", > offsetof(Socket, smack_ip_out), true }, > diff --git a/src/core/load-fragment-gperf.gperf.m4 > b/src/core/load-fragment-gperf.gperf.m4 > index b64fdc9..4058a1f 100644 > --- a/src/core/load-fragment-gperf.gperf.m4 > +++ b/src/core/load-fragment-gperf.gperf.m4 > @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, > 0, > Socket.PassSecurity, config_parse_bool, 0, > offsetof(Socket, pass_sec) > Socket.TCPCongestion, config_parse_string, 0, > offsetof(Socket, tcp_congestion) > Socket.ReusePort, config_parse_bool, 0, > offsetof(Socket, reuseport) > +Socket.Distribute, config_parse_unsigned, 0, > offsetof(Socket, distribute) > Socket.MessageQueueMaxMessages, config_parse_long, 0, > offsetof(Socket, mq_maxmsg) > Socket.MessageQueueMessageSize, config_parse_long, 0, > offsetof(Socket, mq_msgsize) > Socket.Service, config_parse_socket_service, 0, > 0 > diff --git a/src/core/service.c b/src/core/service.c > index 3da32a1..8fc55a0 100644 > --- a/src/core/service.c > +++ b/src/core/service.c > @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done( > int service_set_socket_fd(Service *s, int fd, Socket *sock) { > > assert(s); > - assert(fd >= 0); > > /* This is called by the socket code when instantiating a new > * service for a stream socket and the socket needs to be > @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket > *sock) { > if (s->state != SERVICE_DEAD) > return -EAGAIN; > > - s->socket_fd = fd; > - s->got_socket_fd = true; > + if (fd >= 0) { > + s->socket_fd = fd; > + s->got_socket_fd = true; > + } > > unit_ref_set(&s->accept_socket, UNIT(sock)); > > diff --git a/src/core/service.h b/src/core/service.h > index 37fa6ff..2ffe7d1 100644 > --- a/src/core/service.h > +++ b/src/core/service.h > @@ -26,7 +26,6 @@ typedef struct Service Service; > #include "unit.h" > #include "path.h" > #include "ratelimit.h" > -#include "service.h" > #include "kill.h" > #include "exit-status.h" > 
> diff --git a/src/core/socket.c b/src/core/socket.c > index 751f20b..11b649b 100644 > --- a/src/core/socket.c > +++ b/src/core/socket.c > @@ -153,34 +153,30 @@ static void socket_done(Unit *u) { > } > > static int socket_instantiate_service(Socket *s) { > - char *prefix, *name; > + _cleanup_free_ char *prefix = NULL, *name = NULL; > int r; > Unit *u; > > assert(s); > > /* This fills in s->service if it isn't filled in yet. For > - * Accept=yes sockets we create the next connection service > - * here. For Accept=no this is mostly a NOP since the service > + * Accept=yes and Distribute=n sockets we create the next connection > + * service here. Otherwise is mostly a NOP since the service > * is figured out at load time anyway. */ > > - if (UNIT_DEREF(s->service)) > + if (UNIT_DEREF(s->service) && !(s->distribute)) > return 0; > > - assert(s->accept); > + assert(s->accept || s->distribute); > > if (!(prefix = unit_name_to_prefix(UNIT(s)->id))) > return -ENOMEM; > > r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted); Here we could use something like "%.*u", MAX(s->distribute-1), s->n_accepted to have nicely sorted instances... (E.g. systemctl sorts by type and name). I'm not sure if that's better or not. 
> - free(prefix); > - > if (r < 0) > return -ENOMEM; > > r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u); > - free(name); > - > if (r < 0) > return r; > > @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char > *prefix) { > "%sReusePort: %s\n", > prefix, yes_no(s->reuseport)); > > + if (s->distribute) > + fprintf(f, > + "%sDistribute: %d\n", > + prefix, s->distribute); > + > if (s->smack) > fprintf(f, > "%sSmackLabel: %s\n", > @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) { > return; > } > > - if (cfd < 0) { > + if (cfd < 0 && !(s->distribute)) { > Iterator i; > Unit *u; > bool pending = false; > @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) { > return; > } > > - r = socket_instantiate_service(s); > - if (r < 0) > - goto fail; > - > - r = instance_from_socket(cfd, s->n_accepted, &instance); > - if (r < 0) { > - if (r != -ENOTCONN) > - goto fail; > - > - /* ENOTCONN is legitimate if TCP RST was received. > - * This connection is over, but the socket unit > lives on. */ > - close_nointr_nofail(cfd); > - return; > - } > - > prefix = unit_name_to_prefix(UNIT(s)->id); > if (!prefix) { > r = -ENOMEM; > goto fail; > } > > - name = unit_name_build(prefix, instance, ".service"); > + do { > + r = socket_instantiate_service(s); > + if (r < 0) > + goto fail; > > - if (!name) { > - r = -ENOMEM; > - goto fail; > - } > + if (!(s->distribute)) { What does Distribute=1 mean? Is it treated as a special case of Distribute=n, and just one service@1.service is started? Or is it treated as equivalent to Distribute=0, and service.service is started? I kind of like the first version, but then in the manpage it should be clarified a bit that Distribute=0 is the default, and different from Distribute=1. 
> + r = instance_from_socket(cfd, s->n_accepted, > &instance); > + if (r < 0) { > + if (r != -ENOTCONN) > + goto fail; > > - r = unit_add_name(UNIT_DEREF(s->service), name); > - if (r < 0) > - goto fail; > + /* ENOTCONN is legitimate if TCP RST > was received. > + * This connection is over, but the > socket unit lives on. */ > + close_nointr_nofail(cfd); > + return; > + } > > - service = SERVICE(UNIT_DEREF(s->service)); > - unit_ref_unset(&s->service); > - s->n_accepted ++; > + name = unit_name_build(prefix, instance, > ".service"); > + if (!name) { > + r = -ENOMEM; > + goto fail; > + } > > - UNIT(service)->no_gc = false; > + r = unit_add_name(UNIT_DEREF(s->service), > name); > + if (r < 0) > + goto fail; > + } > > - unit_choose_id(UNIT(service), name); > + service = SERVICE(UNIT_DEREF(s->service)); > + unit_ref_unset(&s->service); > + s->n_accepted ++; > > - r = service_set_socket_fd(service, cfd, s); > - if (r < 0) > - goto fail; > + UNIT(service)->no_gc = false; > > - cfd = -1; > - s->n_connections ++; > + unit_choose_id(UNIT(service), name); > > - r = manager_add_job(UNIT(s)->manager, JOB_START, > UNIT(service), JOB_REPLACE, true, &error, NULL); > - if (r < 0) > - goto fail; > + r = service_set_socket_fd(service, cfd, s); > + if (r < 0) > + goto fail; > + > + cfd = -1; > + s->n_connections ++; > + > + r = manager_add_job(UNIT(s)->manager, JOB_START, > UNIT(service), JOB_REPLACE, true, &error, NULL); > + if (r < 0) > + goto fail; > + > + if(s->distribute > s->n_connections) { > + /* distribute implies reuseport */ > + s->reuseport = true; > + > + socket_enter_listening(s); > + } > + } while(s->distribute > s->n_connections); > > /* Notify clients about changed counters */ > unit_add_to_dbus_queue(UNIT(s)); > @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) { > > /* The service is dead. Yay! > * > - * This is strictly for one-instance-per-connection > - * services. 
*/ > + * This is for one-instance-per-connection > + * and Distribute= services */ > > assert(s->n_connections > 0); > s->n_connections--; > > log_debug_unit(UNIT(s)->id, > "%s: One connection closed, %u left.", UNIT(s)->id, > s->n_connections); > + > + if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){ Could this be 's->n_connections < s->distribute'? It just feels backwards. > + s->reuseport = true; Could this have changed? It seems it should be set in just one place. > + /* (re)enter systemd into SO_REUSEPORT pool, when it gets a > + * connection it will reestablish distribute target */ > + socket_enter_listening(s); > + } > } > > static void socket_reset_failed(Unit *u) { > diff --git a/src/core/socket.h b/src/core/socket.h > index 3d7eadc..5928356 100644 > --- a/src/core/socket.h > +++ b/src/core/socket.h > @@ -93,6 +93,8 @@ struct Socket { > LIST_HEAD(SocketPort, ports); > > unsigned n_accepted; > + /* when Accept=true this is the number of active connectoins > + * when Distribute=n this is the number of active workers */ > unsigned n_connections; Could this become n_instances then? > unsigned max_connections; > > @@ -145,6 +147,8 @@ struct Socket { > char *bind_to_device; > char *tcp_congestion; > bool reuseport; > + /* implies reuseport */ > + unsigned distribute; > long mq_maxmsg; > long mq_msgsize; > Zbyszek _______________________________________________ systemd-devel mailing list systemd-devel@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/systemd-devel