[systemd-devel] [PATCH 1/2] core: support Distribute=n to distribute to n workers
Shawn Landden
shawn at churchofgit.com
Mon Dec 9 14:08:58 PST 2013
Until there are some use cases for Distribute= without
SO_REUSEPORT, make Distribute= imply it. Otherwise we would need
a new config_parse_distribute in load-fragment.c
and would gain the same issues as config_parse_syscall,
where NoNewPrivs can still be set to false,
but only if set _after_ SystemCallFilter (only allowed
when root).
Because it takes a while for the service to start up, and
until then we spin in a fast epoll loop, this tends to
start up all the instances all at once. There are a number
of ways we can slow this instantiation down:
1) Call accept() and pass an additional fd to the service
2) Use EPOLLET: requires event to be prioritized and always
dispatched.
3) Disable and then reenable the event source every time we
enqueue an instance.
IMHO #1 is not acceptable. For #2, perhaps the new event loop
infrastructure can support a special class for EPOLLET. I
am going to investigate #3 right after sending this patch.
---
TODO | 3 +-
man/systemd.socket.xml | 11 ++++++
src/core/dbus-socket.c | 2 +-
src/core/load-fragment-gperf.gperf.m4 | 1 +
src/core/socket.c | 64 +++++++++++++++++++++++------------
src/core/socket.h | 3 ++
6 files changed, 59 insertions(+), 25 deletions(-)
diff --git a/TODO b/TODO
index 8f9aabc..15233cd 100644
--- a/TODO
+++ b/TODO
@@ -69,7 +69,7 @@ Features:
* rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal files contained in it
* we probably should replace the left-over uses of strv_append() and replace them by strv_push() or strv_extend()
@@ -187,7 +187,6 @@ Features:
* teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
* Support SO_REUSEPORT with socket activation:
- - Let systemd maintain a pool of servers.
- Use for seamless upgrades, by running the new server before stopping the
old.
diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
index 7c10c58..48a4b77 100644
--- a/man/systemd.socket.xml
+++ b/man/systemd.socket.xml
@@ -519,6 +519,17 @@
</varlistentry>
<varlistentry>
+ <term><varname>Distribute=</varname></term>
+ <listitem><para>Takes an integer
+ value. Systemd will spawn up to the
+ given number of instances of the service, each
+ listening on the same socket. Default is 0.
+ Setting this requires the corresponding service to
+ be an instantiated service (name ends with <literal>@.service</literal>).
+ Useful with <varname>ReusePort=</varname> above.</para></listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><varname>SmackLabel=</varname></term>
<term><varname>SmackLabelIPIn=</varname></term>
<term><varname>SmackLabelIPOut=</varname></term>
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 74217df..68c95a0 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -40,7 +40,6 @@ static int property_get_listen(
void *userdata,
sd_bus_error *error) {
-
Socket *s = SOCKET(userdata);
SocketPort *p;
int r;
@@ -116,6 +115,7 @@ const sd_bus_vtable bus_socket_vtable[] = {
SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_bool, offsetof(Socket, reuse_port), 0),
+ SD_BUS_PROPERTY("Distribute", "u", bus_property_get_unsigned, offsetof(Socket, distribute), 0),
SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index a5033b2..de82586 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -213,6 +213,7 @@ Socket.PassCredentials, config_parse_bool, 0,
Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuse_port)
+Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
Socket.Service, config_parse_socket_service, 0, 0
diff --git a/src/core/socket.c b/src/core/socket.c
index aaaa8d6..f229cd8 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
}
static int socket_instantiate_service(Socket *s) {
- char *prefix, *name;
+ _cleanup_free_ char *prefix = NULL, *name = NULL;
int r;
Unit *u;
assert(s);
/* This fills in s->service if it isn't filled in yet. For
- * Accept=yes sockets we create the next connection service
- * here. For Accept=no this is mostly a NOP since the service
+ * Accept=yes and Distribute=n sockets we create the next connection
+ * service here. Otherwise this is mostly a NOP since the service
* is figured out at load time anyway. */
- if (UNIT_DEREF(s->service))
+ if (UNIT_DEREF(s->service) && !(s->distribute))
return 0;
- assert(s->accept);
+ assert(s->accept || s->distribute);
if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
return -ENOMEM;
r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
- free(prefix);
-
if (r < 0)
return -ENOMEM;
r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
- free(name);
-
if (r < 0)
return r;
@@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
assert(s);
- if (!s->accept)
+ if (!(s->accept || s->distribute))
return true;
LIST_FOREACH(port, p, s->ports) {
@@ -483,15 +479,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
"%sBindToDevice: %s\n",
prefix, s->bind_to_device);
- if (s->accept)
+ if (s->accept || s->distribute)
fprintf(f,
"%sAccepted: %u\n"
- "%sNConnections: %u\n"
- "%sMaxConnections: %u\n",
+ "%sNConnections: %u\n",
prefix, s->n_accepted,
- prefix, s->n_connections,
+ prefix, s->n_connections);
+
+ if (s->accept)
+ fprintf(f,
+ "%sMaxConnections: %u\n",
prefix, s->max_connections);
+ if (s->distribute)
+ fprintf(f,
+ "%sDistribute: %u\n",
+ prefix, s->distribute);
+
if (s->priority >= 0)
fprintf(f,
"%sPriority: %i\n",
@@ -604,9 +608,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
struct sockaddr_storage storage;
} local, remote;
- assert(fd >= 0);
assert(instance);
+ if (fd < 0) {
+ if (asprintf(&r, "%u", nr) < 0)
+ return -ENOMEM;
+ goto shortcut;
+ }
+
l = sizeof(local);
if (getsockname(fd, &local.sa, &l) < 0)
return -errno;
@@ -690,6 +699,7 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
assert_not_reached("Unhandled socket type.");
}
+shortcut:
*instance = r;
return 0;
}
@@ -798,8 +808,8 @@ static void socket_apply_socket_options(Socket *s, int fd) {
if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
- if (s->reuse_port) {
- int b = s->reuse_port;
+ if (s->reuse_port || s->distribute) {
+ int b = true;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
}
@@ -1485,7 +1495,7 @@ static void socket_enter_running(Socket *s, int cfd) {
return;
}
- if (cfd < 0) {
+ if (cfd < 0 && !(s->distribute)) {
Iterator i;
Unit *other;
bool pending = false;
@@ -1509,7 +1519,7 @@ static void socket_enter_running(Socket *s, int cfd) {
_cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
Service *service;
- if (s->n_connections >= s->max_connections) {
+ if (s->n_connections >= s->max_connections && !(s->distribute)) {
log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
close_nointr_nofail(cfd);
return;
@@ -1554,7 +1564,10 @@ static void socket_enter_running(Socket *s, int cfd) {
unit_choose_id(UNIT(service), name);
- r = service_set_socket_fd(service, cfd, s);
+ if (cfd >= 0)
+ r = service_set_socket_fd(service, cfd, s);
+ else
+ r = unit_add_two_dependencies(UNIT(s), UNIT_BEFORE, UNIT_TRIGGERS, UNIT(service), false);
if (r < 0)
goto fail;
@@ -1565,6 +1578,9 @@ static void socket_enter_running(Socket *s, int cfd) {
if (r < 0)
goto fail;
+ if (s->distribute && s->n_connections >= s->distribute)
+ socket_set_state(s, SOCKET_RUNNING);
+
/* Notify clients about changed counters */
unit_add_to_dbus_queue(UNIT(s));
}
@@ -2286,13 +2302,17 @@ void socket_connection_unref(Socket *s) {
/* The service is dead. Yay!
*
- * This is strictly for one-instance-per-connection
- * services. */
+ * This is for one-instance-per-connection
+ * and Distribute= services */
assert(s->n_connections > 0);
s->n_connections--;
+
log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
+
+ if(s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
+ socket_enter_listening(s);
}
static void socket_trigger_notify(Unit *u, Unit *other) {
diff --git a/src/core/socket.h b/src/core/socket.h
index 076a183..0d85cee 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -95,6 +95,8 @@ struct Socket {
LIST_HEAD(SocketPort, ports);
unsigned n_accepted;
+ /* when Accept=true this is the number of active connections
+ * when Distribute=n this is the number of active workers */
unsigned n_connections;
unsigned max_connections;
@@ -148,6 +150,7 @@ struct Socket {
char *bind_to_device;
char *tcp_congestion;
bool reuse_port;
+ unsigned distribute;
long mq_maxmsg;
long mq_msgsize;
--
1.8.5.1
More information about the systemd-devel
mailing list