[systemd-devel] [PATCH 3/3] core: support Distribute=n to distribute to n SO_REUSEPORT workers

Shawn Landden shawn at churchofgit.com
Thu Nov 14 07:50:44 PST 2013


---
 TODO                                  |  3 +-
 src/core/dbus-socket.c                |  2 ++
 src/core/load-fragment-gperf.gperf.m4 |  1 +
 src/core/service.c                    |  2 +-
 src/core/service.h                    | 13 +++++++-
 src/core/socket.c                     | 63 +++++++++++++++++++++++++++++++++++
 src/core/socket.h                     |  2 ++
 7 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/TODO b/TODO
index efc7e2a..0db4dc6 100644
--- a/TODO
+++ b/TODO
@@ -80,7 +80,7 @@ Features:
 
 * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
 
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* respawn Distribute= worker processes when they die unexpectedly
 
 * tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
 
@@ -259,7 +259,6 @@ Features:
 * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
 
 * Support SO_REUSEPORT with socket activation:
-  - Let systemd maintain a pool of servers.
   - Use for seamless upgrades, by running the new server before stopping the
     old.
 
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 60a8d05..4644007 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -68,6 +68,7 @@
         "  <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n"    \
         "  <property name=\"Result\" type=\"s\" access=\"read\"/>\n"    \
         "  <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
+        "  <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
         "  <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
@@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
         { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize)      },
         { "Result",         bus_socket_append_socket_result,   "s", offsetof(Socket, result)          },
         { "ReusePort",      bus_property_append_bool,          "b", offsetof(Socket, reuseport)       },
+        { "Distribute",     bus_property_append_unsigned,      "u", offsetof(Socket, distribute)       },
         { "SmackLabel",     bus_property_append_string,        "s", offsetof(Socket, smack),          true },
         { "SmackLabelIPIn", bus_property_append_string,        "s", offsetof(Socket, smack_ip_in),    true },
         { "SmackLabelIPOut",bus_property_append_string,        "s", offsetof(Socket, smack_ip_out),   true },
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index b64fdc9..4058a1f 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -211,6 +211,7 @@ Socket.PassCredentials,          config_parse_bool,                  0,
 Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
 Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
 Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuseport)
+Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
 Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
 Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
 Socket.Service,                  config_parse_socket_service,        0,                             0
diff --git a/src/core/service.c b/src/core/service.c
index 3da32a1..cc337cf 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -1668,7 +1668,7 @@ fail:
         return r;
 }
 
-static int service_spawn(
+int service_spawn(
                 Service *s,
                 ExecCommand *c,
                 bool timeout,
diff --git a/src/core/service.h b/src/core/service.h
index 37fa6ff..95aa707 100644
--- a/src/core/service.h
+++ b/src/core/service.h
@@ -26,7 +26,6 @@ typedef struct Service Service;
 #include "unit.h"
 #include "path.h"
 #include "ratelimit.h"
-#include "service.h"
 #include "kill.h"
 #include "exit-status.h"
 
@@ -201,6 +200,18 @@ extern const UnitVTable service_vtable;
 
 struct Socket;
 
+int service_spawn(                      /* no longer static: socket.c's socket_distribute() calls it */
+                Service *s,
+                ExecCommand *c,
+                bool timeout,
+                bool pass_fds,
+                bool apply_permissions,
+                bool apply_chroot,
+                bool apply_tty_stdin,
+                bool set_notify_socket,
+                bool is_control,
+                pid_t *_pid);           /* out parameter; presumably receives the spawned child's PID */
+
 int service_set_socket_fd(Service *s, int fd, struct Socket *socket);
 
 const char* service_state_to_string(ServiceState i) _const_;
diff --git a/src/core/socket.c b/src/core/socket.c
index 751f20b..9ada14d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1422,6 +1422,65 @@ fail:
         socket_enter_dead(s, SOCKET_FAILURE_RESOURCES);
 }
 
+/* Start socket->distribute - 1 extra copies of the service's single ExecStart=
+ * command so several processes share the listening address via SO_REUSEPORT;
+ * the first copy comes from the start job queued by socket_enter_running().
+ * Failures are logged and ignored. NOTE(review): reuseport is forced on only
+ * here, after the fds were opened; it likely must be set at load time. */
+static void socket_distribute(Socket *socket) {
+        ExecCommand *c;
+        Service *s;
+        int r;
+
+        assert(socket);
+
+        /* Bail out before touching socket->service, which may be unset. */
+        if (socket->distribute <= 1)
+                return;
+
+        if (!UNIT_DEREF(socket->service)) {
+                log_warning_unit(UNIT(socket)->id, "%s has no service to distribute to. Not distributing.",
+                                 UNIT(socket)->id);
+                return;
+        }
+
+        s = SERVICE(UNIT_DEREF(socket->service));
+        if (s->type != SERVICE_SIMPLE) {
+                log_warning_unit(UNIT(s)->id, "%s is not Type=simple required for Distribute. Not distributing.",
+                                 UNIT(s)->id);
+                return;
+        }
+
+        c = s->exec_command[SERVICE_EXEC_START];
+        if (!c || c->command_next) {
+                log_warning_unit(UNIT(s)->id, "%s needs exactly one ExecStart= for Distribute. Not distributing.",
+                                 UNIT(s)->id);
+                return;
+        }
+
+        socket->reuseport = true; /* Distribute= implies ReusePort= */
+
+        for (unsigned i = 1; i < socket->distribute; i++) {
+                pid_t pid;
+
+                r = socket_open_fds(socket);
+                if (r < 0) {
+                        log_warning_unit(UNIT(socket)->id, "%s failed to open socket(s) to distribute. Ignoring: %s",
+                                         UNIT(socket)->id, strerror(-r));
+                        return;
+                }
+
+                s->main_command = c;
+                r = service_spawn(s, c, false, true, true, true, true,
+                                  s->notify_access != NOTIFY_NONE, false, &pid);
+                if (r < 0) {
+                        log_warning_unit(UNIT(socket)->id, "%s failed to distribute. Ignoring: %s",
+                                         UNIT(socket)->id, strerror(-r));
+                        return;
+                }
+        }
+}
+
 static void socket_enter_running(Socket *s, int cfd) {
         int r;
         DBusError error;
@@ -1471,6 +1530,8 @@ static void socket_enter_running(Socket *s, int cfd) {
                         r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT_DEREF(s->service), JOB_REPLACE, true, &error, NULL);
                         if (r < 0)
                                 goto fail;
+
+                        socket_distribute(s);
                 }
 
                 socket_set_state(s, SOCKET_RUNNING);
@@ -1537,6 +1598,8 @@ static void socket_enter_running(Socket *s, int cfd) {
                 if (r < 0)
                         goto fail;
 
+                socket_distribute(s);
+
                 /* Notify clients about changed counters */
                 unit_add_to_dbus_queue(UNIT(s));
         }
diff --git a/src/core/socket.h b/src/core/socket.h
index 3d7eadc..b307822 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -145,6 +145,8 @@ struct Socket {
         char *bind_to_device;
         char *tcp_congestion;
         bool reuseport;
+        /* implies reuseport */
+        unsigned distribute;
         long mq_maxmsg;
         long mq_msgsize;
 
-- 
1.8.4.3



More information about the systemd-devel mailing list