[systemd-devel] [PATCH 3/4] core: support Distribute=n to distribute to n SO_REUSEPORT workers

Shawn Landden shawn at churchofgit.com
Sat Nov 16 13:18:14 PST 2013


v3: make each worker its own service
v4: be less intrusive
v5: misc fixups
---
 TODO                                  |   3 -
 man/systemd.socket.xml                |  11 ++++
 src/core/dbus-socket.c                |   2 +
 src/core/load-fragment-gperf.gperf.m4 |   1 +
 src/core/service.c                    |   7 +-
 src/core/service.h                    |   1 -
 src/core/socket.c                     | 119 +++++++++++++++++++---------------
 src/core/socket.h                     |   4 ++
 8 files changed, 89 insertions(+), 59 deletions(-)

diff --git a/TODO b/TODO
index efc7e2a..6067efb 100644
--- a/TODO
+++ b/TODO
@@ -80,8 +80,6 @@ Features:
 
 * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
 
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
-
 * tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
 
 * we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend()
@@ -259,7 +257,6 @@ Features:
 * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
 
 * Support SO_REUSEPORT with socket activation:
-  - Let systemd maintain a pool of servers.
   - Use for seamless upgrades, by running the new server before stopping the
     old.
 
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..4a2189b 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -519,6 +519,17 @@
                         </varlistentry>
 
                         <varlistentry>
+                                <term><varname>Distribute=</varname></term>
+                                <listitem><para>Takes an integer
+                                value. Systemd will spawn the
+                                given number of instances of the service, each
+                                listening to the same socket. Default is 0.
+                                Setting this requires the corresponding service to
+                                be an instantiated service (name ends with <literal>@.service</literal>).
+                                This option implies <varname>ReusePort=</varname> above.</para></listitem>
+                        </varlistentry>
+
+                        <varlistentry>
                                 <term><varname>SmackLabel=</varname></term>
                                 <term><varname>SmackLabelIPIn=</varname></term>
                                 <term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 60a8d05..4644007 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -68,6 +68,7 @@
         "  <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n"    \
         "  <property name=\"Result\" type=\"s\" access=\"read\"/>\n"    \
         "  <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
+        "  <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
@@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
         { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize)      },
         { "Result",         bus_socket_append_socket_result,   "s", offsetof(Socket, result)          },
         { "ReusePort",      bus_property_append_bool,          "b", offsetof(Socket, reuseport)       },
+        { "Distribute",     bus_property_append_unsigned,      "u", offsetof(Socket, distribute)       },
         { "SmackLabel",     bus_property_append_string,        "s", offsetof(Socket, smack),          true },
         { "SmackLabelIPIn", bus_property_append_string,        "s", offsetof(Socket, smack_ip_in),    true },
         { "SmackLabelIPOut",bus_property_append_string,        "s", offsetof(Socket, smack_ip_out),   true },
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index b64fdc9..4058a1f 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -211,6 +211,7 @@ Socket.PassCredentials,          config_parse_bool,                  0,
 Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
 Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
 Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuseport)
+Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
 Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
 Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
 Socket.Service,                  config_parse_socket_service,        0,                             0
diff --git a/src/core/service.c b/src/core/service.c
index c0ee114..9c8cf9c 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -3679,7 +3679,6 @@ static void service_bus_query_pid_done(
 int service_set_socket_fd(Service *s, int fd, Socket *sock) {
 
         assert(s);
-        assert(fd >= 0);
 
         /* This is called by the socket code when instantiating a new
          * service for a stream socket and the socket needs to be
@@ -3694,8 +3693,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
         if (s->state != SERVICE_DEAD)
                 return -EAGAIN;
 
-        s->socket_fd = fd;
-        s->got_socket_fd = true;
+        if (fd >= 0) {
+                s->socket_fd = fd;
+                s->got_socket_fd = true;
+        }
 
         unit_ref_set(&s->accept_socket, UNIT(sock));
 
diff --git a/src/core/service.h b/src/core/service.h
index 37fa6ff..2ffe7d1 100644
--- a/src/core/service.h
+++ b/src/core/service.h
@@ -26,7 +26,6 @@ typedef struct Service Service;
 #include "unit.h"
 #include "path.h"
 #include "ratelimit.h"
-#include "service.h"
 #include "kill.h"
 #include "exit-status.h"
 
diff --git a/src/core/socket.c b/src/core/socket.c
index 751f20b..4d4627a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
 }
 
 static int socket_instantiate_service(Socket *s) {
-        char *prefix, *name;
+        _cleanup_free_ char *prefix = NULL, *name = NULL;
         int r;
         Unit *u;
 
         assert(s);
 
         /* This fills in s->service if it isn't filled in yet. For
-         * Accept=yes sockets we create the next connection service
-         * here. For Accept=no this is mostly a NOP since the service
+         * Accept=yes and Distribute=n sockets we create the next connection
+         * service here. Otherwise this is mostly a NOP since the service
          * is figured out at load time anyway. */
 
-        if (UNIT_DEREF(s->service))
+        if (UNIT_DEREF(s->service) && !(s->distribute))
                 return 0;
 
-        assert(s->accept);
+        assert(s->accept || s->distribute);
 
         if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
                 return -ENOMEM;
 
         r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
-        free(prefix);
-
         if (r < 0)
                 return -ENOMEM;
 
         r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
-        free(name);
-
         if (r < 0)
                 return r;
 
@@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         "%sReusePort: %s\n",
                          prefix, yes_no(s->reuseport));
 
+        if (s->distribute)
+                fprintf(f,
+                        "%sDistribute: %d\n",
+                         prefix, s->distribute);
+
         if (s->smack)
                 fprintf(f,
                         "%sSmackLabel: %s\n",
@@ -577,9 +578,13 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
                 struct sockaddr_storage storage;
         } local, remote;
 
-        assert(fd >= 0);
         assert(instance);
 
+        if (fd < 0) {
+                asprintf(&r, "%u", nr);
+                goto shortcut;
+        }
+
         l = sizeof(local);
         if (getsockname(fd, &local.sa, &l) < 0)
                 return -errno;
@@ -663,6 +668,7 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
                 assert_not_reached("Unhandled socket type.");
         }
 
+shortcut:
         *instance = r;
         return 0;
 }
@@ -769,8 +775,8 @@ static void socket_apply_socket_options(Socket *s, int fd) {
                 if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
                         log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
 
-        if (s->reuseport) {
-                int b = s->reuseport;
+        if (s->reuseport || s->distribute) {
+                int b = true;
                 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
                         log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
         }
@@ -1454,7 +1460,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 return;
         }
 
-        if (cfd < 0) {
+        if (cfd < 0 && !(s->distribute)) {
                 Iterator i;
                 Unit *u;
                 bool pending = false;
@@ -1478,7 +1484,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
                 Service *service;
 
-                if (s->n_connections >= s->max_connections) {
+                if (s->n_connections >= s->max_connections && !(s->distribute)) {
                         log_warning_unit(UNIT(s)->id,
                                          "%s: Too many incoming connections (%u)",
                                          UNIT(s)->id, s->n_connections);
@@ -1486,56 +1492,60 @@ static void socket_enter_running(Socket *s, int cfd) {
                         return;
                 }
 
-                r = socket_instantiate_service(s);
-                if (r < 0)
-                        goto fail;
-
-                r = instance_from_socket(cfd, s->n_accepted, &instance);
-                if (r < 0) {
-                        if (r != -ENOTCONN)
-                                goto fail;
-
-                        /* ENOTCONN is legitimate if TCP RST was received.
-                         * This connection is over, but the socket unit lives on. */
-                        close_nointr_nofail(cfd);
-                        return;
-                }
-
                 prefix = unit_name_to_prefix(UNIT(s)->id);
                 if (!prefix) {
                         r = -ENOMEM;
                         goto fail;
                 }
 
-                name = unit_name_build(prefix, instance, ".service");
+                do {
+                        r = socket_instantiate_service(s);
+                        if (r < 0)
+                                goto fail;
 
-                if (!name) {
-                        r = -ENOMEM;
-                        goto fail;
-                }
+                        r = instance_from_socket(cfd, s->n_accepted, &instance);
+                        if (r < 0) {
+                                if (r != -ENOTCONN)
+                                        goto fail;
 
-                r = unit_add_name(UNIT_DEREF(s->service), name);
-                if (r < 0)
-                        goto fail;
+                                /* ENOTCONN is legitimate if TCP RST was received.
+                                 * This connection is over, but the socket unit lives on. */
+                                close_nointr_nofail(cfd);
+                                return;
+                        }
 
-                service = SERVICE(UNIT_DEREF(s->service));
-                unit_ref_unset(&s->service);
-                s->n_accepted ++;
+                        name = unit_name_build(prefix, instance, ".service");
+                        if (!name) {
+                                r = -ENOMEM;
+                                goto fail;
+                        }
 
-                UNIT(service)->no_gc = false;
+                        r = unit_add_name(UNIT_DEREF(s->service), name);
+                        if (r < 0)
+                                goto fail;
 
-                unit_choose_id(UNIT(service), name);
+                        service = SERVICE(UNIT_DEREF(s->service));
+                        unit_ref_unset(&s->service);
+                        s->n_accepted ++;
 
-                r = service_set_socket_fd(service, cfd, s);
-                if (r < 0)
-                        goto fail;
+                        UNIT(service)->no_gc = false;
 
-                cfd = -1;
-                s->n_connections ++;
+                        unit_choose_id(UNIT(service), name);
 
-                r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
-                if (r < 0)
-                        goto fail;
+                        r = service_set_socket_fd(service, cfd, s);
+                        if (r < 0)
+                                goto fail;
+
+                        cfd = -1;
+                        s->n_connections ++;
+
+                        r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
+                        if (r < 0)
+                                goto fail;
+
+                        if (s->n_connections < s->distribute)
+                                socket_enter_listening(s);
+                } while (s->n_connections < s->distribute);
 
                 /* Notify clients about changed counters */
                 unit_add_to_dbus_queue(UNIT(s));
@@ -2263,14 +2273,19 @@ void socket_connection_unref(Socket *s) {
 
         /* The service is dead. Yay!
          *
-         * This is strictly for one-instance-per-connection
-         * services. */
+         * This is for one-instance-per-connection
+         * and Distribute= services */
 
         assert(s->n_connections > 0);
         s->n_connections--;
 
         log_debug_unit(UNIT(s)->id,
                        "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+        if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
+                /* (re)enter systemd into SO_REUSEPORT pool, when it gets a
+                 * connection it will reestablish distribute target */
+                socket_enter_listening(s);
 }
 
 static void socket_reset_failed(Unit *u) {
diff --git a/src/core/socket.h b/src/core/socket.h
index 3d7eadc..5928356 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -93,6 +93,8 @@ struct Socket {
         LIST_HEAD(SocketPort, ports);
 
         unsigned n_accepted;
+        /* when Accept=true this is the number of active connections
+         * when Distribute=n this is the number of active workers */
         unsigned n_connections;
         unsigned max_connections;
 
@@ -145,6 +147,8 @@ struct Socket {
         char *bind_to_device;
         char *tcp_congestion;
         bool reuseport;
+        /* implies reuseport */
+        unsigned distribute;
         long mq_maxmsg;
         long mq_msgsize;
 
-- 
1.8.4.3



More information about the systemd-devel mailing list