[systemd-devel] [PATCH 3/4] core: support Distribute=n to distribute to n SO_REUSEPORT workers

Zbigniew Jędrzejewski-Szmek zbyszek at in.waw.pl
Sun Nov 17 14:23:09 PST 2013


On Sat, Nov 16, 2013 at 01:18:14PM -0800, Shawn Landden wrote:
> v3: make each worker its own service
> v4: be less intrusive
> v5: misc fixups
> ---
>  TODO                                  |   3 -
>  man/systemd.socket.xml                |  11 ++++
>  src/core/dbus-socket.c                |   2 +
>  src/core/load-fragment-gperf.gperf.m4 |   1 +
>  src/core/service.c                    |   7 +-
>  src/core/service.h                    |   1 -
>  src/core/socket.c                     | 119 +++++++++++++++++++---------------
>  src/core/socket.h                     |   4 ++
>  8 files changed, 89 insertions(+), 59 deletions(-)
> 
> diff --git a/TODO b/TODO
> index efc7e2a..6067efb 100644
> --- a/TODO
> +++ b/TODO
> @@ -80,8 +80,6 @@ Features:
>  
>  * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
>  
> -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
> -
>  * tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
>  
>  * we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend()
> @@ -259,7 +257,6 @@ Features:
>  * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
>  
>  * Support SO_REUSEPORT with socket activation:
> -  - Let systemd maintain a pool of servers.
>    - Use for seamless upgrades, by running the new server before stopping the
>      old.
>  
> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
> index 7c10c58..4a2189b 100644
> --- a/man/systemd.socket.xml
> +++ b/man/systemd.socket.xml
> @@ -519,6 +519,17 @@
>                          </varlistentry>
>  
>                          <varlistentry>
> +                                <term><varname>Distribute=</varname></term>
> +                                <listitem><para>Takes an integer
> +                                value. Systemd will spawn
> +                                given number of instances of service each
> +                                listening to the same socket. Default is 0.
"0" sounds scary here - technically true, but it suggests that unless
this is set, no instances will be spawned. Also, it will not be the *same*
socket if ReusePort is set. Maybe:

  Takes an integer value. If positive, systemd
  will spawn that many instances of the service,
  all listening on the same port. In this case the triggered service
  must be a template unit (name ending in <literal>@.service</literal>).
  If zero (the default), this option has no effect: a single, non-instantiated
  service is triggered (or, with <varname>Accept=yes</varname>, one instance per connection).
  

> +                                Setting this requires corresponding service to
> +                                be an instansiated service (name ends with <literal>@.service</literal>).

> +                                This option implies <varname>Reuseport=</varname> above.</para></listitem>
                                                                     ^P
The option is spelled ReusePort= (capital P), so this should read <varname>ReusePort=</varname>.

> +                        </varlistentry>
> +
> +                        <varlistentry>
>                                  <term><varname>SmackLabel=</varname></term>
>                                  <term><varname>SmackLabelIPIn=</varname></term>
>                                  <term><varname>SmackLabelIPOut=</varname></term>
> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
> index 60a8d05..4644007 100644
> --- a/src/core/dbus-socket.c
> +++ b/src/core/dbus-socket.c
> @@ -68,6 +68,7 @@
>          "  <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n"    \
>          "  <property name=\"Result\" type=\"s\" access=\"read\"/>\n"    \
>          "  <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
> +        "  <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
>          "  <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
>          { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize)      },
>          { "Result",         bus_socket_append_socket_result,   "s", offsetof(Socket, result)          },
>          { "ReusePort",      bus_property_append_bool,          "b", offsetof(Socket, reuseport)       },
> +        { "Distribute",     bus_property_append_unsigned,      "u", offsetof(Socket, distribute)       },
>          { "SmackLabel",     bus_property_append_string,        "s", offsetof(Socket, smack),          true },
>          { "SmackLabelIPIn", bus_property_append_string,        "s", offsetof(Socket, smack_ip_in),    true },
>          { "SmackLabelIPOut",bus_property_append_string,        "s", offsetof(Socket, smack_ip_out),   true },
> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
> index b64fdc9..4058a1f 100644
> --- a/src/core/load-fragment-gperf.gperf.m4
> +++ b/src/core/load-fragment-gperf.gperf.m4
> @@ -211,6 +211,7 @@ Socket.PassCredentials,          config_parse_bool,                  0,
>  Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
>  Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
>  Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuseport)
> +Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
>  Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
>  Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
>  Socket.Service,                  config_parse_socket_service,        0,                             0
> diff --git a/src/core/service.c b/src/core/service.c
> index c0ee114..9c8cf9c 100644
> --- a/src/core/service.c
> +++ b/src/core/service.c
> @@ -3679,7 +3679,6 @@ static void service_bus_query_pid_done(
>  int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>  
>          assert(s);
> -        assert(fd >= 0);
>  
>          /* This is called by the socket code when instantiating a new
>           * service for a stream socket and the socket needs to be
> @@ -3694,8 +3693,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>          if (s->state != SERVICE_DEAD)
>                  return -EAGAIN;
>  
> -        s->socket_fd = fd;
> -        s->got_socket_fd = true;
> +        if (fd >= 0) {
> +                s->socket_fd = fd;
> +                s->got_socket_fd = true;
> +        }
>  
>          unit_ref_set(&s->accept_socket, UNIT(sock));
>  
> diff --git a/src/core/service.h b/src/core/service.h
> index 37fa6ff..2ffe7d1 100644
> --- a/src/core/service.h
> +++ b/src/core/service.h
> @@ -26,7 +26,6 @@ typedef struct Service Service;
>  #include "unit.h"
>  #include "path.h"
>  #include "ratelimit.h"
> -#include "service.h"
>  #include "kill.h"
>  #include "exit-status.h"
>  
> diff --git a/src/core/socket.c b/src/core/socket.c
> index 751f20b..4d4627a 100644
> --- a/src/core/socket.c
> +++ b/src/core/socket.c
> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
>  }
>  
>  static int socket_instantiate_service(Socket *s) {
> -        char *prefix, *name;
> +        _cleanup_free_ char *prefix = NULL, *name = NULL;
>          int r;
>          Unit *u;
>  
>          assert(s);
>  
>          /* This fills in s->service if it isn't filled in yet. For
> -         * Accept=yes sockets we create the next connection service
> -         * here. For Accept=no this is mostly a NOP since the service
> +         * Accept=yes and Distribute=n sockets we create the next connection
> +         * service here. Otherwise is mostly a NOP since the service
>           * is figured out at load time anyway. */
>  
> -        if (UNIT_DEREF(s->service))
> +        if (UNIT_DEREF(s->service) && !(s->distribute))
>                  return 0;
>  
> -        assert(s->accept);
> +        assert(s->accept || s->distribute);
>  
>          if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
>                  return -ENOMEM;
>  
>          r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
> -        free(prefix);
> -
>          if (r < 0)
>                  return -ENOMEM;
>  
>          r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
> -        free(name);
> -
>          if (r < 0)
>                  return r;
>  
> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
>                          "%sReusePort: %s\n",
>                           prefix, yes_no(s->reuseport));
>  
> +        if (s->distribute)
> +                fprintf(f,
> +                        "%sDistribute: %d\n",
> +                         prefix, s->distribute);
> +
>          if (s->smack)
>                  fprintf(f,
>                          "%sSmackLabel: %s\n",
> @@ -577,9 +578,13 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
>                  struct sockaddr_storage storage;
>          } local, remote;
>  
> -        assert(fd >= 0);
>          assert(instance);
>  
> +        if (fd < 0) {
> +                asprintf(&r, "%u", nr);
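OOM handling? The return value of asprintf() is ignored. On failure the
contents of r are undefined, and that value is then stored in *instance.
Check the result and return -ENOMEM.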

> +                goto shortcut;
> +        }
> +
>          l = sizeof(local);
>          if (getsockname(fd, &local.sa, &l) < 0)
>                  return -errno;
> @@ -663,6 +668,7 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
>                  assert_not_reached("Unhandled socket type.");
>          }
>  
> +shortcut:
>          *instance = r;
>          return 0;
>  }
> @@ -769,8 +775,8 @@ static void socket_apply_socket_options(Socket *s, int fd) {
>                  if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
>                          log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
>  
> -        if (s->reuseport) {
> -                int b = s->reuseport;
> +        if (s->reuseport || s->distribute) {
> +                int b = true;
>                  if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
>                          log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
>          }
> @@ -1454,7 +1460,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  return;
>          }
>  
> -        if (cfd < 0) {
> +        if (cfd < 0 && !(s->distribute)) {
>                  Iterator i;
>                  Unit *u;
>                  bool pending = false;
> @@ -1478,7 +1484,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
>                  Service *service;
>  
> -                if (s->n_connections >= s->max_connections) {
> +                if (s->n_connections >= s->max_connections && !(s->distribute)) {
>                          log_warning_unit(UNIT(s)->id,
>                                           "%s: Too many incoming connections (%u)",
>                                           UNIT(s)->id, s->n_connections);
> @@ -1486,56 +1492,60 @@ static void socket_enter_running(Socket *s, int cfd) {
>                          return;
>                  }
>  
> -                r = socket_instantiate_service(s);
> -                if (r < 0)
> -                        goto fail;
> -
> -                r = instance_from_socket(cfd, s->n_accepted, &instance);
> -                if (r < 0) {
> -                        if (r != -ENOTCONN)
> -                                goto fail;
> -
> -                        /* ENOTCONN is legitimate if TCP RST was received.
> -                         * This connection is over, but the socket unit lives on. */
> -                        close_nointr_nofail(cfd);
> -                        return;
> -                }
> -
>                  prefix = unit_name_to_prefix(UNIT(s)->id);
>                  if (!prefix) {
>                          r = -ENOMEM;
>                          goto fail;
>                  }
>  
> -                name = unit_name_build(prefix, instance, ".service");
> +                do {
> +                        r = socket_instantiate_service(s);
> +                        if (r < 0)
> +                                goto fail;
>  
> -                if (!name) {
> -                        r = -ENOMEM;
> -                        goto fail;
> -                }
> +                        r = instance_from_socket(cfd, s->n_accepted, &instance);
> +                        if (r < 0) {
> +                                if (r != -ENOTCONN)
> +                                        goto fail;
>  
> -                r = unit_add_name(UNIT_DEREF(s->service), name);
> -                if (r < 0)
> -                        goto fail;
> +                                /* ENOTCONN is legitimate if TCP RST was received.
> +                                 * This connection is over, but the socket unit lives on. */
> +                                close_nointr_nofail(cfd);
> +                                return;
> +                        }
>  
> -                service = SERVICE(UNIT_DEREF(s->service));
> -                unit_ref_unset(&s->service);
> -                s->n_accepted ++;
> +                        name = unit_name_build(prefix, instance, ".service");
> +                        if (!name) {
> +                                r = -ENOMEM;
> +                                goto fail;
> +                        }
>  
> -                UNIT(service)->no_gc = false;
> +                        r = unit_add_name(UNIT_DEREF(s->service), name);
> +                        if (r < 0)
> +                                goto fail;
>  
> -                unit_choose_id(UNIT(service), name);
> +                        service = SERVICE(UNIT_DEREF(s->service));
> +                        unit_ref_unset(&s->service);
> +                        s->n_accepted ++;
>  
> -                r = service_set_socket_fd(service, cfd, s);
> -                if (r < 0)
> -                        goto fail;
> +                        UNIT(service)->no_gc = false;
>  
> -                cfd = -1;
> -                s->n_connections ++;
> +                        unit_choose_id(UNIT(service), name);
>  
> -                r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> -                if (r < 0)
> -                        goto fail;
> +                        r = service_set_socket_fd(service, cfd, s);
> +                        if (r < 0)
> +                                goto fail;
> +
> +                        cfd = -1;
> +                        s->n_connections ++;
> +
> +                        r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> +                        if (r < 0)
> +                                goto fail;
> +
> +                        if (s->n_connections < s->distribute)
> +                                socket_enter_listening(s);
> +                } while (s->n_connections < s->distribute);
>  
>                  /* Notify clients about changed counters */
>                  unit_add_to_dbus_queue(UNIT(s));
> @@ -2263,14 +2273,19 @@ void socket_connection_unref(Socket *s) {
>  
>          /* The service is dead. Yay!
>           *
> -         * This is strictly for one-instance-per-connection
> -         * services. */
> +         * This is for one-instance-per-connection
> +         * and Distribute= services */
>  
>          assert(s->n_connections > 0);
>          s->n_connections--;
>  
>          log_debug_unit(UNIT(s)->id,
>                         "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
> +
> +        if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
> +                /* (re)enter systemd into SO_REUSEPORT pool, when it gets a
> +                 * connection it will reestablish distribute target */
> +                socket_enter_listening(s);
>  }
>  
>  static void socket_reset_failed(Unit *u) {
> diff --git a/src/core/socket.h b/src/core/socket.h
> index 3d7eadc..5928356 100644
> --- a/src/core/socket.h
> +++ b/src/core/socket.h
> @@ -93,6 +93,8 @@ struct Socket {
>          LIST_HEAD(SocketPort, ports);
>  
>          unsigned n_accepted;
> +        /* when Accept=true this is the number of active connectoins
> +         * when Distribute=n this is the number of active workers */
>          unsigned n_connections;
>          unsigned max_connections;
>  
> @@ -145,6 +147,8 @@ struct Socket {
>          char *bind_to_device;
>          char *tcp_congestion;
>          bool reuseport;
> +        /* implies reuseport */
> +        unsigned distribute;
>          long mq_maxmsg;
>          long mq_msgsize;
>  

Zbyszek


More information about the systemd-devel mailing list