On Sat, Nov 16, 2013 at 7:38 AM, Zbigniew Jędrzejewski-Szmek
<zbys...@in.waw.pl> wrote:
> On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote:
>> v3: make each worker its own service
>> v4: be less intrusive
> Hi Shawn,
> unfortunately this doesn't apply cleanly. Can you rebase?
thats because it is 3rd in a series, I will send the whole series
right after this email
>
>> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
>> index 7c10c58..92a9275 100644
>> --- a/man/systemd.socket.xml
>> +++ b/man/systemd.socket.xml
>> @@ -519,6 +519,15 @@
>>                          </varlistentry>
>>
>>                          <varlistentry>
>> +                                <term><varname>Distribute=</varname></term>
>> +                                <listitem><para>Takes an integer
>> +                                value. If greater than one, systemd will 
>> spawn
>> +                                given number of instances of service each
>> +                                listening to the same socket. This option 
>> implies
>> +                                <varname>Reuseport=</varname> 
>> above.</para></listitem>
>> +                        </varlistentry>
>> +
>> +                        <varlistentry>
>>                                  <term><varname>SmackLabel=</varname></term>
>>                                  
>> <term><varname>SmackLabelIPIn=</varname></term>
>>                                  
>> <term><varname>SmackLabelIPOut=</varname></term>
>> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
>> index 60a8d05..4644007 100644
>> --- a/src/core/dbus-socket.c
>> +++ b/src/core/dbus-socket.c
>> @@ -68,6 +68,7 @@
>>          "  <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n"    
>> \
>>          "  <property name=\"Result\" type=\"s\" access=\"read\"/>\n"    \
>>          "  <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
>> +        "  <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
>>          "  <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
>>          "  <property name=\"SmackLabelIPIn\" type=\"s\" 
>> access=\"read\"/>\n" \
>>          "  <property name=\"SmackLabelIPOut\" type=\"s\" 
>> access=\"read\"/>\n" \
>> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
>>          { "MessageQueueMessageSize", bus_property_append_long, "x", 
>> offsetof(Socket, mq_msgsize)      },
>>          { "Result",         bus_socket_append_socket_result,   "s", 
>> offsetof(Socket, result)          },
>>          { "ReusePort",      bus_property_append_bool,          "b", 
>> offsetof(Socket, reuseport)       },
>> +        { "Distribute",     bus_property_append_unsigned,      "u", 
>> offsetof(Socket, distribute)       },
>>          { "SmackLabel",     bus_property_append_string,        "s", 
>> offsetof(Socket, smack),          true },
>>          { "SmackLabelIPIn", bus_property_append_string,        "s", 
>> offsetof(Socket, smack_ip_in),    true },
>>          { "SmackLabelIPOut",bus_property_append_string,        "s", 
>> offsetof(Socket, smack_ip_out),   true },
>> diff --git a/src/core/load-fragment-gperf.gperf.m4 
>> b/src/core/load-fragment-gperf.gperf.m4
>> index b64fdc9..4058a1f 100644
>> --- a/src/core/load-fragment-gperf.gperf.m4
>> +++ b/src/core/load-fragment-gperf.gperf.m4
>> @@ -211,6 +211,7 @@ Socket.PassCredentials,          config_parse_bool,      
>>             0,
>>  Socket.PassSecurity,             config_parse_bool,                  0,     
>>                         offsetof(Socket, pass_sec)
>>  Socket.TCPCongestion,            config_parse_string,                0,     
>>                         offsetof(Socket, tcp_congestion)
>>  Socket.ReusePort,                config_parse_bool,                  0,     
>>                         offsetof(Socket, reuseport)
>> +Socket.Distribute,               config_parse_unsigned,              0,     
>>                         offsetof(Socket, distribute)
>>  Socket.MessageQueueMaxMessages,  config_parse_long,                  0,     
>>                         offsetof(Socket, mq_maxmsg)
>>  Socket.MessageQueueMessageSize,  config_parse_long,                  0,     
>>                         offsetof(Socket, mq_msgsize)
>>  Socket.Service,                  config_parse_socket_service,        0,     
>>                         0
>> diff --git a/src/core/service.c b/src/core/service.c
>> index 3da32a1..8fc55a0 100644
>> --- a/src/core/service.c
>> +++ b/src/core/service.c
>> @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done(
>>  int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>>
>>          assert(s);
>> -        assert(fd >= 0);
>>
>>          /* This is called by the socket code when instantiating a new
>>           * service for a stream socket and the socket needs to be
>> @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket 
>> *sock) {
>>          if (s->state != SERVICE_DEAD)
>>                  return -EAGAIN;
>>
>> -        s->socket_fd = fd;
>> -        s->got_socket_fd = true;
>> +        if (fd >= 0) {
>> +                s->socket_fd = fd;
>> +                s->got_socket_fd = true;
>> +        }
>>
>>          unit_ref_set(&s->accept_socket, UNIT(sock));
>>
>> diff --git a/src/core/service.h b/src/core/service.h
>> index 37fa6ff..2ffe7d1 100644
>> --- a/src/core/service.h
>> +++ b/src/core/service.h
>> @@ -26,7 +26,6 @@ typedef struct Service Service;
>>  #include "unit.h"
>>  #include "path.h"
>>  #include "ratelimit.h"
>> -#include "service.h"
>>  #include "kill.h"
>>  #include "exit-status.h"
>>
>> diff --git a/src/core/socket.c b/src/core/socket.c
>> index 751f20b..11b649b 100644
>> --- a/src/core/socket.c
>> +++ b/src/core/socket.c
>> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
>>  }
>>
>>  static int socket_instantiate_service(Socket *s) {
>> -        char *prefix, *name;
>> +        _cleanup_free_ char *prefix = NULL, *name = NULL;
>>          int r;
>>          Unit *u;
>>
>>          assert(s);
>>
>>          /* This fills in s->service if it isn't filled in yet. For
>> -         * Accept=yes sockets we create the next connection service
>> -         * here. For Accept=no this is mostly a NOP since the service
>> +         * Accept=yes and Distribute=n sockets we create the next connection
>> +         * service here. Otherwise is mostly a NOP since the service
>>           * is figured out at load time anyway. */
>>
>> -        if (UNIT_DEREF(s->service))
>> +        if (UNIT_DEREF(s->service) && !(s->distribute))
>>                  return 0;
>>
>> -        assert(s->accept);
>> +        assert(s->accept || s->distribute);
>>
>>          if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
>>                  return -ENOMEM;
>>
>>          r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
> Here we could use something like "%.*u", MAX(s->distribute-1), s->n_accepted
> to have nicely sorted instances... (E.g. systemctl sorts by type ane name).
> I'm not sure if that's better or not.
I don't think this is a good idea. It needlessly special-cases
s->distribute, and as exited workers are
respawned the number can go above s->distribute, so it isn't really
right. This issues is purely cosmetic,
as all the workers are the same, and even then is really only a
systemctl problem, not a pid 1 problem,
and should be fixed as such, with more fanciful ordering.
>
>> -        free(prefix);
>> -
>>          if (r < 0)
>>                  return -ENOMEM;
>>
>>          r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
>> -        free(name);
>> -
>>          if (r < 0)
>>                  return r;
>>
>> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char 
>> *prefix) {
>>                          "%sReusePort: %s\n",
>>                           prefix, yes_no(s->reuseport));
>>
>> +        if (s->distribute)
>> +                fprintf(f,
>> +                        "%sDistribute: %d\n",
>> +                         prefix, s->distribute);
>> +
>>          if (s->smack)
>>                  fprintf(f,
>>                          "%sSmackLabel: %s\n",
>> @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>>                  return;
>>          }
>>
>> -        if (cfd < 0) {
>> +        if (cfd < 0 && !(s->distribute)) {
>>                  Iterator i;
>>                  Unit *u;
>>                  bool pending = false;
>> @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) 
>> {
>>                          return;
>>                  }
>>
>> -                r = socket_instantiate_service(s);
>> -                if (r < 0)
>> -                        goto fail;
>> -
>> -                r = instance_from_socket(cfd, s->n_accepted, &instance);
>> -                if (r < 0) {
>> -                        if (r != -ENOTCONN)
>> -                                goto fail;
>> -
>> -                        /* ENOTCONN is legitimate if TCP RST was received.
>> -                         * This connection is over, but the socket unit 
>> lives on. */
>> -                        close_nointr_nofail(cfd);
>> -                        return;
>> -                }
>> -
>>                  prefix = unit_name_to_prefix(UNIT(s)->id);
>>                  if (!prefix) {
>>                          r = -ENOMEM;
>>                          goto fail;
>>                  }
>>
>> -                name = unit_name_build(prefix, instance, ".service");
>> +                do {
>> +                        r = socket_instantiate_service(s);
>> +                        if (r < 0)
>> +                                goto fail;
>>
>> -                if (!name) {
>> -                        r = -ENOMEM;
>> -                        goto fail;
>> -                }
>> +                        if (!(s->distribute)) {
> What does Distribute=1 mean? Is it treated as a special case of Distribute=n,
> and just one service@1.service is started? Or is treated as equivalent to
> Distribute=0, and service.service is started? I kind of like the first 
> version,
> but then the in the manpage it should be clarified a bit that Distribute=0 is
> the default, and different from Distribute=1.
the latter, clarified in man page.
>
>> +                                r = instance_from_socket(cfd, 
>> s->n_accepted, &instance);
>> +                                if (r < 0) {
>> +                                        if (r != -ENOTCONN)
>> +                                                goto fail;
>>
>> -                r = unit_add_name(UNIT_DEREF(s->service), name);
>> -                if (r < 0)
>> -                        goto fail;
>> +                                        /* ENOTCONN is legitimate if TCP 
>> RST was received.
>> +                                         * This connection is over, but the 
>> socket unit lives on. */
>> +                                        close_nointr_nofail(cfd);
>> +                                        return;
>> +                                }
>>
>> -                service = SERVICE(UNIT_DEREF(s->service));
>> -                unit_ref_unset(&s->service);
>> -                s->n_accepted ++;
>> +                                name = unit_name_build(prefix, instance, 
>> ".service");
>> +                                if (!name) {
>> +                                        r = -ENOMEM;
>> +                                        goto fail;
>> +                                }
>>
>> -                UNIT(service)->no_gc = false;
>> +                                r = unit_add_name(UNIT_DEREF(s->service), 
>> name);
>> +                                if (r < 0)
>> +                                        goto fail;
>> +                        }
>>
>> -                unit_choose_id(UNIT(service), name);
>> +                        service = SERVICE(UNIT_DEREF(s->service));
>> +                        unit_ref_unset(&s->service);
>> +                        s->n_accepted ++;
>>
>> -                r = service_set_socket_fd(service, cfd, s);
>> -                if (r < 0)
>> -                        goto fail;
>> +                        UNIT(service)->no_gc = false;
>>
>> -                cfd = -1;
>> -                s->n_connections ++;
>> +                        unit_choose_id(UNIT(service), name);
>>
>> -                r = manager_add_job(UNIT(s)->manager, JOB_START, 
>> UNIT(service), JOB_REPLACE, true, &error, NULL);
>> -                if (r < 0)
>> -                        goto fail;
>> +                        r = service_set_socket_fd(service, cfd, s);
>> +                        if (r < 0)
>> +                                goto fail;
>> +
>> +                        cfd = -1;
>> +                        s->n_connections ++;
>> +
>> +                        r = manager_add_job(UNIT(s)->manager, JOB_START, 
>> UNIT(service), JOB_REPLACE, true, &error, NULL);
>> +                        if (r < 0)
>> +                                goto fail;
>> +
>> +                        if(s->distribute > s->n_connections) {
>> +                                /* distribute implies reuseport */
>> +                                s->reuseport = true;
>> +
>> +                                socket_enter_listening(s);
>> +                        }
>> +                } while(s->distribute > s->n_connections);
>>
>>                  /* Notify clients about changed counters */
>>                  unit_add_to_dbus_queue(UNIT(s));
>> @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) {
>>
>>          /* The service is dead. Yay!
>>           *
>> -         * This is strictly for one-instance-per-connection
>> -         * services. */
>> +         * This is for one-instance-per-connection
>> +         * and Distribute= services */
>>
>>          assert(s->n_connections > 0);
>>          s->n_connections--;
>>
>>          log_debug_unit(UNIT(s)->id,
>>                         "%s: One connection closed, %u left.", UNIT(s)->id, 
>> s->n_connections);
>> +
>> +        if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){
> Could this be 's->n_connections < s->distribute'? It just feels backwards.
changed everywhere
>
>> +                s->reuseport = true;
> Could this have changed? It seems it should be set in just one place.
moved to socket_apply_socket_options()
>
>> +                /* (re)enter systemd into SO_REUSEPORT pool, when it gets a
>> +                 * connection it will reestablish distribute target */
>> +                socket_enter_listening(s);
>> +        }
>>  }
>>
>>  static void socket_reset_failed(Unit *u) {
>> diff --git a/src/core/socket.h b/src/core/socket.h
>> index 3d7eadc..5928356 100644
>> --- a/src/core/socket.h
>> +++ b/src/core/socket.h
>> @@ -93,6 +93,8 @@ struct Socket {
>>          LIST_HEAD(SocketPort, ports);
>>
>>          unsigned n_accepted;
>> +        /* when Accept=true this is the number of active connectoins
>> +         * when Distribute=n this is the number of active workers */
>>          unsigned n_connections;
> Could this become n_instances then?
I did this, but it was too ugly as NConnections is an exported API.
>
>>          unsigned max_connections;
>>
>> @@ -145,6 +147,8 @@ struct Socket {
>>          char *bind_to_device;
>>          char *tcp_congestion;
>>          bool reuseport;
>> +        /* implies reuseport */
>> +        unsigned distribute;
>>          long mq_maxmsg;
>>          long mq_msgsize;
>>
>
> Zbyszek
> _______________________________________________
> systemd-devel mailing list
> systemd-devel@lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/systemd-devel
#include <stdio.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <systemd/sd-daemon.h>

int main( int argc, char *argv[] )
{
    int sockfd, newsockfd, portno, clilen;
    char buffer[256];
    struct sockaddr_in serv_addr, cli_addr;
    int  n;

    if (sd_listen_fds(0) != 1) {
        fprintf(stderr, "No or too many file descriptors received.\n");
        exit(1);
    }

    sockfd = SD_LISTEN_FDS_START + 0;

    listen(sockfd,5);
    clilen = sizeof(cli_addr);

while(1) {
	    /* Accept actual connection from the client */
	    newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, 
		                        &clilen);
	    if (newsockfd < 0) 
	    {
		perror("ERROR on accept");
		exit(1);
	    }
	    /* If connection is established then start communicating */
	    bzero(buffer,256);
	    n = read( newsockfd,buffer,255 );
	    if (n < 0)
	    {
		perror("ERROR reading from socket");
		exit(1);
	    }
	    buffer[strchr(buffer, '\n') - buffer] = '\0';
	    printf("PID %d got: %s\n", getpid(), buffer);

char resp[1024];
snprintf(resp, 1024, "PID %d got your message\n", getpid());
	    n = write(newsockfd,resp,strlen(resp));
	    if (n < 0)
	    {
		perror("ERROR writing to socket");
		exit(1);
	    }
	    close(newsockfd);
}
    return 0; 
}

Attachment: reuseport.socket
Description: Binary data

Attachment: reuseport@.service
Description: Binary data

_______________________________________________
systemd-devel mailing list
systemd-devel@lists.freedesktop.org
http://lists.freedesktop.org/mailman/listinfo/systemd-devel

Reply via email to