[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers

Shawn Landden shawn at churchofgit.com
Fri Dec 13 12:32:54 PST 2013


If Distribute=n, turns SO_REUSEPORT on, and spawns
n workers to handling incoming requests.

SO_REUSEPORT sockets on the same port must all be created
by the same uid, therefore using the option allows
other root programs (or programs of the same user
if running in --user mode) to "hijack" this port, even
after systemd reserves it.

This patch is currently not ready for merging because if
the service never accept()s
the socket passed to it, we stay in EPOLLIN, looping
infinitely. To prevent this we need to do one of:

 *) Call accept() and pass an additional fd to the service.
 *) Use EPOLLET: requires event to always dispatched when it comes in.
 *) Disable and then reenable the event source (requiring
     that we re-create the socket) every time we
     enqueue an instance, essentially emulating the behavior
     of EPOLLET.
---
 TODO                                  |  3 +-
 man/systemd.socket.xml                | 15 +++++++-
 src/core/dbus-socket.c                |  4 +-
 src/core/load-fragment-gperf.gperf.m4 |  3 +-
 src/core/service.c                    |  4 +-
 src/core/socket.c                     | 70 +++++++++++++++++++++++++----------
 src/core/socket.h                     |  5 ++-
 src/shared/conf-parser.c              | 32 ++++++++++++++++
 src/shared/conf-parser.h              |  1 +
 9 files changed, 108 insertions(+), 29 deletions(-)

diff --git a/TODO b/TODO
index dad55c4..fbc2609 100644
--- a/TODO
+++ b/TODO
@@ -73,7 +73,7 @@ Features:
 
 * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
 
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
 
 * move config_parse_path_strv() out of conf-parser.c
 
@@ -187,7 +187,6 @@ Features:
 * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
 
 * Support SO_REUSEPORT with socket activation:
-  - Let systemd maintain a pool of servers.
   - Use for seamless upgrades, by running the new server before stopping the
     old.
 
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..6799020 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -404,7 +404,8 @@
                                 designed for usage with
                                 <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
                                 to work unmodified with systemd socket
-                                activation.</para></listitem>
+                                activation. Incompatible with
+                                <varname>Distribute=</varname></para></listitem>
                         </varlistentry>
 
                         <varlistentry>
@@ -519,6 +520,18 @@
                         </varlistentry>
 
                         <varlistentry>
+                                <term><varname>Distribute=</varname></term>
+                                <listitem><para>Takes an integer
+                                value. Systemd will spawn up to
+                                given number of instances of service each
+                                listening to the same socket. Default is 0.
+                                Setting this requires corresponding service to
+                                be an instansiated service (name ends with <literal>@.service</literal>).
+                                Useful with <varname>ReusePort=</varname> above.
+                                Incompatible with <varname>Accept=true</varname>.</para></listitem>
+                        </varlistentry>
+
+                        <varlistentry>
                                 <term><varname>SmackLabel=</varname></term>
                                 <term><varname>SmackLabelIPIn=</varname></term>
                                 <term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 74217df..036c9af 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -40,7 +40,6 @@ static int property_get_listen(
                 void *userdata,
                 sd_bus_error *error) {
 
-
         Socket *s = SOCKET(userdata);
         SocketPort *p;
         int r;
@@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = {
         SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", bus_property_get_long, offsetof(Socket, mq_maxmsg), 0),
         SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
         SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
-        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_bool, offsetof(Socket, reuse_port), 0),
+        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_tristate, offsetof(Socket, reuse_port), 0),
+        SD_BUS_PROPERTY("Distribute", "u",  bus_property_get_unsigned, offsetof(Socket, distribute), 0),
         SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
         SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
         SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index a5033b2..70f15bd 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -212,7 +212,8 @@ Socket.Broadcast,                config_parse_bool,                  0,
 Socket.PassCredentials,          config_parse_bool,                  0,                             offsetof(Socket, pass_cred)
 Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
 Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
-Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuse_port)
+Socket.ReusePort,                config_parse_tristate,              0,                             offsetof(Socket, reuse_port)
+Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
 Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
 Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
 Socket.Service,                  config_parse_socket_service,        0,                             0
diff --git a/src/core/service.c b/src/core/service.c
index 67d2008..2a5ed90 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -3659,7 +3659,6 @@ static void service_bus_name_owner_change(
 int service_set_socket_fd(Service *s, int fd, Socket *sock) {
 
         assert(s);
-        assert(fd >= 0);
 
         /* This is called by the socket code when instantiating a new
          * service for a stream socket and the socket needs to be
@@ -3674,7 +3673,8 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
         if (s->state != SERVICE_DEAD)
                 return -EAGAIN;
 
-        s->socket_fd = fd;
+        if (fd > 0)
+                s->socket_fd = fd;
 
         unit_ref_set(&s->accept_socket, UNIT(sock));
 
diff --git a/src/core/socket.c b/src/core/socket.c
index aaaa8d6..5943a28 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
 }
 
 static int socket_instantiate_service(Socket *s) {
-        char *prefix, *name;
+        _cleanup_free_ char *prefix = NULL, *name = NULL;
         int r;
         Unit *u;
 
         assert(s);
 
         /* This fills in s->service if it isn't filled in yet. For
-         * Accept=yes sockets we create the next connection service
-         * here. For Accept=no this is mostly a NOP since the service
+         * Accept=yes and Distribute=n sockets we create the next connection
+         * service here. Otherwise is mostly a NOP since the service
          * is figured out at load time anyway. */
 
-        if (UNIT_DEREF(s->service))
+        if (UNIT_DEREF(s->service) && s->distribute == 0)
                 return 0;
 
-        assert(s->accept);
+        assert(s->accept || s->distribute > 0);
 
         if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
                 return -ENOMEM;
 
         r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
-        free(prefix);
-
         if (r < 0)
                 return -ENOMEM;
 
         r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
-        free(name);
-
         if (r < 0)
                 return r;
 
@@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
 
         assert(s);
 
-        if (!s->accept)
+        if (!(s->accept || s->distribute > 0))
                 return true;
 
         LIST_FOREACH(port, p, s->ports) {
@@ -408,6 +404,9 @@ static int socket_load(Unit *u) {
         if (r < 0)
                 return r;
 
+        if (s->reuse_port < 0)
+                s->reuse_port = s->distribute > 0;
+
         if (u->load_state == UNIT_LOADED) {
                 /* This is a new unit? Then let's add in some extras */
                 r = socket_add_extras(s);
@@ -483,15 +482,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         "%sBindToDevice: %s\n",
                         prefix, s->bind_to_device);
 
-        if (s->accept)
+        if (s->accept || s->distribute > 0)
                 fprintf(f,
                         "%sAccepted: %u\n"
-                        "%sNConnections: %u\n"
-                        "%sMaxConnections: %u\n",
+                        "%sNConnections: %u\n",
                         prefix, s->n_accepted,
-                        prefix, s->n_connections,
+                        prefix, s->n_connections);
+
+        if (s->accept)
+                fprintf(f,
+                        "%sMaxConnections: %u\n",
                         prefix, s->max_connections);
 
+        if (s->distribute > 0)
+                fprintf(f,
+                        "%sDistribute: %u\n",
+                        prefix, s->distribute);
+
         if (s->priority >= 0)
                 fprintf(f,
                         "%sPriority: %i\n",
@@ -604,9 +611,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
                 struct sockaddr_storage storage;
         } local, remote;
 
-        assert(fd >= 0);
         assert(instance);
 
+        if (fd < 0) {
+                if (asprintf(instance, "%u", nr) < 0)
+                        return -ENOMEM;
+                return 0;
+        }
+
         l = sizeof(local);
         if (getsockname(fd, &local.sa, &l) < 0)
                 return -errno;
@@ -799,7 +811,7 @@ static void socket_apply_socket_options(Socket *s, int fd) {
                         log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
 
         if (s->reuse_port) {
-                int b = s->reuse_port;
+                int b = (bool)s->reuse_port;
                 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
                         log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
         }
@@ -1485,7 +1497,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 return;
         }
 
-        if (cfd < 0) {
+        if (cfd < 0 && s->distribute == 0) {
                 Iterator i;
                 Unit *other;
                 bool pending = false;
@@ -1509,7 +1521,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
                 Service *service;
 
-                if (s->n_connections >= s->max_connections) {
+                if (s->n_connections >= s->max_connections && s->distribute == 0) {
                         log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
                         close_nointr_nofail(cfd);
                         return;
@@ -1565,6 +1577,8 @@ static void socket_enter_running(Socket *s, int cfd) {
                 if (r < 0)
                         goto fail;
 
+                socket_set_state(s, SOCKET_RUNNING);
+
                 /* Notify clients about changed counters */
                 unit_add_to_dbus_queue(UNIT(s));
         }
@@ -1992,6 +2006,8 @@ static int socket_dispatch_io(sd_event_source *source, int fd, uint32_t revents,
         SocketPort *p = userdata;
         int cfd = -1;
 
+        assert(source);
+        assert(source->userdata == userdata);
         assert(p);
         assert(fd >= 0);
 
@@ -2286,13 +2302,17 @@ void socket_connection_unref(Socket *s) {
 
         /* The service is dead. Yay!
          *
-         * This is strictly for one-instance-per-connection
-         * services. */
+         * This is for one-instance-per-connection
+         * and Distribute= services */
 
         assert(s->n_connections > 0);
         s->n_connections--;
 
+
         log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+        if (s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
+                socket_enter_listening(s);
 }
 
 static void socket_trigger_notify(Unit *u, Unit *other) {
@@ -2315,6 +2335,16 @@ static void socket_trigger_notify(Unit *u, Unit *other) {
 
         se = SERVICE(other);
 
+        if (s->distribute > 0) {
+                if (se->state == SERVICE_RUNNING) {
+                        if (s->n_connections < s->distribute)
+                                socket_enter_listening(s);
+                        else
+                                socket_set_state(s, SOCKET_RUNNING);
+                }
+                return;
+        }
+
         if (se->state == SERVICE_FAILED)
                 socket_notify_service_dead(s, se->result == SERVICE_FAILURE_START_LIMIT);
 
diff --git a/src/core/socket.h b/src/core/socket.h
index 076a183..138ac34 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,6 +95,8 @@ struct Socket {
         LIST_HEAD(SocketPort, ports);
 
         unsigned n_accepted;
+        /* when Accept=true this is the number of active connectoins
+         * when Distribute=n this is the number of active workers */
         unsigned n_connections;
         unsigned max_connections;
 
@@ -147,7 +149,8 @@ struct Socket {
         size_t pipe_size;
         char *bind_to_device;
         char *tcp_congestion;
-        bool reuse_port;
+        int reuse_port;
+        unsigned distribute;
         long mq_maxmsg;
         long mq_msgsize;
 
diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c
index 1e3cee5..e9654b3 100644
--- a/src/shared/conf-parser.c
+++ b/src/shared/conf-parser.c
@@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit,
         return 0;
 }
 
+int config_parse_tristate(const char* unit,
+                          const char *filename,
+                          unsigned line,
+                          const char *section,
+                          unsigned section_line,
+                          const char *lvalue,
+                          int ltype,
+                          const char *rvalue,
+                          void *data,
+                          void *userdata) {
+
+        int k;
+        int *b = data;
+
+        assert(filename);
+        assert(lvalue);
+        assert(rvalue);
+        assert(data);
+
+        /* Tristates are like booleans, but can also take the 'default' value, i.e. "-1" */
+
+        k = parse_boolean(rvalue);
+        if (k < 0) {
+                log_syntax(unit, LOG_ERR, filename, line, -k,
+                           "Failed to parse boolean value, ignoring: %s", rvalue);
+                return 0;
+        }
+
+        *b = !!k;
+        return 0;
+}
+
 int config_parse_bool(const char* unit,
                       const char *filename,
                       unsigned line,
diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h
index 2d5aa31..f1f9b27 100644
--- a/src/shared/conf-parser.h
+++ b/src/shared/conf-parser.h
@@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char *filename, unsigned line, c
 int config_parse_bytes_size(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_bytes_off(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_bool(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
+int config_parse_tristate(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_string(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_path(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
 int config_parse_strv(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
-- 
1.8.5.1



More information about the systemd-devel mailing list