[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers
Shawn Landden
shawn at churchofgit.com
Thu Dec 19 12:21:26 PST 2013
ping?
On Fri, Dec 13, 2013 at 8:23 PM, Shawn Landden <shawn at churchofgit.com> wrote:
> If Distribute=n is set, turn SO_REUSEPORT on and spawn
> n workers to handle incoming requests.
>
> SO_REUSEPORT sockets on the same port must all be created
> by the same uid, therefore using the option allows
> other root programs (or programs of the same user
> if running in --user mode) to "hijack" this port, even
> after systemd reserves it.
>
> We spawn workers at a rate that is approximately inversely
> exponential in the number of incoming connections: faster
> depending on how quickly new workers start accept()ing and
> on their load, or slower if systemd itself is under load.
> ---
> TODO | 3 +-
> man/systemd.socket.xml | 15 +++++++-
> src/core/dbus-socket.c | 4 +--
> src/core/load-fragment-gperf.gperf.m4 | 3 +-
> src/core/service.c | 4 +--
> src/core/socket.c | 68 ++++++++++++++++++++++-------------
> src/core/socket.h | 5 ++-
> src/shared/conf-parser.c | 32 +++++++++++++++++
> src/shared/conf-parser.h | 1 +
> 9 files changed, 101 insertions(+), 34 deletions(-)
>
> diff --git a/TODO b/TODO
> index 0b43888..2abe1b4 100644
> --- a/TODO
> +++ b/TODO
> @@ -73,7 +73,7 @@ Features:
>
> * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
>
> -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
> +* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal files contained in it
>
> * move config_parse_path_strv() out of conf-parser.c
>
> @@ -181,7 +181,6 @@ Features:
> * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
>
> * Support SO_REUSEPORT with socket activation:
> - - Let systemd maintain a pool of servers.
> - Use for seamless upgrades, by running the new server before stopping the
> old.
>
> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
> index 7c10c58..6799020 100644
> --- a/man/systemd.socket.xml
> +++ b/man/systemd.socket.xml
> @@ -404,7 +404,8 @@
> designed for usage with
> <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
> to work unmodified with systemd socket
> - activation.</para></listitem>
> + activation. Incompatible with
> + <varname>Distribute=</varname>.</para></listitem>
> </varlistentry>
>
> <varlistentry>
> @@ -519,6 +520,18 @@
> </varlistentry>
>
> <varlistentry>
> + <term><varname>Distribute=</varname></term>
> + <listitem><para>Takes an integer
> + value. systemd will spawn up to the
> + given number of instances of the service, each
> + listening on the same socket. Defaults to 0.
> + Setting this requires the corresponding service to
> + be an instantiated service (name ends with <literal>@.service</literal>).
> + Useful with <varname>ReusePort=</varname> above.
> + Incompatible with <varname>Accept=true</varname>.</para></listitem>
> + </varlistentry>
> +
> + <varlistentry>
> <term><varname>SmackLabel=</varname></term>
> <term><varname>SmackLabelIPIn=</varname></term>
> <term><varname>SmackLabelIPOut=</varname></term>
> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
> index 74217df..036c9af 100644
> --- a/src/core/dbus-socket.c
> +++ b/src/core/dbus-socket.c
> @@ -40,7 +40,6 @@ static int property_get_listen(
> void *userdata,
> sd_bus_error *error) {
>
> -
> Socket *s = SOCKET(userdata);
> SocketPort *p;
> int r;
> @@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = {
> SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", bus_property_get_long, offsetof(Socket, mq_maxmsg), 0),
> SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
> SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
> - SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_bool, offsetof(Socket, reuse_port), 0),
> + SD_BUS_PROPERTY("ReusePort", "b", bus_property_get_tristate, offsetof(Socket, reuse_port), 0),
> + SD_BUS_PROPERTY("Distribute", "u", bus_property_get_unsigned, offsetof(Socket, distribute), 0),
> SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
> SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
> SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
> index a5033b2..70f15bd 100644
> --- a/src/core/load-fragment-gperf.gperf.m4
> +++ b/src/core/load-fragment-gperf.gperf.m4
> @@ -212,7 +212,8 @@ Socket.Broadcast, config_parse_bool, 0,
> Socket.PassCredentials, config_parse_bool, 0, offsetof(Socket, pass_cred)
> Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
> Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
> -Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuse_port)
> +Socket.ReusePort, config_parse_tristate, 0, offsetof(Socket, reuse_port)
> +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
> Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
> Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
> Socket.Service, config_parse_socket_service, 0, 0
> diff --git a/src/core/service.c b/src/core/service.c
> index 3b3f956..3047e10 100644
> --- a/src/core/service.c
> +++ b/src/core/service.c
> @@ -3668,7 +3668,6 @@ static void service_bus_name_owner_change(
> int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>
> assert(s);
> - assert(fd >= 0);
>
> /* This is called by the socket code when instantiating a new
> * service for a stream socket and the socket needs to be
> @@ -3683,7 +3682,8 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
> if (s->state != SERVICE_DEAD)
> return -EAGAIN;
>
> - s->socket_fd = fd;
> + if (fd > 0)
> + s->socket_fd = fd;
>
> unit_ref_set(&s->accept_socket, UNIT(sock));
>
> diff --git a/src/core/socket.c b/src/core/socket.c
> index aaaa8d6..be2b681 100644
> --- a/src/core/socket.c
> +++ b/src/core/socket.c
> @@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
> }
>
> static int socket_instantiate_service(Socket *s) {
> - char *prefix, *name;
> + _cleanup_free_ char *prefix = NULL, *name = NULL;
> int r;
> Unit *u;
>
> assert(s);
>
> /* This fills in s->service if it isn't filled in yet. For
> - * Accept=yes sockets we create the next connection service
> - * here. For Accept=no this is mostly a NOP since the service
> + * Accept=yes and Distribute=n sockets we create the next connection
> + * service here. Otherwise is mostly a NOP since the service
> * is figured out at load time anyway. */
>
> - if (UNIT_DEREF(s->service))
> + if (UNIT_DEREF(s->service) && s->distribute == 0)
> return 0;
>
> - assert(s->accept);
> + assert(s->accept || s->distribute > 0);
>
> if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
> return -ENOMEM;
>
> r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
> - free(prefix);
> -
> if (r < 0)
> return -ENOMEM;
>
> r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
> - free(name);
> -
> if (r < 0)
> return r;
>
> @@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
>
> assert(s);
>
> - if (!s->accept)
> + if (!(s->accept || s->distribute > 0))
> return true;
>
> LIST_FOREACH(port, p, s->ports) {
> @@ -408,6 +404,9 @@ static int socket_load(Unit *u) {
> if (r < 0)
> return r;
>
> + if (s->reuse_port < 0)
> + s->reuse_port = s->distribute > 0;
> +
> if (u->load_state == UNIT_LOADED) {
> /* This is a new unit? Then let's add in some extras */
> r = socket_add_extras(s);
> @@ -483,15 +482,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
> "%sBindToDevice: %s\n",
> prefix, s->bind_to_device);
>
> - if (s->accept)
> + if (s->accept || s->distribute > 0)
> fprintf(f,
> "%sAccepted: %u\n"
> - "%sNConnections: %u\n"
> - "%sMaxConnections: %u\n",
> + "%sNConnections: %u\n",
> prefix, s->n_accepted,
> - prefix, s->n_connections,
> + prefix, s->n_connections);
> +
> + if (s->accept)
> + fprintf(f,
> + "%sMaxConnections: %u\n",
> prefix, s->max_connections);
>
> + if (s->distribute > 0)
> + fprintf(f,
> + "%sDistribute: %u\n",
> + prefix, s->distribute);
> +
> if (s->priority >= 0)
> fprintf(f,
> "%sPriority: %i\n",
> @@ -604,9 +611,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
> struct sockaddr_storage storage;
> } local, remote;
>
> - assert(fd >= 0);
> assert(instance);
>
> + if (fd < 0) {
> + if (asprintf(instance, "%u", nr) < 0)
> + return -ENOMEM;
> + return 0;
> + }
> +
> l = sizeof(local);
> if (getsockname(fd, &local.sa, &l) < 0)
> return -errno;
> @@ -798,11 +810,9 @@ static void socket_apply_socket_options(Socket *s, int fd) {
> if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
> log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
>
> - if (s->reuse_port) {
> - int b = s->reuse_port;
> - if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
> + if (s->reuse_port)
> + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &s->reuse_port, sizeof(s->reuse_port)) < 0)
> log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
> - }
>
> if (s->smack_ip_in)
> if (smack_label_ip_in_fd(fd, s->smack_ip_in) < 0)
> @@ -1112,7 +1122,7 @@ static int socket_watch_fds(Socket *s) {
> if (p->event_source)
> r = sd_event_source_set_enabled(p->event_source, SD_EVENT_ON);
> else
> - r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN, socket_dispatch_io, p, &p->event_source);
> + r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN | (s->distribute > 0 ? EPOLLET : 0), socket_dispatch_io, p, &p->event_source);
>
> if (r < 0) {
> log_warning_unit(UNIT(s)->id, "Failed to watch listening fds: %s", strerror(-r));
> @@ -1485,7 +1495,7 @@ static void socket_enter_running(Socket *s, int cfd) {
> return;
> }
>
> - if (cfd < 0) {
> + if (cfd < 0 && s->distribute == 0) {
> Iterator i;
> Unit *other;
> bool pending = false;
> @@ -1509,7 +1519,7 @@ static void socket_enter_running(Socket *s, int cfd) {
> _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
> Service *service;
>
> - if (s->n_connections >= s->max_connections) {
> + if (s->n_connections >= s->max_connections && s->distribute == 0) {
> log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
> close_nointr_nofail(cfd);
> return;
> @@ -1565,6 +1575,9 @@ static void socket_enter_running(Socket *s, int cfd) {
> if (r < 0)
> goto fail;
>
> + if (s->distribute > 0 && s->n_connections >= s->distribute)
> + socket_set_state(s, SOCKET_RUNNING);
> +
> /* Notify clients about changed counters */
> unit_add_to_dbus_queue(UNIT(s));
> }
> @@ -2286,13 +2299,17 @@ void socket_connection_unref(Socket *s) {
>
> /* The service is dead. Yay!
> *
> - * This is strictly for one-instance-per-connection
> - * services. */
> + * This is for one-instance-per-connection
> + * and Distribute= services */
>
> assert(s->n_connections > 0);
> s->n_connections--;
>
> +
> log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
> +
> + if (s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
> + socket_enter_listening(s);
> }
>
> static void socket_trigger_notify(Unit *u, Unit *other) {
> @@ -2306,7 +2323,8 @@ static void socket_trigger_notify(Unit *u, Unit *other) {
> already down or accepting connections */
> if ((s->state != SOCKET_RUNNING &&
> s->state != SOCKET_LISTENING) ||
> - s->accept)
> + s->accept ||
> + s->distribute > 0)
> return;
>
> if (other->load_state != UNIT_LOADED ||
> diff --git a/src/core/socket.h b/src/core/socket.h
> index 076a183..138ac34 100644
> --- a/src/core/socket.h
> +++ b/src/core/socket.h
> @@ -95,6 +95,8 @@ struct Socket {
> LIST_HEAD(SocketPort, ports);
>
> unsigned n_accepted;
> + /* when Accept=true this is the number of active connections;
> + * when Distribute=n this is the number of active workers */
> unsigned n_connections;
> unsigned max_connections;
>
> @@ -147,7 +149,8 @@ struct Socket {
> size_t pipe_size;
> char *bind_to_device;
> char *tcp_congestion;
> - bool reuse_port;
> + int reuse_port;
> + unsigned distribute;
> long mq_maxmsg;
> long mq_msgsize;
>
> diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c
> index 1e3cee5..e9654b3 100644
> --- a/src/shared/conf-parser.c
> +++ b/src/shared/conf-parser.c
> @@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit,
> return 0;
> }
>
> +int config_parse_tristate(const char* unit,
> + const char *filename,
> + unsigned line,
> + const char *section,
> + unsigned section_line,
> + const char *lvalue,
> + int ltype,
> + const char *rvalue,
> + void *data,
> + void *userdata) {
> +
> + int k;
> + int *b = data;
> +
> + assert(filename);
> + assert(lvalue);
> + assert(rvalue);
> + assert(data);
> +
> + /* Tristates are like booleans, but can also take the 'default' value, i.e. "-1" */
> +
> + k = parse_boolean(rvalue);
> + if (k < 0) {
> + log_syntax(unit, LOG_ERR, filename, line, -k,
> + "Failed to parse boolean value, ignoring: %s", rvalue);
> + return 0;
> + }
> +
> + *b = !!k;
> + return 0;
> +}
> +
> int config_parse_bool(const char* unit,
> const char *filename,
> unsigned line,
> diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h
> index 2d5aa31..f1f9b27 100644
> --- a/src/shared/conf-parser.h
> +++ b/src/shared/conf-parser.h
> @@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char *filename, unsigned line, c
> int config_parse_bytes_size(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> int config_parse_bytes_off(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> int config_parse_bool(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> +int config_parse_tristate(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> int config_parse_string(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> int config_parse_path(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> int config_parse_strv(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> --
> 1.8.5.1
>
> _______________________________________________
> systemd-devel mailing list
> systemd-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/systemd-devel
More information about the systemd-devel
mailing list