[systemd-devel] [PATCH 3/3] core: support Distribute=n to distribute to n SO_REUSEPORT workers
Shawn Landden
shawn at churchofgit.com
Thu Nov 14 07:50:44 PST 2013
---
TODO | 3 +-
src/core/dbus-socket.c | 2 ++
src/core/load-fragment-gperf.gperf.m4 | 1 +
src/core/service.c | 2 +-
src/core/service.h | 13 +++++++-
src/core/socket.c | 63 +++++++++++++++++++++++++++++++++++
src/core/socket.h | 2 ++
7 files changed, 82 insertions(+), 4 deletions(-)
diff --git a/TODO b/TODO
index efc7e2a..0db4dc6 100644
--- a/TODO
+++ b/TODO
@@ -80,7 +80,7 @@ Features:
* rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
-* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
+* respawn Distribute= worker processes when they die unexpectedly
* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
@@ -259,7 +259,6 @@ Features:
* teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
* Support SO_REUSEPORT with socket activation:
- - Let systemd maintain a pool of servers.
- Use for seamless upgrades, by running the new server before stopping the
old.
diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
index 60a8d05..4644007 100644
--- a/src/core/dbus-socket.c
+++ b/src/core/dbus-socket.c
@@ -68,6 +68,7 @@
" <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \
" <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
+ " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
" <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
" <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
@@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
{ "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) },
{ "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) },
{ "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) },
+ { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) },
{ "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true },
{ "SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true },
{ "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true },
diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
index b64fdc9..4058a1f 100644
--- a/src/core/load-fragment-gperf.gperf.m4
+++ b/src/core/load-fragment-gperf.gperf.m4
@@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0,
Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport)
+Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
Socket.Service, config_parse_socket_service, 0, 0
diff --git a/src/core/service.c b/src/core/service.c
index 3da32a1..cc337cf 100644
--- a/src/core/service.c
+++ b/src/core/service.c
@@ -1668,7 +1668,7 @@ fail:
return r;
}
-static int service_spawn(
+int service_spawn(
Service *s,
ExecCommand *c,
bool timeout,
diff --git a/src/core/service.h b/src/core/service.h
index 37fa6ff..95aa707 100644
--- a/src/core/service.h
+++ b/src/core/service.h
@@ -26,7 +26,6 @@ typedef struct Service Service;
#include "unit.h"
#include "path.h"
#include "ratelimit.h"
-#include "service.h"
#include "kill.h"
#include "exit-status.h"
@@ -201,6 +200,18 @@ extern const UnitVTable service_vtable;
struct Socket;
+int service_spawn( /* no longer static in service.c: socket.c calls this to spawn Distribute= workers */
+ Service *s,
+ ExecCommand *c,
+ bool timeout,
+ bool pass_fds,
+ bool apply_permissions,
+ bool apply_chroot,
+ bool apply_tty_stdin,
+ bool set_notify_socket,
+ bool is_control,
+ pid_t *_pid);
+
int service_set_socket_fd(Service *s, int fd, struct Socket *socket);
const char* service_state_to_string(ServiceState i) _const_;
diff --git a/src/core/socket.c b/src/core/socket.c
index 751f20b..9ada14d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -1422,6 +1422,65 @@ fail:
socket_enter_dead(s, SOCKET_FAILURE_RESOURCES);
}
+/* Spawn the extra workers requested by Distribute=n. The first worker is
+ * started by the caller; for each of the remaining n-1 this re-opens the socket
+ * fds and spawns the service's ExecStart= command again. Failures are logged
+ * and stop further distribution; nothing is reported back to the caller. */
+static void socket_distribute(Socket *socket) {
+        Service *s;
+
+        assert(socket);
+
+        /* Bail out before touching the service, so the assertions below only
+         * apply to sockets that actually asked for more than one worker. */
+        if (socket->distribute <= 1)
+                return;
+
+        s = SERVICE(UNIT_DEREF(socket->service));
+
+        assert(s);
+        assert(s->exec_command[SERVICE_EXEC_START]);
+        assert(!s->exec_command[SERVICE_EXEC_START]->command_next);
+
+        /* compare explicitly: "!s->type == X" negates s->type before comparing */
+        if (s->type != SERVICE_SIMPLE) {
+                log_warning_unit(UNIT(s)->id,
+                                 "%s is not Type=simple required for Distribute. Not distributing.",
+                                 UNIT(s)->id);
+                return;
+        }
+
+        /* distribute implies reuseport */
+        socket->reuseport = true;
+
+        /* the first worker has already been started */
+        for (unsigned i = 1; i < socket->distribute; i++) {
+                ExecCommand *c;
+                pid_t pid;
+                int r;
+
+                r = socket_open_fds(socket);
+                if (r < 0) {
+                        log_warning_unit(UNIT(socket)->id,
+                                         "%s failed to open socket(s) to distribute. Ignoring: %s",
+                                         UNIT(socket)->id, strerror(-r));
+                        return;
+                }
+
+                c = s->main_command = s->exec_command[SERVICE_EXEC_START];
+
+                /* timeout is a bool parameter: pass false, not NULL */
+                r = service_spawn(s, c, false, true, true, true, true,
+                                  s->notify_access != NOTIFY_NONE, false, &pid);
+                if (r < 0) {
+                        log_warning_unit(UNIT(socket)->id,
+                                         "%s failed to distribute. Ignoring: %s",
+                                         UNIT(socket)->id, strerror(-r));
+                        return;
+                }
+        }
+}
+
static void socket_enter_running(Socket *s, int cfd) {
int r;
DBusError error;
@@ -1471,6 +1530,8 @@ static void socket_enter_running(Socket *s, int cfd) {
r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT_DEREF(s->service), JOB_REPLACE, true, &error, NULL);
if (r < 0)
goto fail;
+
+ socket_distribute(s);
}
socket_set_state(s, SOCKET_RUNNING);
@@ -1537,6 +1598,8 @@ static void socket_enter_running(Socket *s, int cfd) {
if (r < 0)
goto fail;
+ socket_distribute(s);
+
/* Notify clients about changed counters */
unit_add_to_dbus_queue(UNIT(s));
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 3d7eadc..b307822 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -145,6 +145,8 @@ struct Socket {
char *bind_to_device;
char *tcp_congestion;
bool reuseport;
+ /* implies reuseport */
+ unsigned distribute;
long mq_maxmsg;
long mq_msgsize;
--
1.8.4.3
More information about the systemd-devel
mailing list