[systemd-devel] [PATCH] core: support Distribute=n to distribute to n SO_REUSEPORT workers

Zbigniew Jędrzejewski-Szmek zbyszek at in.waw.pl
Sat Nov 16 07:38:42 PST 2013


On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote:
> v3: make each worker its own service
> v4: be less intrusive
Hi Shawn,
unfortunately this doesn't apply cleanly. Can you rebase?

> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
> index 7c10c58..92a9275 100644
> --- a/man/systemd.socket.xml
> +++ b/man/systemd.socket.xml
> @@ -519,6 +519,15 @@
>                          </varlistentry>
>  
>                          <varlistentry>
> +                                <term><varname>Distribute=</varname></term>
> +                                <listitem><para>Takes an integer
> +                                value. If greater than one, systemd will spawn
> +                                the given number of instances of the service, each
> +                                listening on the same socket. This option implies
> +                                <varname>ReusePort=</varname> above.</para></listitem>
> +                        </varlistentry>
> +
> +                        <varlistentry>
>                                  <term><varname>SmackLabel=</varname></term>
>                                  <term><varname>SmackLabelIPIn=</varname></term>
>                                  <term><varname>SmackLabelIPOut=</varname></term>
> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
> index 60a8d05..4644007 100644
> --- a/src/core/dbus-socket.c
> +++ b/src/core/dbus-socket.c
> @@ -68,6 +68,7 @@
>          "  <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n"    \
>          "  <property name=\"Result\" type=\"s\" access=\"read\"/>\n"    \
>          "  <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
> +        "  <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
>          { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize)      },
>          { "Result",         bus_socket_append_socket_result,   "s", offsetof(Socket, result)          },
>          { "ReusePort",      bus_property_append_bool,          "b", offsetof(Socket, reuseport)       },
> +        { "Distribute",     bus_property_append_unsigned,      "u", offsetof(Socket, distribute)       },
>          { "SmackLabel",     bus_property_append_string,        "s", offsetof(Socket, smack),          true },
>          { "SmackLabelIPIn", bus_property_append_string,        "s", offsetof(Socket, smack_ip_in),    true },
>          { "SmackLabelIPOut",bus_property_append_string,        "s", offsetof(Socket, smack_ip_out),   true },
> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
> index b64fdc9..4058a1f 100644
> --- a/src/core/load-fragment-gperf.gperf.m4
> +++ b/src/core/load-fragment-gperf.gperf.m4
> @@ -211,6 +211,7 @@ Socket.PassCredentials,          config_parse_bool,                  0,
>  Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
>  Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
>  Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuseport)
> +Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
>  Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
>  Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
>  Socket.Service,                  config_parse_socket_service,        0,                             0
> diff --git a/src/core/service.c b/src/core/service.c
> index 3da32a1..8fc55a0 100644
> --- a/src/core/service.c
> +++ b/src/core/service.c
> @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done(
>  int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>  
>          assert(s);
> -        assert(fd >= 0);
>  
>          /* This is called by the socket code when instantiating a new
>           * service for a stream socket and the socket needs to be
> @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>          if (s->state != SERVICE_DEAD)
>                  return -EAGAIN;
>  
> -        s->socket_fd = fd;
> -        s->got_socket_fd = true;
> +        if (fd >= 0) {
> +                s->socket_fd = fd;
> +                s->got_socket_fd = true;
> +        }
>  
>          unit_ref_set(&s->accept_socket, UNIT(sock));
>  
> diff --git a/src/core/service.h b/src/core/service.h
> index 37fa6ff..2ffe7d1 100644
> --- a/src/core/service.h
> +++ b/src/core/service.h
> @@ -26,7 +26,6 @@ typedef struct Service Service;
>  #include "unit.h"
>  #include "path.h"
>  #include "ratelimit.h"
> -#include "service.h"
>  #include "kill.h"
>  #include "exit-status.h"
>  
> diff --git a/src/core/socket.c b/src/core/socket.c
> index 751f20b..11b649b 100644
> --- a/src/core/socket.c
> +++ b/src/core/socket.c
> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
>  }
>  
>  static int socket_instantiate_service(Socket *s) {
> -        char *prefix, *name;
> +        _cleanup_free_ char *prefix = NULL, *name = NULL;
>          int r;
>          Unit *u;
>  
>          assert(s);
>  
>          /* This fills in s->service if it isn't filled in yet. For
> -         * Accept=yes sockets we create the next connection service
> -         * here. For Accept=no this is mostly a NOP since the service
> +         * Accept=yes and Distribute=n sockets we create the next connection
> +         * service here. Otherwise this is mostly a NOP since the service
>           * is figured out at load time anyway. */
>  
> -        if (UNIT_DEREF(s->service))
> +        if (UNIT_DEREF(s->service) && !(s->distribute))
>                  return 0;
>  
> -        assert(s->accept);
> +        assert(s->accept || s->distribute);
>  
>          if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
>                  return -ENOMEM;
>  
>          r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
Here we could use something like "%.*u" for s->n_accepted, with the precision
set to the number of decimal digits in s->distribute-1, to get zero-padded,
nicely sorted instance names (e.g. systemctl sorts by type and name).
I'm not sure if that's better or not.

> -        free(prefix);
> -
>          if (r < 0)
>                  return -ENOMEM;
>  
>          r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
> -        free(name);
> -
>          if (r < 0)
>                  return r;
>  
> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
>                          "%sReusePort: %s\n",
>                           prefix, yes_no(s->reuseport));
>  
> +        if (s->distribute)
> +                fprintf(f,
> +                        "%sDistribute: %d\n",
> +                         prefix, s->distribute);
> +
>          if (s->smack)
>                  fprintf(f,
>                          "%sSmackLabel: %s\n",
> @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  return;
>          }
>  
> -        if (cfd < 0) {
> +        if (cfd < 0 && !(s->distribute)) {
>                  Iterator i;
>                  Unit *u;
>                  bool pending = false;
> @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) {
>                          return;
>                  }
>  
> -                r = socket_instantiate_service(s);
> -                if (r < 0)
> -                        goto fail;
> -
> -                r = instance_from_socket(cfd, s->n_accepted, &instance);
> -                if (r < 0) {
> -                        if (r != -ENOTCONN)
> -                                goto fail;
> -
> -                        /* ENOTCONN is legitimate if TCP RST was received.
> -                         * This connection is over, but the socket unit lives on. */
> -                        close_nointr_nofail(cfd);
> -                        return;
> -                }
> -
>                  prefix = unit_name_to_prefix(UNIT(s)->id);
>                  if (!prefix) {
>                          r = -ENOMEM;
>                          goto fail;
>                  }
>  
> -                name = unit_name_build(prefix, instance, ".service");
> +                do {
> +                        r = socket_instantiate_service(s);
> +                        if (r < 0)
> +                                goto fail;
>  
> -                if (!name) {
> -                        r = -ENOMEM;
> -                        goto fail;
> -                }
> +                        if (!(s->distribute)) {
What does Distribute=1 mean? Is it treated as a special case of Distribute=n,
so that just one templated instance (…@1.service) is started? Or is it treated
as equivalent to Distribute=0, so that the plain service.service is started? I
kind of like the first version, but then the manpage should clarify that
Distribute=0 is the default and that it differs from Distribute=1.

> +                                r = instance_from_socket(cfd, s->n_accepted, &instance);
> +                                if (r < 0) {
> +                                        if (r != -ENOTCONN)
> +                                                goto fail;
>  
> -                r = unit_add_name(UNIT_DEREF(s->service), name);
> -                if (r < 0)
> -                        goto fail;
> +                                        /* ENOTCONN is legitimate if TCP RST was received.
> +                                         * This connection is over, but the socket unit lives on. */
> +                                        close_nointr_nofail(cfd);
> +                                        return;
> +                                }
>  
> -                service = SERVICE(UNIT_DEREF(s->service));
> -                unit_ref_unset(&s->service);
> -                s->n_accepted ++;
> +                                name = unit_name_build(prefix, instance, ".service");
> +                                if (!name) {
> +                                        r = -ENOMEM;
> +                                        goto fail;
> +                                }
>  
> -                UNIT(service)->no_gc = false;
> +                                r = unit_add_name(UNIT_DEREF(s->service), name);
> +                                if (r < 0)
> +                                        goto fail;
> +                        }
>  
> -                unit_choose_id(UNIT(service), name);
> +                        service = SERVICE(UNIT_DEREF(s->service));
> +                        unit_ref_unset(&s->service);
> +                        s->n_accepted ++;
>  
> -                r = service_set_socket_fd(service, cfd, s);
> -                if (r < 0)
> -                        goto fail;
> +                        UNIT(service)->no_gc = false;
>  
> -                cfd = -1;
> -                s->n_connections ++;
> +                        unit_choose_id(UNIT(service), name);
>  
> -                r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> -                if (r < 0)
> -                        goto fail;
> +                        r = service_set_socket_fd(service, cfd, s);
> +                        if (r < 0)
> +                                goto fail;
> +
> +                        cfd = -1;
> +                        s->n_connections ++;
> +
> +                        r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> +                        if (r < 0)
> +                                goto fail;
> +
> +                        if(s->distribute > s->n_connections) {
> +                                /* distribute implies reuseport */
> +                                s->reuseport = true;
> +
> +                                socket_enter_listening(s);
> +                        }
> +                } while(s->distribute > s->n_connections);
>  
>                  /* Notify clients about changed counters */
>                  unit_add_to_dbus_queue(UNIT(s));
> @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) {
>  
>          /* The service is dead. Yay!
>           *
> -         * This is strictly for one-instance-per-connection
> -         * services. */
> +         * This is for one-instance-per-connection
> +         * and Distribute= services */
>  
>          assert(s->n_connections > 0);
>          s->n_connections--;
>  
>          log_debug_unit(UNIT(s)->id,
>                         "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
> +
> +        if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){
Could this be 's->n_connections < s->distribute'? It just feels backwards.

> +                s->reuseport = true;
Could this have changed? It seems it should be set in just one place.

> +                /* (Re)enter systemd into the SO_REUSEPORT pool; when it gets a
> +                 * connection, it will re-establish the Distribute= target. */
> +                socket_enter_listening(s);
> +        }
>  }
>  
>  static void socket_reset_failed(Unit *u) {
> diff --git a/src/core/socket.h b/src/core/socket.h
> index 3d7eadc..5928356 100644
> --- a/src/core/socket.h
> +++ b/src/core/socket.h
> @@ -93,6 +93,8 @@ struct Socket {
>          LIST_HEAD(SocketPort, ports);
>  
>          unsigned n_accepted;
> +        /* when Accept=yes this is the number of active connections
> +         * when Distribute=n this is the number of active workers */
>          unsigned n_connections;
Could this become n_instances then?

>          unsigned max_connections;
>  
> @@ -145,6 +147,8 @@ struct Socket {
>          char *bind_to_device;
>          char *tcp_congestion;
>          bool reuseport;
> +        /* implies reuseport */
> +        unsigned distribute;
>          long mq_maxmsg;
>          long mq_msgsize;
>  

Zbyszek


More information about the systemd-devel mailing list