[systemd-devel] [PATCH] core: support Distribute=n to distribute to n SO_REUSEPORT workers
Zbigniew Jędrzejewski-Szmek
zbyszek at in.waw.pl
Sat Nov 16 07:38:42 PST 2013
On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote:
> v3: make each worker its own service
> v4: be less intrusive
Hi Shawn,
unfortunately this doesn't apply cleanly. Can you rebase?
> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
> index 7c10c58..92a9275 100644
> --- a/man/systemd.socket.xml
> +++ b/man/systemd.socket.xml
> @@ -519,6 +519,15 @@
> </varlistentry>
>
> <varlistentry>
> + <term><varname>Distribute=</varname></term>
> + <listitem><para>Takes an integer
> + value. If greater than one, systemd will spawn
> + given number of instances of service each
> + listening to the same socket. This option implies
> + <varname>Reuseport=</varname> above.</para></listitem>
> + </varlistentry>
> +
> + <varlistentry>
> <term><varname>SmackLabel=</varname></term>
> <term><varname>SmackLabelIPIn=</varname></term>
> <term><varname>SmackLabelIPOut=</varname></term>
> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
> index 60a8d05..4644007 100644
> --- a/src/core/dbus-socket.c
> +++ b/src/core/dbus-socket.c
> @@ -68,6 +68,7 @@
> " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \
> " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \
> " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
> + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
> " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
> " <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
> " <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
> { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) },
> { "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) },
> { "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) },
> + { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) },
> { "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true },
> { "SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true },
> { "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true },
> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
> index b64fdc9..4058a1f 100644
> --- a/src/core/load-fragment-gperf.gperf.m4
> +++ b/src/core/load-fragment-gperf.gperf.m4
> @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0,
> Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
> Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
> Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport)
> +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
> Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
> Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
> Socket.Service, config_parse_socket_service, 0, 0
> diff --git a/src/core/service.c b/src/core/service.c
> index 3da32a1..8fc55a0 100644
> --- a/src/core/service.c
> +++ b/src/core/service.c
> @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done(
> int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>
> assert(s);
> - assert(fd >= 0);
>
> /* This is called by the socket code when instantiating a new
> * service for a stream socket and the socket needs to be
> @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
> if (s->state != SERVICE_DEAD)
> return -EAGAIN;
>
> - s->socket_fd = fd;
> - s->got_socket_fd = true;
> + if (fd >= 0) {
> + s->socket_fd = fd;
> + s->got_socket_fd = true;
> + }
>
> unit_ref_set(&s->accept_socket, UNIT(sock));
>
> diff --git a/src/core/service.h b/src/core/service.h
> index 37fa6ff..2ffe7d1 100644
> --- a/src/core/service.h
> +++ b/src/core/service.h
> @@ -26,7 +26,6 @@ typedef struct Service Service;
> #include "unit.h"
> #include "path.h"
> #include "ratelimit.h"
> -#include "service.h"
> #include "kill.h"
> #include "exit-status.h"
>
> diff --git a/src/core/socket.c b/src/core/socket.c
> index 751f20b..11b649b 100644
> --- a/src/core/socket.c
> +++ b/src/core/socket.c
> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
> }
>
> static int socket_instantiate_service(Socket *s) {
> - char *prefix, *name;
> + _cleanup_free_ char *prefix = NULL, *name = NULL;
> int r;
> Unit *u;
>
> assert(s);
>
> /* This fills in s->service if it isn't filled in yet. For
> - * Accept=yes sockets we create the next connection service
> - * here. For Accept=no this is mostly a NOP since the service
> + * Accept=yes and Distribute=n sockets we create the next connection
> + * service here. Otherwise is mostly a NOP since the service
> * is figured out at load time anyway. */
>
> - if (UNIT_DEREF(s->service))
> + if (UNIT_DEREF(s->service) && !(s->distribute))
> return 0;
>
> - assert(s->accept);
> + assert(s->accept || s->distribute);
>
> if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
> return -ENOMEM;
>
> r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
Here we could use something like "%.*u", MAX(s->distribute-1), s->n_accepted
to have nicely sorted instances... (E.g. systemctl sorts by type and name).
I'm not sure if that's better or not.
> - free(prefix);
> -
> if (r < 0)
> return -ENOMEM;
>
> r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
> - free(name);
> -
> if (r < 0)
> return r;
>
> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
> "%sReusePort: %s\n",
> prefix, yes_no(s->reuseport));
>
> + if (s->distribute)
> + fprintf(f,
> + "%sDistribute: %d\n",
> + prefix, s->distribute);
> +
> if (s->smack)
> fprintf(f,
> "%sSmackLabel: %s\n",
> @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) {
> return;
> }
>
> - if (cfd < 0) {
> + if (cfd < 0 && !(s->distribute)) {
> Iterator i;
> Unit *u;
> bool pending = false;
> @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) {
> return;
> }
>
> - r = socket_instantiate_service(s);
> - if (r < 0)
> - goto fail;
> -
> - r = instance_from_socket(cfd, s->n_accepted, &instance);
> - if (r < 0) {
> - if (r != -ENOTCONN)
> - goto fail;
> -
> - /* ENOTCONN is legitimate if TCP RST was received.
> - * This connection is over, but the socket unit lives on. */
> - close_nointr_nofail(cfd);
> - return;
> - }
> -
> prefix = unit_name_to_prefix(UNIT(s)->id);
> if (!prefix) {
> r = -ENOMEM;
> goto fail;
> }
>
> - name = unit_name_build(prefix, instance, ".service");
> + do {
> + r = socket_instantiate_service(s);
> + if (r < 0)
> + goto fail;
>
> - if (!name) {
> - r = -ENOMEM;
> - goto fail;
> - }
> + if (!(s->distribute)) {
What does Distribute=1 mean? Is it treated as a special case of Distribute=n,
and just one service@1.service is started? Or is it treated as equivalent to
Distribute=0, and service.service is started? I kind of like the first version,
but then in the manpage it should be clarified a bit that Distribute=0 is
the default, and different from Distribute=1.
> + r = instance_from_socket(cfd, s->n_accepted, &instance);
> + if (r < 0) {
> + if (r != -ENOTCONN)
> + goto fail;
>
> - r = unit_add_name(UNIT_DEREF(s->service), name);
> - if (r < 0)
> - goto fail;
> + /* ENOTCONN is legitimate if TCP RST was received.
> + * This connection is over, but the socket unit lives on. */
> + close_nointr_nofail(cfd);
> + return;
> + }
>
> - service = SERVICE(UNIT_DEREF(s->service));
> - unit_ref_unset(&s->service);
> - s->n_accepted ++;
> + name = unit_name_build(prefix, instance, ".service");
> + if (!name) {
> + r = -ENOMEM;
> + goto fail;
> + }
>
> - UNIT(service)->no_gc = false;
> + r = unit_add_name(UNIT_DEREF(s->service), name);
> + if (r < 0)
> + goto fail;
> + }
>
> - unit_choose_id(UNIT(service), name);
> + service = SERVICE(UNIT_DEREF(s->service));
> + unit_ref_unset(&s->service);
> + s->n_accepted ++;
>
> - r = service_set_socket_fd(service, cfd, s);
> - if (r < 0)
> - goto fail;
> + UNIT(service)->no_gc = false;
>
> - cfd = -1;
> - s->n_connections ++;
> + unit_choose_id(UNIT(service), name);
>
> - r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> - if (r < 0)
> - goto fail;
> + r = service_set_socket_fd(service, cfd, s);
> + if (r < 0)
> + goto fail;
> +
> + cfd = -1;
> + s->n_connections ++;
> +
> + r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
> + if (r < 0)
> + goto fail;
> +
> + if(s->distribute > s->n_connections) {
> + /* distribute implies reuseport */
> + s->reuseport = true;
> +
> + socket_enter_listening(s);
> + }
> + } while(s->distribute > s->n_connections);
>
> /* Notify clients about changed counters */
> unit_add_to_dbus_queue(UNIT(s));
> @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) {
>
> /* The service is dead. Yay!
> *
> - * This is strictly for one-instance-per-connection
> - * services. */
> + * This is for one-instance-per-connection
> + * and Distribute= services */
>
> assert(s->n_connections > 0);
> s->n_connections--;
>
> log_debug_unit(UNIT(s)->id,
> "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
> +
> + if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){
Could this be 's->n_connections < s->distribute'? It just feels backwards.
> + s->reuseport = true;
Could this have changed? It seems it should be set in just one place.
> + /* (re)enter systemd into SO_REUSEPORT pool, when it gets a
> + * connection it will reestablish distribute target */
> + socket_enter_listening(s);
> + }
> }
>
> static void socket_reset_failed(Unit *u) {
> diff --git a/src/core/socket.h b/src/core/socket.h
> index 3d7eadc..5928356 100644
> --- a/src/core/socket.h
> +++ b/src/core/socket.h
> @@ -93,6 +93,8 @@ struct Socket {
> LIST_HEAD(SocketPort, ports);
>
> unsigned n_accepted;
> + /* when Accept=true this is the number of active connectoins
> + * when Distribute=n this is the number of active workers */
> unsigned n_connections;
Could this become n_instances then?
> unsigned max_connections;
>
> @@ -145,6 +147,8 @@ struct Socket {
> char *bind_to_device;
> char *tcp_congestion;
> bool reuseport;
> + /* implies reuseport */
> + unsigned distribute;
> long mq_maxmsg;
> long mq_msgsize;
>
Zbyszek
More information about the systemd-devel
mailing list