[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers

Shawn Landden shawn at churchofgit.com
Fri Dec 13 20:23:25 PST 2013


If Distribute=n is set, turn SO_REUSEPORT on and spawn
n workers to handle incoming requests.

SO_REUSEPORT sockets on the same port must all be created
by the same uid, therefore using the option allows
other root programs (or programs of the same user
if running in --user mode) to "hijack" this port, even
after systemd reserves it.

We spawn workers at a rate approximately inverse-exponentially
proportional to the number of incoming connections: faster
depending on the time it takes new workers to start accept()ing
and on their load, or slower if systemd itself is under load.
---
 TODO                                  |  3 +-
 man/systemd.socket.xml                | 15 +++++++-
 src/core/dbus-socket.c                |  4 +--
 src/core/load-fragment-gperf.gperf.m4 |  3 +-
 src/core/service.c                    |  4 +--
 src/core/socket.c                     | 68 ++++++++++++++++++++++-------------
 src/core/socket.h                     |  5 ++-
 src/shared/conf-parser.c              | 32 +++++++++++++++++
 src/shared/conf-parser.h              |  1 +
 9 files changed, 101 insertions(+), 34 deletions(-)

diff --git a/TODO b/TODO
index 0b43888..2abe1b4 100644
--- a/TODO
+++ b/TODO
@@ -73,7 +73,7 @@ Features:
 
 * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
 
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal files contained in it
 
 * move config_parse_path_strv() out of conf-parser.c
 
@@ -181,7 +181,6 @@ Features:
 * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
 
 * Support SO_REUSEPORT with socket activation:
-  - Let systemd maintain a pool of servers.
   - Use for seamless upgrades, by running the new server before stopping the
     old.
 
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..6799020 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -404,7 +404,8 @@
                                 designed for usage with
                                 <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
                                 to work unmodified with systemd socket
-                                activation.</para></listitem>
+                                activation. Incompatible with
+                                <varname>Distribute=</varname></para></listitem>
                         </varlistentry>
 
                         <varlistentry>
@@ -519,6 +520,18 @@
                         </varlistentry>
 
                         <varlistentry>
+                                <term><varname>Distribute=</varname></term>
+                                <listitem><para>Takes an integer
+                                value. Systemd will spawn up to the
+                                given number of instances of the service, each
+                                listening on the same socket. Default is 0.
+                                Setting this requires the corresponding service to
+                                be an instantiated service (name ends with <literal>@.service</literal>).
+                                Useful with <varname>ReusePort=</varname> above.
+                                Incompatible with <varname>Accept=true</varname>.</para></listitem>
+                        </varlistentry>
+
+                        <varlistentry>
                                 <term><varname>SmackLabel=</varname></term>
                                 <term><varname>SmackLabelIPIn=</varname></term>
                                 <term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 74217df..036c9af 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -40,7 +40,6 @@ static int property_get_listen(
                 void *userdata,
                 sd_bus_error *error) {
 
-
         Socket *s = SOCKET(userdata);
         SocketPort *p;
         int r;
@@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = {
         SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", bus_property_get_long, offsetof(Socket, mq_maxmsg), 0),
         SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
         SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
-        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_bool, offsetof(Socket, reuse_port), 0),
+        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_tristate, offsetof(Socket, reuse_port), 0),
+        SD_BUS_PROPERTY("Distribute", "u",  bus_property_get_unsigned, offsetof(Socket, distribute), 0),
         SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
         SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
         SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index a5033b2..70f15bd 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -212,7 +212,8 @@ Socket.Broadcast,                config_parse_bool,                  0,
 Socket.PassCredentials,          config_parse_bool,                  0,                             offsetof(Socket, pass_cred)
 Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
 Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
-Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuse_port)
+Socket.ReusePort,                config_parse_tristate,              0,                             offsetof(Socket, reuse_port)
+Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
 Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
 Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
 Socket.Service,                  config_parse_socket_service,        0,                             0
diff --git a/src/core/service.c b/src/core/service.c
index 3b3f956..3047e10 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -3668,7 +3668,6 @@ static void service_bus_name_owner_change(
 int service_set_socket_fd(Service *s, int fd, Socket *sock) {
 
         assert(s);
-        assert(fd >= 0);
 
         /* This is called by the socket code when instantiating a new
          * service for a stream socket and the socket needs to be
@@ -3683,7 +3682,8 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
         if (s->state != SERVICE_DEAD)
                 return -EAGAIN;
 
-        s->socket_fd = fd;
+        if (fd > 0)
+                s->socket_fd = fd;
 
         unit_ref_set(&s->accept_socket, UNIT(sock));
 
diff --git a/src/core/socket.c b/src/core/socket.c
index aaaa8d6..be2b681 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
 }
 
 static int socket_instantiate_service(Socket *s) {
-        char *prefix, *name;
+        _cleanup_free_ char *prefix = NULL, *name = NULL;
         int r;
         Unit *u;
 
         assert(s);
 
         /* This fills in s->service if it isn't filled in yet. For
-         * Accept=yes sockets we create the next connection service
-         * here. For Accept=no this is mostly a NOP since the service
+         * Accept=yes and Distribute=n sockets we create the next connection
+         * service here. Otherwise this is mostly a NOP since the service
          * is figured out at load time anyway. */
 
-        if (UNIT_DEREF(s->service))
+        if (UNIT_DEREF(s->service) && s->distribute == 0)
                 return 0;
 
-        assert(s->accept);
+        assert(s->accept || s->distribute > 0);
 
         if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
                 return -ENOMEM;
 
         r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
-        free(prefix);
-
         if (r < 0)
                 return -ENOMEM;
 
         r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
-        free(name);
-
         if (r < 0)
                 return r;
 
@@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
 
         assert(s);
 
-        if (!s->accept)
+        if (!(s->accept || s->distribute > 0))
                 return true;
 
         LIST_FOREACH(port, p, s->ports) {
@@ -408,6 +404,9 @@ static int socket_load(Unit *u) {
         if (r < 0)
                 return r;
 
+        if (s->reuse_port < 0)
+                s->reuse_port = s->distribute > 0;
+
         if (u->load_state == UNIT_LOADED) {
                 /* This is a new unit? Then let's add in some extras */
                 r = socket_add_extras(s);
@@ -483,15 +482,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         "%sBindToDevice: %s\n",
                         prefix, s->bind_to_device);
 
-        if (s->accept)
+        if (s->accept || s->distribute > 0)
                 fprintf(f,
                         "%sAccepted: %u\n"
-                        "%sNConnections: %u\n"
-                        "%sMaxConnections: %u\n",
+                        "%sNConnections: %u\n",
                         prefix, s->n_accepted,
-                        prefix, s->n_connections,
+                        prefix, s->n_connections);
+
+        if (s->accept)
+                fprintf(f,
+                        "%sMaxConnections: %u\n",
                         prefix, s->max_connections);
 
+        if (s->distribute > 0)
+                fprintf(f,
+                        "%sDistribute: %u\n",
+                        prefix, s->distribute);
+
         if (s->priority >= 0)
                 fprintf(f,
                         "%sPriority: %i\n",
@@ -604,9 +611,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
                 struct sockaddr_storage storage;
         } local, remote;
 
-        assert(fd >= 0);
         assert(instance);
 
+        if (fd < 0) {
+                if (asprintf(instance, "%u", nr) < 0)
+                        return -ENOMEM;
+                return 0;
+        }
+
         l = sizeof(local);
         if (getsockname(fd, &local.sa, &l) < 0)
                 return -errno;
@@ -798,11 +810,9 @@ static void socket_apply_socket_options(Socket *s, int fd) {
                 if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
                         log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
 
-        if (s->reuse_port) {
-                int b = s->reuse_port;
-                if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
+        if (s->reuse_port)
+                if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &s->reuse_port, sizeof(s->reuse_port)) < 0)
                         log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
-        }
 
         if (s->smack_ip_in)
                 if (smack_label_ip_in_fd(fd, s->smack_ip_in) < 0)
@@ -1112,7 +1122,7 @@ static int socket_watch_fds(Socket *s) {
                 if (p->event_source)
                         r = sd_event_source_set_enabled(p->event_source, SD_EVENT_ON);
                 else
-                        r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN, socket_dispatch_io, p, &p->event_source);
+                        r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN | (s->distribute > 0 ? EPOLLET : 0), socket_dispatch_io, p, &p->event_source);
 
                 if (r < 0) {
                         log_warning_unit(UNIT(s)->id, "Failed to watch listening fds: %s", strerror(-r));
@@ -1485,7 +1495,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 return;
         }
 
-        if (cfd < 0) {
+        if (cfd < 0 && s->distribute == 0) {
                 Iterator i;
                 Unit *other;
                 bool pending = false;
@@ -1509,7 +1519,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
                 Service *service;
 
-                if (s->n_connections >= s->max_connections) {
+                if (s->n_connections >= s->max_connections && s->distribute == 0) {
                         log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
                         close_nointr_nofail(cfd);
                         return;
@@ -1565,6 +1575,9 @@ static void socket_enter_running(Socket *s, int cfd) {
                 if (r < 0)
                         goto fail;
 
+                if (s->distribute > 0 && s->n_connections >= s->distribute)
+                        socket_set_state(s, SOCKET_RUNNING);
+
                 /* Notify clients about changed counters */
                 unit_add_to_dbus_queue(UNIT(s));
         }
@@ -2286,13 +2299,17 @@ void socket_connection_unref(Socket *s) {
 
         /* The service is dead. Yay!
          *
-         * This is strictly for one-instance-per-connection
-         * services. */
+         * This is for one-instance-per-connection
+         * and Distribute= services */
 
         assert(s->n_connections > 0);
         s->n_connections--;
 
+
         log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+        if (s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
+                socket_enter_listening(s);
 }
 
 static void socket_trigger_notify(Unit *u, Unit *other) {
@@ -2306,7 +2323,8 @@ static void socket_trigger_notify(Unit *u, Unit *other) {
            already down or accepting connections */
         if ((s->state !=  SOCKET_RUNNING &&
             s->state != SOCKET_LISTENING) ||
-            s->accept)
+            s->accept ||
+            s->distribute > 0)
                 return;
 
         if (other->load_state != UNIT_LOADED ||
diff --git a/src/core/socket.h b/src/core/socket.h
index 076a183..138ac34 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,6 +95,8 @@ struct Socket {
         LIST_HEAD(SocketPort, ports);
 
         unsigned n_accepted;
+        /* when Accept=true this is the number of active connections;
+         * when Distribute=n this is the number of active workers */
         unsigned n_connections;
         unsigned max_connections;
 
@@ -147,7 +149,8 @@ struct Socket {
         size_t pipe_size;
         char *bind_to_device;
         char *tcp_congestion;
-        bool reuse_port;
+        int reuse_port;
+        unsigned distribute;
         long mq_maxmsg;
         long mq_msgsize;
 
diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c
index 1e3cee5..e9654b3 100644
--- a/src/shared/conf-parser.c
+++ b/src/shared/conf-parser.c
@@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit,
         return 0;
 }
 
+int config_parse_tristate(const char* unit,
+                          const char *filename,
+                          unsigned line,
+                          const char *section,
+                          unsigned section_line,
+                          const char *lvalue,
+                          int ltype,
+                          const char *rvalue,
+                          void *data,
+                          void *userdata) {
+
+        int k;
+        int *b = data;
+
+        assert(filename);
+        assert(lvalue);
+        assert(rvalue);
+        assert(data);
+
+        /* Tristates are like booleans, but can also take the 'default' value, i.e. "-1" */
+
+        k = parse_boolean(rvalue);
+        if (k < 0) {
+                log_syntax(unit, LOG_ERR, filename, line, -k,
+                           "Failed to parse boolean value, ignoring: %s", rvalue);
+                return 0;
+        }
+
+        *b = !!k;
+        return 0;
+}
+
 int config_parse_bool(const char* unit,
                       const char *filename,
                       unsigned line,
diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h
index 2d5aa31..f1f9b27 100644
--- a/src/shared/conf-parser.h
+++ b/src/shared/conf-parser.h
@@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char *filename, unsigned line, c
 int config_parse_bytes_size(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_bytes_off(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_bool(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
+int config_parse_tristate(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_string(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_path(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_strv(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
-- 
1.8.5.1



More information about the systemd-devel mailing list