[systemd-devel] [PATCH] core: support Distribute=n to distribute to n SO_REUSEPORT workers
Shawn Landden
shawn at churchofgit.com
Fri Nov 15 20:22:14 PST 2013
v3: make each worker its own service
v4: be less intrusive
---
TODO | 3 -
man/systemd.socket.xml | 9 +++
src/core/dbus-socket.c | 2 +
src/core/load-fragment-gperf.gperf.m4 | 1 +
src/core/service.c | 7 ++-
src/core/service.h | 1 -
src/core/socket.c | 114 ++++++++++++++++++++--------------
src/core/socket.h | 4 ++
8 files changed, 86 insertions(+), 55 deletions(-)
diff --git a/TODO b/TODO
index 57e1122..38cdf5d 100644
--- a/TODO
+++ b/TODO
@@ -82,8 +82,6 @@ Features:
* rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
-
* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
* we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend()
@@ -261,7 +259,6 @@ Features:
* teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
* Support SO_REUSEPORT with socket activation:
- - Let systemd maintain a pool of servers.
- Use for seamless upgrades, by running the new server before stopping the
old.
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..92a9275 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -519,6 +519,15 @@
</varlistentry>
<varlistentry>
+ <term><varname>Distribute=</varname></term>
+ <listitem><para>Takes an integer
+ value. If greater than one, systemd will spawn
+ the given number of instances of the service, each
+ listening on the same socket. This option implies
+ <varname>ReusePort=</varname> above.</para></listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><varname>SmackLabel=</varname></term>
<term><varname>SmackLabelIPIn=</varname></term>
<term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 60a8d05..4644007 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -68,6 +68,7 @@
" <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \
" <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
+ " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
" <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
@@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
{ "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) },
{ "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) },
{ "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) },
+ { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) },
{ "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true },
{ "SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true },
{ "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true },
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index b64fdc9..4058a1f 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0,
Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport)
+Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
Socket.Service, config_parse_socket_service, 0, 0
diff --git a/src/core/service.c b/src/core/service.c
index 3da32a1..8fc55a0 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done(
int service_set_socket_fd(Service *s, int fd, Socket *sock) {
assert(s);
- assert(fd >= 0);
/* This is called by the socket code when instantiating a new
* service for a stream socket and the socket needs to be
@@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
if (s->state != SERVICE_DEAD)
return -EAGAIN;
- s->socket_fd = fd;
- s->got_socket_fd = true;
+ if (fd >= 0) {
+ s->socket_fd = fd;
+ s->got_socket_fd = true;
+ }
unit_ref_set(&s->accept_socket, UNIT(sock));
diff --git a/src/core/service.h b/src/core/service.h
index 37fa6ff..2ffe7d1 100644
--- a/src/core/service.h
+++ b/src/core/service.h
@@ -26,7 +26,6 @@ typedef struct Service Service;
#include "unit.h"
#include "path.h"
#include "ratelimit.h"
-#include "service.h"
#include "kill.h"
#include "exit-status.h"
diff --git a/src/core/socket.c b/src/core/socket.c
index 751f20b..11b649b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
}
static int socket_instantiate_service(Socket *s) {
- char *prefix, *name;
+ _cleanup_free_ char *prefix = NULL, *name = NULL;
int r;
Unit *u;
assert(s);
/* This fills in s->service if it isn't filled in yet. For
- * Accept=yes sockets we create the next connection service
- * here. For Accept=no this is mostly a NOP since the service
+ * Accept=yes and Distribute=n sockets we create the next connection
+ * service here. Otherwise this is mostly a NOP since the service
* is figured out at load time anyway. */
- if (UNIT_DEREF(s->service))
+ if (UNIT_DEREF(s->service) && !(s->distribute))
return 0;
- assert(s->accept);
+ assert(s->accept || s->distribute);
if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
return -ENOMEM;
r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
- free(prefix);
-
if (r < 0)
return -ENOMEM;
r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
- free(name);
-
if (r < 0)
return r;
@@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
"%sReusePort: %s\n",
prefix, yes_no(s->reuseport));
+ if (s->distribute)
+ fprintf(f,
+ "%sDistribute: %d\n",
+ prefix, s->distribute);
+
if (s->smack)
fprintf(f,
"%sSmackLabel: %s\n",
@@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) {
return;
}
- if (cfd < 0) {
+ if (cfd < 0 && !(s->distribute)) {
Iterator i;
Unit *u;
bool pending = false;
@@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) {
return;
}
- r = socket_instantiate_service(s);
- if (r < 0)
- goto fail;
-
- r = instance_from_socket(cfd, s->n_accepted, &instance);
- if (r < 0) {
- if (r != -ENOTCONN)
- goto fail;
-
- /* ENOTCONN is legitimate if TCP RST was received.
- * This connection is over, but the socket unit lives on. */
- close_nointr_nofail(cfd);
- return;
- }
-
prefix = unit_name_to_prefix(UNIT(s)->id);
if (!prefix) {
r = -ENOMEM;
goto fail;
}
- name = unit_name_build(prefix, instance, ".service");
+ do {
+ r = socket_instantiate_service(s);
+ if (r < 0)
+ goto fail;
- if (!name) {
- r = -ENOMEM;
- goto fail;
- }
+ if (!(s->distribute)) {
+ r = instance_from_socket(cfd, s->n_accepted, &instance);
+ if (r < 0) {
+ if (r != -ENOTCONN)
+ goto fail;
- r = unit_add_name(UNIT_DEREF(s->service), name);
- if (r < 0)
- goto fail;
+ /* ENOTCONN is legitimate if TCP RST was received.
+ * This connection is over, but the socket unit lives on. */
+ close_nointr_nofail(cfd);
+ return;
+ }
- service = SERVICE(UNIT_DEREF(s->service));
- unit_ref_unset(&s->service);
- s->n_accepted ++;
+ name = unit_name_build(prefix, instance, ".service");
+ if (!name) {
+ r = -ENOMEM;
+ goto fail;
+ }
- UNIT(service)->no_gc = false;
+ r = unit_add_name(UNIT_DEREF(s->service), name);
+ if (r < 0)
+ goto fail;
+ }
- unit_choose_id(UNIT(service), name);
+ service = SERVICE(UNIT_DEREF(s->service));
+ unit_ref_unset(&s->service);
+ s->n_accepted ++;
- r = service_set_socket_fd(service, cfd, s);
- if (r < 0)
- goto fail;
+ UNIT(service)->no_gc = false;
- cfd = -1;
- s->n_connections ++;
+ unit_choose_id(UNIT(service), name);
- r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
- if (r < 0)
- goto fail;
+ r = service_set_socket_fd(service, cfd, s);
+ if (r < 0)
+ goto fail;
+
+ cfd = -1;
+ s->n_connections ++;
+
+ r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
+ if (r < 0)
+ goto fail;
+
+ if(s->distribute > s->n_connections) {
+ /* distribute implies reuseport */
+ s->reuseport = true;
+
+ socket_enter_listening(s);
+ }
+ } while(s->distribute > s->n_connections);
/* Notify clients about changed counters */
unit_add_to_dbus_queue(UNIT(s));
@@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) {
/* The service is dead. Yay!
*
- * This is strictly for one-instance-per-connection
- * services. */
+ * This is for one-instance-per-connection
+ * and Distribute= services */
assert(s->n_connections > 0);
s->n_connections--;
log_debug_unit(UNIT(s)->id,
"%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+ if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){
+ s->reuseport = true;
+ /* (re)enter systemd into the SO_REUSEPORT pool; when it gets a
+ * connection it will re-establish the Distribute= target */
+ socket_enter_listening(s);
+ }
}
static void socket_reset_failed(Unit *u) {
diff --git a/src/core/socket.h b/src/core/socket.h
index 3d7eadc..5928356 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -93,6 +93,8 @@ struct Socket {
LIST_HEAD(SocketPort, ports);
unsigned n_accepted;
+ /* when Accept=true this is the number of active connections;
+ * when Distribute=n this is the number of active workers */
unsigned n_connections;
unsigned max_connections;
@@ -145,6 +147,8 @@ struct Socket {
char *bind_to_device;
char *tcp_congestion;
bool reuseport;
+ /* implies reuseport */
+ unsigned distribute;
long mq_maxmsg;
long mq_msgsize;
--
1.8.4.3
More information about the systemd-devel
mailing list