[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers
Shawn Landden
shawn at churchofgit.com
Thu Dec 12 23:46:42 PST 2013
If Distribute=n is set, turn SO_REUSEPORT on and spawn
n workers to handle incoming requests.
SO_REUSEPORT sockets on the same port must all be created
by the same uid, therefore using the option allows
other root programs (or programs of the same user
if running in --user mode) to "hijack" this port, even
after systemd reserves it.
This patch is currently not ready for merging because it is
possible to do quite a number of operations, and yet have no assurance
that we have accomplished anything, ending up in an
infinite loop respawning a service that never accept()s
the socket passed to it, and therefore leaving our copy
of the socket always in EPOLLIN status. To prevent this we need
to do one of:
*) Call accept() and pass an additional fd to the service.
*) Use EPOLLET: requires the event to always be dispatched when it comes in.
*) Disable and then reenable the event source (requiring
that we re-create the socket) every time we
enqueue an instance, essentially emulating the behavior
of EPOLLET.
---
TODO | 3 +-
man/systemd.socket.xml | 15 ++++++-
src/core/dbus-socket.c | 4 +-
src/core/load-fragment-gperf.gperf.m4 | 3 +-
src/core/socket.c | 76 ++++++++++++++++++++++++-----------
src/core/socket.h | 5 ++-
src/shared/conf-parser.c | 32 +++++++++++++++
src/shared/conf-parser.h | 1 +
8 files changed, 109 insertions(+), 30 deletions(-)
diff --git a/TODO b/TODO
index dad55c4..fbc2609 100644
--- a/TODO
+++ b/TODO
@@ -73,7 +73,7 @@ Features:
* rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
* move config_parse_path_strv() out of conf-parser.c
@@ -187,7 +187,6 @@ Features:
* teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
* Support SO_REUSEPORT with socket activation:
- - Let systemd maintain a pool of servers.
- Use for seamless upgrades, by running the new server before stopping the
old.
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..6799020 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -404,7 +404,8 @@
designed for usage with
<citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
to work unmodified with systemd socket
- activation.</para></listitem>
+ activation. Incompatible with
+ <varname>Distribute=</varname></para></listitem>
</varlistentry>
<varlistentry>
@@ -519,6 +520,18 @@
</varlistentry>
<varlistentry>
+ <term><varname>Distribute=</varname></term>
+ <listitem><para>Takes an integer
+ value. Systemd will spawn up to the
+ given number of instances of the service, each
+ listening to the same socket. Default is 0.
+ Setting this requires the corresponding service to
+ be an instantiated service (name ends with <literal>@.service</literal>).
+ Useful with <varname>ReusePort=</varname> above.
+ Incompatible with <varname>Accept=true</varname>.</para></listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><varname>SmackLabel=</varname></term>
<term><varname>SmackLabelIPIn=</varname></term>
<term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 74217df..036c9af 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -40,7 +40,6 @@ static int property_get_listen(
void *userdata,
sd_bus_error *error) {
-
Socket *s = SOCKET(userdata);
SocketPort *p;
int r;
@@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = {
SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", bus_property_get_long, offsetof(Socket, mq_maxmsg), 0),
SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
- SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_bool, offsetof(Socket, reuse_port), 0),
+ SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_tristate, offsetof(Socket, reuse_port), 0),
+ SD_BUS_PROPERTY("Distribute", "u", bus_property_get_unsigned, offsetof(Socket, distribute), 0),
SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index a5033b2..b74d276 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -212,7 +212,8 @@ Socket.Broadcast, config_parse_bool, 0,
Socket.PassCredentials, config_parse_bool, 0, offsetof(Socket, pass_cred)
Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
-Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuse_port)
+Socket.ReusePort, config_parse_tristate, -1, offsetof(Socket, reuse_port)
+Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
Socket.Service, config_parse_socket_service, 0, 0
diff --git a/src/core/socket.c b/src/core/socket.c
index aaaa8d6..183a875 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
}
static int socket_instantiate_service(Socket *s) {
- char *prefix, *name;
+ _cleanup_free_ char *prefix = NULL, *name = NULL;
int r;
Unit *u;
assert(s);
/* This fills in s->service if it isn't filled in yet. For
- * Accept=yes sockets we create the next connection service
- * here. For Accept=no this is mostly a NOP since the service
+ * Accept=yes and Distribute=n sockets we create the next connection
+ * service here. Otherwise this is mostly a NOP since the service
* is figured out at load time anyway. */
- if (UNIT_DEREF(s->service))
+ if (UNIT_DEREF(s->service) && s->distribute == 0)
return 0;
- assert(s->accept);
+ assert(s->accept || s->distribute > 0);
if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
return -ENOMEM;
r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
- free(prefix);
-
if (r < 0)
return -ENOMEM;
r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
- free(name);
-
if (r < 0)
return r;
@@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
assert(s);
- if (!s->accept)
+ if (!(s->accept || s->distribute > 0))
return true;
LIST_FOREACH(port, p, s->ports) {
@@ -408,6 +404,13 @@ static int socket_load(Unit *u) {
if (r < 0)
return r;
+ if (s->reuse_port < 0) {
+ if (s->distribute > 0)
+ s->reuse_port = true;
+ else
+ s->reuse_port = false;
+ }
+
if (u->load_state == UNIT_LOADED) {
/* This is a new unit? Then let's add in some extras */
r = socket_add_extras(s);
@@ -483,15 +486,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
"%sBindToDevice: %s\n",
prefix, s->bind_to_device);
- if (s->accept)
+ if (s->accept || s->distribute > 0)
fprintf(f,
"%sAccepted: %u\n"
- "%sNConnections: %u\n"
- "%sMaxConnections: %u\n",
+ "%sNConnections: %u\n",
prefix, s->n_accepted,
- prefix, s->n_connections,
+ prefix, s->n_connections);
+
+ if (s->accept)
+ fprintf(f,
+ "%sMaxConnections: %u\n",
prefix, s->max_connections);
+ if (s->distribute > 0)
+ fprintf(f,
+ "%sDistribute: %u\n",
+ prefix, s->distribute);
+
if (s->priority >= 0)
fprintf(f,
"%sPriority: %i\n",
@@ -604,9 +615,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
struct sockaddr_storage storage;
} local, remote;
- assert(fd >= 0);
assert(instance);
+ if (fd < 0) {
+ if (asprintf(instance, "%u", nr) < 0)
+ return -ENOMEM;
+ return 0;
+ }
+
l = sizeof(local);
if (getsockname(fd, &local.sa, &l) < 0)
return -errno;
@@ -799,7 +815,7 @@ static void socket_apply_socket_options(Socket *s, int fd) {
log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
if (s->reuse_port) {
- int b = s->reuse_port;
+ int b = (bool)s->reuse_port;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
}
@@ -1485,7 +1501,7 @@ static void socket_enter_running(Socket *s, int cfd) {
return;
}
- if (cfd < 0) {
+ if (cfd < 0 && s->distribute == 0) {
Iterator i;
Unit *other;
bool pending = false;
@@ -1509,7 +1525,7 @@ static void socket_enter_running(Socket *s, int cfd) {
_cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
Service *service;
- if (s->n_connections >= s->max_connections) {
+ if (s->n_connections >= s->max_connections && !(s->distribute)) {
log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
close_nointr_nofail(cfd);
return;
@@ -1554,7 +1570,10 @@ static void socket_enter_running(Socket *s, int cfd) {
unit_choose_id(UNIT(service), name);
- r = service_set_socket_fd(service, cfd, s);
+ if (cfd >= 0)
+ r = service_set_socket_fd(service, cfd, s);
+ else
+ r = unit_add_two_dependencies(UNIT(s), UNIT_BEFORE, UNIT_TRIGGERS, UNIT(service), false);
if (r < 0)
goto fail;
@@ -1565,6 +1584,9 @@ static void socket_enter_running(Socket *s, int cfd) {
if (r < 0)
goto fail;
+ if (s->distribute > 0 && s->n_connections >= s->distribute)
+ socket_set_state(s, SOCKET_RUNNING);
+
/* Notify clients about changed counters */
unit_add_to_dbus_queue(UNIT(s));
}
@@ -2286,13 +2308,17 @@ void socket_connection_unref(Socket *s) {
/* The service is dead. Yay!
*
- * This is strictly for one-instance-per-connection
- * services. */
+ * This is for one-instance-per-connection
+ * and Distribute= services */
assert(s->n_connections > 0);
s->n_connections--;
+
log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+ if (s->n_connections < s->distribute && UNIT_ISSET(s->service) && SERVICE(UNIT_DEREF(s->service))->result == SERVICE_SUCCESS && s->state == SOCKET_RUNNING)
+ socket_enter_listening(s);
}
static void socket_trigger_notify(Unit *u, Unit *other) {
@@ -2328,8 +2354,12 @@ static void socket_trigger_notify(Unit *u, Unit *other) {
se->state == SERVICE_AUTO_RESTART)
socket_notify_service_dead(s, false);
- if (se->state == SERVICE_RUNNING)
- socket_set_state(s, SOCKET_RUNNING);
+ if (se->state == SERVICE_RUNNING) {
+ if (s->n_connections < s->distribute)
+ ;
+ else
+ socket_set_state(s, SOCKET_RUNNING);
+ }
}
static int socket_kill(Unit *u, KillWho who, int signo, sd_bus_error *error) {
diff --git a/src/core/socket.h b/src/core/socket.h
index 076a183..138ac34 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,6 +95,8 @@ struct Socket {
LIST_HEAD(SocketPort, ports);
unsigned n_accepted;
+ /* when Accept=true this is the number of active connections
+ * when Distribute=n this is the number of active workers */
unsigned n_connections;
unsigned max_connections;
@@ -147,7 +149,8 @@ struct Socket {
size_t pipe_size;
char *bind_to_device;
char *tcp_congestion;
- bool reuse_port;
+ int reuse_port;
+ unsigned distribute;
long mq_maxmsg;
long mq_msgsize;
diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c
index 1e3cee5..e9654b3 100644
--- a/src/shared/conf-parser.c
+++ b/src/shared/conf-parser.c
@@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit,
return 0;
}
+int config_parse_tristate(const char* unit,
+ const char *filename,
+ unsigned line,
+ const char *section,
+ unsigned section_line,
+ const char *lvalue,
+ int ltype,
+ const char *rvalue,
+ void *data,
+ void *userdata) {
+
+ int k;
+ int *b = data;
+
+ assert(filename);
+ assert(lvalue);
+ assert(rvalue);
+ assert(data);
+
+ /* Tristates are like booleans, but the int at data may also hold a negative 'default' (e.g. -1) set elsewhere; this parser only stores 0 or 1, ignores ltype, and on a parse error logs and leaves *data untouched. */
+
+ k = parse_boolean(rvalue);
+ if (k < 0) {
+ log_syntax(unit, LOG_ERR, filename, line, -k,
+ "Failed to parse boolean value, ignoring: %s", rvalue);
+ return 0;
+ }
+
+ *b = !!k;
+ return 0;
+}
+
int config_parse_bool(const char* unit,
const char *filename,
unsigned line,
diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h
index 2d5aa31..f1f9b27 100644
--- a/src/shared/conf-parser.h
+++ b/src/shared/conf-parser.h
@@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char *filename, unsigned line, c
int config_parse_bytes_size(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
int config_parse_bytes_off(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
int config_parse_bool(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
+int config_parse_tristate(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
int config_parse_string(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
int config_parse_path(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
int config_parse_strv(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
--
1.8.5.1
More information about the systemd-devel
mailing list