[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers

Shawn Landden shawn at churchofgit.com
Tue Dec 10 18:53:33 PST 2013


Because it takes a while for the service to start up, and
until then we spin in a fast epoll loop, this tends to
start up all the instances all at once. There are a number
of ways we can slow this instantiation down:
 1) Call accept() and pass an additional fd to the service
 2) Use EPOLLET: requires event to be prioritized and always
      dispatched.
 3) Disable and then reenable the event source every time we
     enqueue an instance.

With Type=notify, we wait until a service tells us it is ready
before we listen again and thereby start up more instances.

What if someone wants to use the templating namespace ('@')
with Distribute=?
---
 TODO                                  |  3 +-
 man/systemd.socket.xml                | 15 +++++++-
 src/core/dbus-socket.c                |  2 +-
 src/core/load-fragment-gperf.gperf.m4 |  3 +-
 src/core/service.c                    |  4 ++
 src/core/socket.c                     | 72 ++++++++++++++++++++++++-----------
 src/core/socket.h                     |  8 +++-
 7 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/TODO b/TODO
index 2fb9cd3..697d568 100644
--- a/TODO
+++ b/TODO
@@ -69,7 +69,7 @@ Features:
 
 * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
 
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
 
 * we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend()
 
@@ -187,7 +187,6 @@ Features:
 * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
 
 * Support SO_REUSEPORT with socket activation:
-  - Let systemd maintain a pool of servers.
   - Use for seamless upgrades, by running the new server before stopping the
     old.
 
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..6799020 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -404,7 +404,8 @@
                                 designed for usage with
                                 <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
                                 to work unmodified with systemd socket
-                                activation.</para></listitem>
+                                activation. Incompatible with
+                                <varname>Distribute=</varname>.</para></listitem>
                         </varlistentry>
 
                         <varlistentry>
@@ -519,6 +520,18 @@
                         </varlistentry>
 
                         <varlistentry>
+                                <term><varname>Distribute=</varname></term>
+                                <listitem><para>Takes an integer
+                                value. Systemd will spawn up to the
+                                given number of instances of the service, each
+                                listening to the same socket. Default is 0.
+                                Setting this requires the corresponding service to
+                                be an instantiated service (name ends with <literal>@.service</literal>).
+                                Useful with <varname>ReusePort=</varname> above.
+                                Incompatible with <varname>Accept=true</varname>.</para></listitem>
+                        </varlistentry>
+
+                        <varlistentry>
                                 <term><varname>SmackLabel=</varname></term>
                                 <term><varname>SmackLabelIPIn=</varname></term>
                                 <term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 74217df..68c95a0 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -40,7 +40,6 @@ static int property_get_listen(
                 void *userdata,
                 sd_bus_error *error) {
 
-
         Socket *s = SOCKET(userdata);
         SocketPort *p;
         int r;
@@ -116,6 +115,7 @@ const sd_bus_vtable bus_socket_vtable[] = {
         SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
         SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
         SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_bool, offsetof(Socket, reuse_port), 0),
+        SD_BUS_PROPERTY("Distribute", "u",  bus_property_get_unsigned, offsetof(Socket, distribute), 0),
         SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
         SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
         SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index a5033b2..98f1eb7 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -212,7 +212,8 @@ Socket.Broadcast,                config_parse_bool,                  0,
 Socket.PassCredentials,          config_parse_bool,                  0,                             offsetof(Socket, pass_cred)
 Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
 Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
-Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuse_port)
+Socket.ReusePort,                config_parse_bool,                 -1,                             offsetof(Socket, reuse_port)
+Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
 Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
 Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
 Socket.Service,                  config_parse_socket_service,        0,                             0
diff --git a/src/core/service.c b/src/core/service.c
index 702443d..1d5972b 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -3409,9 +3409,13 @@ static void service_notify_message(Unit *u, pid_t pid, char **tags) {
         if (s->type == SERVICE_NOTIFY &&
             s->state == SERVICE_START &&
             strv_find(tags, "READY=1")) {
+                Socket *socket = SOCKET(UNIT_DEREF(s->accept_socket));
                 log_debug_unit(u->id,
                                "%s: got READY=1", u->id);
 
+                if (socket && socket->distribute > socket->n_connections)
+                        socket_enter_listening(socket);
+
                 service_enter_start_post(s);
         }
 
diff --git a/src/core/socket.c b/src/core/socket.c
index aaaa8d6..41a72a4 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
 }
 
 static int socket_instantiate_service(Socket *s) {
-        char *prefix, *name;
+        _cleanup_free_ char *prefix = NULL, *name = NULL;
         int r;
         Unit *u;
 
         assert(s);
 
         /* This fills in s->service if it isn't filled in yet. For
-         * Accept=yes sockets we create the next connection service
-         * here. For Accept=no this is mostly a NOP since the service
+         * Accept=yes and Distribute=n sockets we create the next connection
+         * service here. Otherwise this is mostly a NOP since the service
          * is figured out at load time anyway. */
 
-        if (UNIT_DEREF(s->service))
+        if (UNIT_DEREF(s->service) && s->distribute == 0)
                 return 0;
 
-        assert(s->accept);
+        assert(s->accept || s->distribute);
 
         if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
                 return -ENOMEM;
 
         r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
-        free(prefix);
-
         if (r < 0)
                 return -ENOMEM;
 
         r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
-        free(name);
-
         if (r < 0)
                 return r;
 
@@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
 
         assert(s);
 
-        if (!s->accept)
+        if (!(s->accept || s->distribute))
                 return true;
 
         LIST_FOREACH(port, p, s->ports) {
@@ -359,6 +355,13 @@ static int socket_add_extras(Socket *s) {
                         return r;
         }
 
+        if (s->reuse_port == -1) {
+                if (s->distribute)
+                        s->reuse_port = true;
+                else
+                        s->reuse_port = false;
+        }
+
         return 0;
 }
 
@@ -483,15 +486,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
                         "%sBindToDevice: %s\n",
                         prefix, s->bind_to_device);
 
-        if (s->accept)
+        if (s->accept || s->distribute)
                 fprintf(f,
                         "%sAccepted: %u\n"
-                        "%sNConnections: %u\n"
-                        "%sMaxConnections: %u\n",
+                        "%sNConnections: %u\n",
                         prefix, s->n_accepted,
-                        prefix, s->n_connections,
+                        prefix, s->n_connections);
+
+        if (s->accept)
+                fprintf(f,
+                        "%sMaxConnections: %u\n",
                         prefix, s->max_connections);
 
+        if (s->distribute)
+                fprintf(f,
+                        "%sDistribute: %u\n",
+                        prefix, s->distribute);
+
         if (s->priority >= 0)
                 fprintf(f,
                         "%sPriority: %i\n",
@@ -604,9 +615,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
                 struct sockaddr_storage storage;
         } local, remote;
 
-        assert(fd >= 0);
         assert(instance);
 
+        if (fd < 0) {
+                if (asprintf(instance, "%u", nr) < 0)
+                        return -ENOMEM;
+                return 0;
+        }
+
         l = sizeof(local);
         if (getsockname(fd, &local.sa, &l) < 0)
                 return -errno;
@@ -798,8 +814,8 @@ static void socket_apply_socket_options(Socket *s, int fd) {
                 if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
                         log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
 
-        if (s->reuse_port) {
-                int b = s->reuse_port;
+        if (s->reuse_port || s->distribute) {
+                int b = true;
                 if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
                         log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
         }
@@ -1385,7 +1401,7 @@ fail:
         socket_enter_stop_post(s, SOCKET_FAILURE_RESOURCES);
 }
 
-static void socket_enter_listening(Socket *s) {
+void socket_enter_listening(Socket *s) {
         int r;
         assert(s);
 
@@ -1485,7 +1501,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 return;
         }
 
-        if (cfd < 0) {
+        if (cfd < 0 && !(s->distribute)) {
                 Iterator i;
                 Unit *other;
                 bool pending = false;
@@ -1509,7 +1525,7 @@ static void socket_enter_running(Socket *s, int cfd) {
                 _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
                 Service *service;
 
-                if (s->n_connections >= s->max_connections) {
+                if (s->n_connections >= s->max_connections && !(s->distribute)) {
                         log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
                         close_nointr_nofail(cfd);
                         return;
@@ -1554,7 +1570,10 @@ static void socket_enter_running(Socket *s, int cfd) {
 
                 unit_choose_id(UNIT(service), name);
 
-                r = service_set_socket_fd(service, cfd, s);
+                if (cfd >= 0)
+                        r = service_set_socket_fd(service, cfd, s);
+                else
+                        r = unit_add_two_dependencies(UNIT(s), UNIT_BEFORE, UNIT_TRIGGERS, UNIT(service), false);
                 if (r < 0)
                         goto fail;
 
@@ -1565,6 +1584,9 @@ static void socket_enter_running(Socket *s, int cfd) {
                 if (r < 0)
                         goto fail;
 
+                if (s->distribute > 0 && (s->n_connections >= s->distribute || SERVICE(UNIT_DEREF(s->service))->type == SERVICE_NOTIFY))
+                        socket_set_state(s, SOCKET_RUNNING);
+
                 /* Notify clients about changed counters */
                 unit_add_to_dbus_queue(UNIT(s));
         }
@@ -2286,13 +2308,17 @@ void socket_connection_unref(Socket *s) {
 
         /* The service is dead. Yay!
          *
-         * This is strictly for one-instance-per-connection
-         * services. */
+         * This is for one-instance-per-connection
+         * and Distribute= services */
 
         assert(s->n_connections > 0);
         s->n_connections--;
 
+
         log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+        if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
+                socket_enter_listening(s);
 }
 
 static void socket_trigger_notify(Unit *u, Unit *other) {
diff --git a/src/core/socket.h b/src/core/socket.h
index 076a183..86cd353 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,6 +95,8 @@ struct Socket {
         LIST_HEAD(SocketPort, ports);
 
         unsigned n_accepted;
+        /* when Accept=true this is the number of active connections;
+         * when Distribute=n this is the number of active workers */
         unsigned n_connections;
         unsigned max_connections;
 
@@ -147,7 +149,8 @@ struct Socket {
         size_t pipe_size;
         char *bind_to_device;
         char *tcp_congestion;
-        bool reuse_port;
+        int reuse_port;
+        unsigned distribute;
         long mq_maxmsg;
         long mq_msgsize;
 
@@ -162,6 +165,9 @@ int socket_collect_fds(Socket *s, int **fds, unsigned *n_fds);
 /* Called from the service code when a per-connection service ended */
 void socket_connection_unref(Socket *s);
 
+/* Called from the service code when we receive a READY=1 notification */
+void socket_enter_listening(Socket *s);
+
 void socket_free_ports(Socket *s);
 
 extern const UnitVTable socket_vtable;
-- 
1.8.5.1



More information about the systemd-devel mailing list