[systemd-devel] [PATCH] core: support Distribute=n to distribute to n SO_REUSEPORT workers
Shawn
shawn at churchofgit.com
Sat Nov 16 13:17:44 PST 2013
On Sat, Nov 16, 2013 at 7:38 AM, Zbigniew Jędrzejewski-Szmek
<zbyszek at in.waw.pl> wrote:
> On Fri, Nov 15, 2013 at 08:22:14PM -0800, Shawn Landden wrote:
>> v3: make each worker its own service
>> v4: be less intrusive
> Hi Shawn,
> unfortunately this doesn't apply cleanly. Can you rebase?
thats because it is 3rd in a series, I will send the whole series
right after this email
>
>> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
>> index 7c10c58..92a9275 100644
>> --- a/man/systemd.socket.xml
>> +++ b/man/systemd.socket.xml
>> @@ -519,6 +519,15 @@
>> </varlistentry>
>>
>> <varlistentry>
>> + <term><varname>Distribute=</varname></term>
>> + <listitem><para>Takes an integer
>> + value. If greater than one, systemd will spawn
>> + given number of instances of service each
>> + listening to the same socket. This option implies
>> + <varname>Reuseport=</varname> above.</para></listitem>
>> + </varlistentry>
>> +
>> + <varlistentry>
>> <term><varname>SmackLabel=</varname></term>
>> <term><varname>SmackLabelIPIn=</varname></term>
>> <term><varname>SmackLabelIPOut=</varname></term>
>> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
>> index 60a8d05..4644007 100644
>> --- a/src/core/dbus-socket.c
>> +++ b/src/core/dbus-socket.c
>> @@ -68,6 +68,7 @@
>> " <property name=\"Listen\" type=\"a(ss)\" access=\"read\"/>\n" \
>> " <property name=\"Result\" type=\"s\" access=\"read\"/>\n" \
>> " <property name=\"ReusePort\" type=\"b\" access=\"read\"/>\n" \
>> + " <property name=\"Distribute\" type=\"u\" access=\"read\"/>\n" \
>> " <property name=\"SmackLabel\" type=\"s\" access=\"read\"/>\n" \
>> " <property name=\"SmackLabelIPIn\" type=\"s\" access=\"read\"/>\n" \
>> " <property name=\"SmackLabelIPOut\" type=\"s\" access=\"read\"/>\n" \
>> @@ -196,6 +197,7 @@ static const BusProperty bus_socket_properties[] = {
>> { "MessageQueueMessageSize", bus_property_append_long, "x", offsetof(Socket, mq_msgsize) },
>> { "Result", bus_socket_append_socket_result, "s", offsetof(Socket, result) },
>> { "ReusePort", bus_property_append_bool, "b", offsetof(Socket, reuseport) },
>> + { "Distribute", bus_property_append_unsigned, "u", offsetof(Socket, distribute) },
>> { "SmackLabel", bus_property_append_string, "s", offsetof(Socket, smack), true },
>> { "SmackLabelIPIn", bus_property_append_string, "s", offsetof(Socket, smack_ip_in), true },
>> { "SmackLabelIPOut",bus_property_append_string, "s", offsetof(Socket, smack_ip_out), true },
>> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
>> index b64fdc9..4058a1f 100644
>> --- a/src/core/load-fragment-gperf.gperf.m4
>> +++ b/src/core/load-fragment-gperf.gperf.m4
>> @@ -211,6 +211,7 @@ Socket.PassCredentials, config_parse_bool, 0,
>> Socket.PassSecurity, config_parse_bool, 0, offsetof(Socket, pass_sec)
>> Socket.TCPCongestion, config_parse_string, 0, offsetof(Socket, tcp_congestion)
>> Socket.ReusePort, config_parse_bool, 0, offsetof(Socket, reuseport)
>> +Socket.Distribute, config_parse_unsigned, 0, offsetof(Socket, distribute)
>> Socket.MessageQueueMaxMessages, config_parse_long, 0, offsetof(Socket, mq_maxmsg)
>> Socket.MessageQueueMessageSize, config_parse_long, 0, offsetof(Socket, mq_msgsize)
>> Socket.Service, config_parse_socket_service, 0, 0
>> diff --git a/src/core/service.c b/src/core/service.c
>> index 3da32a1..8fc55a0 100644
>> --- a/src/core/service.c
>> +++ b/src/core/service.c
>> @@ -3663,7 +3663,6 @@ static void service_bus_query_pid_done(
>> int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>>
>> assert(s);
>> - assert(fd >= 0);
>>
>> /* This is called by the socket code when instantiating a new
>> * service for a stream socket and the socket needs to be
>> @@ -3678,8 +3677,10 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>> if (s->state != SERVICE_DEAD)
>> return -EAGAIN;
>>
>> - s->socket_fd = fd;
>> - s->got_socket_fd = true;
>> + if (fd >= 0) {
>> + s->socket_fd = fd;
>> + s->got_socket_fd = true;
>> + }
>>
>> unit_ref_set(&s->accept_socket, UNIT(sock));
>>
>> diff --git a/src/core/service.h b/src/core/service.h
>> index 37fa6ff..2ffe7d1 100644
>> --- a/src/core/service.h
>> +++ b/src/core/service.h
>> @@ -26,7 +26,6 @@ typedef struct Service Service;
>> #include "unit.h"
>> #include "path.h"
>> #include "ratelimit.h"
>> -#include "service.h"
>> #include "kill.h"
>> #include "exit-status.h"
>>
>> diff --git a/src/core/socket.c b/src/core/socket.c
>> index 751f20b..11b649b 100644
>> --- a/src/core/socket.c
>> +++ b/src/core/socket.c
>> @@ -153,34 +153,30 @@ static void socket_done(Unit *u) {
>> }
>>
>> static int socket_instantiate_service(Socket *s) {
>> - char *prefix, *name;
>> + _cleanup_free_ char *prefix = NULL, *name = NULL;
>> int r;
>> Unit *u;
>>
>> assert(s);
>>
>> /* This fills in s->service if it isn't filled in yet. For
>> - * Accept=yes sockets we create the next connection service
>> - * here. For Accept=no this is mostly a NOP since the service
>> + * Accept=yes and Distribute=n sockets we create the next connection
>> + * service here. Otherwise is mostly a NOP since the service
>> * is figured out at load time anyway. */
>>
>> - if (UNIT_DEREF(s->service))
>> + if (UNIT_DEREF(s->service) && !(s->distribute))
>> return 0;
>>
>> - assert(s->accept);
>> + assert(s->accept || s->distribute);
>>
>> if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
>> return -ENOMEM;
>>
>> r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
> Here we could use something like "%.*u", MAX(s->distribute-1), s->n_accepted
> to have nicely sorted instances... (E.g. systemctl sorts by type ane name).
> I'm not sure if that's better or not.
I don't think this is a good idea. It needlessly special-cases
s->distribute, and as exited workers are
respawned the number can go above s->distribute, so it isn't really
right. This issues is purely cosmetic,
as all the workers are the same, and even then is really only a
systemctl problem, not a pid 1 problem,
and should be fixed as such, with more fanciful ordering.
>
>> - free(prefix);
>> -
>> if (r < 0)
>> return -ENOMEM;
>>
>> r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
>> - free(name);
>> -
>> if (r < 0)
>> return r;
>>
>> @@ -513,6 +509,11 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
>> "%sReusePort: %s\n",
>> prefix, yes_no(s->reuseport));
>>
>> + if (s->distribute)
>> + fprintf(f,
>> + "%sDistribute: %d\n",
>> + prefix, s->distribute);
>> +
>> if (s->smack)
>> fprintf(f,
>> "%sSmackLabel: %s\n",
>> @@ -1454,7 +1455,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>> return;
>> }
>>
>> - if (cfd < 0) {
>> + if (cfd < 0 && !(s->distribute)) {
>> Iterator i;
>> Unit *u;
>> bool pending = false;
>> @@ -1486,56 +1487,66 @@ static void socket_enter_running(Socket *s, int cfd) {
>> return;
>> }
>>
>> - r = socket_instantiate_service(s);
>> - if (r < 0)
>> - goto fail;
>> -
>> - r = instance_from_socket(cfd, s->n_accepted, &instance);
>> - if (r < 0) {
>> - if (r != -ENOTCONN)
>> - goto fail;
>> -
>> - /* ENOTCONN is legitimate if TCP RST was received.
>> - * This connection is over, but the socket unit lives on. */
>> - close_nointr_nofail(cfd);
>> - return;
>> - }
>> -
>> prefix = unit_name_to_prefix(UNIT(s)->id);
>> if (!prefix) {
>> r = -ENOMEM;
>> goto fail;
>> }
>>
>> - name = unit_name_build(prefix, instance, ".service");
>> + do {
>> + r = socket_instantiate_service(s);
>> + if (r < 0)
>> + goto fail;
>>
>> - if (!name) {
>> - r = -ENOMEM;
>> - goto fail;
>> - }
>> + if (!(s->distribute)) {
> What does Distribute=1 mean? Is it treated as a special case of Distribute=n,
> and just one service at 1.service is started? Or is treated as equivalent to
> Distribute=0, and service.service is started? I kind of like the first version,
> but then the in the manpage it should be clarified a bit that Distribute=0 is
> the default, and different from Distribute=1.
the latter, clarified in man page.
>
>> + r = instance_from_socket(cfd, s->n_accepted, &instance);
>> + if (r < 0) {
>> + if (r != -ENOTCONN)
>> + goto fail;
>>
>> - r = unit_add_name(UNIT_DEREF(s->service), name);
>> - if (r < 0)
>> - goto fail;
>> + /* ENOTCONN is legitimate if TCP RST was received.
>> + * This connection is over, but the socket unit lives on. */
>> + close_nointr_nofail(cfd);
>> + return;
>> + }
>>
>> - service = SERVICE(UNIT_DEREF(s->service));
>> - unit_ref_unset(&s->service);
>> - s->n_accepted ++;
>> + name = unit_name_build(prefix, instance, ".service");
>> + if (!name) {
>> + r = -ENOMEM;
>> + goto fail;
>> + }
>>
>> - UNIT(service)->no_gc = false;
>> + r = unit_add_name(UNIT_DEREF(s->service), name);
>> + if (r < 0)
>> + goto fail;
>> + }
>>
>> - unit_choose_id(UNIT(service), name);
>> + service = SERVICE(UNIT_DEREF(s->service));
>> + unit_ref_unset(&s->service);
>> + s->n_accepted ++;
>>
>> - r = service_set_socket_fd(service, cfd, s);
>> - if (r < 0)
>> - goto fail;
>> + UNIT(service)->no_gc = false;
>>
>> - cfd = -1;
>> - s->n_connections ++;
>> + unit_choose_id(UNIT(service), name);
>>
>> - r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
>> - if (r < 0)
>> - goto fail;
>> + r = service_set_socket_fd(service, cfd, s);
>> + if (r < 0)
>> + goto fail;
>> +
>> + cfd = -1;
>> + s->n_connections ++;
>> +
>> + r = manager_add_job(UNIT(s)->manager, JOB_START, UNIT(service), JOB_REPLACE, true, &error, NULL);
>> + if (r < 0)
>> + goto fail;
>> +
>> + if(s->distribute > s->n_connections) {
>> + /* distribute implies reuseport */
>> + s->reuseport = true;
>> +
>> + socket_enter_listening(s);
>> + }
>> + } while(s->distribute > s->n_connections);
>>
>> /* Notify clients about changed counters */
>> unit_add_to_dbus_queue(UNIT(s));
>> @@ -2263,14 +2274,21 @@ void socket_connection_unref(Socket *s) {
>>
>> /* The service is dead. Yay!
>> *
>> - * This is strictly for one-instance-per-connection
>> - * services. */
>> + * This is for one-instance-per-connection
>> + * and Distribute= services */
>>
>> assert(s->n_connections > 0);
>> s->n_connections--;
>>
>> log_debug_unit(UNIT(s)->id,
>> "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
>> +
>> + if(s->distribute > s->n_connections && s->state == SOCKET_RUNNING){
> Could this be 's->n_connections < s->distribute'? It just feels backwards.
changed everywhere
>
>> + s->reuseport = true;
> Could this have changed? It seems it should be set in just one place.
moved to socket_apply_socket_options()
>
>> + /* (re)enter systemd into SO_REUSEPORT pool, when it gets a
>> + * connection it will reestablish distribute target */
>> + socket_enter_listening(s);
>> + }
>> }
>>
>> static void socket_reset_failed(Unit *u) {
>> diff --git a/src/core/socket.h b/src/core/socket.h
>> index 3d7eadc..5928356 100644
>> --- a/src/core/socket.h
>> +++ b/src/core/socket.h
>> @@ -93,6 +93,8 @@ struct Socket {
>> LIST_HEAD(SocketPort, ports);
>>
>> unsigned n_accepted;
>> + /* when Accept=true this is the number of active connectoins
>> + * when Distribute=n this is the number of active workers */
>> unsigned n_connections;
> Could this become n_instances then?
I did this, but it was too ugly as NConnections is an exported API.
>
>> unsigned max_connections;
>>
>> @@ -145,6 +147,8 @@ struct Socket {
>> char *bind_to_device;
>> char *tcp_congestion;
>> bool reuseport;
>> + /* implies reuseport */
>> + unsigned distribute;
>> long mq_maxmsg;
>> long mq_msgsize;
>>
>
> Zbyszek
> _______________________________________________
> systemd-devel mailing list
> systemd-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-------------- next part --------------
A non-text attachment was scrubbed...
Name: reuseport.c
Type: text/x-csrc
Size: 1359 bytes
Desc: not available
URL: <http://lists.freedesktop.org/archives/systemd-devel/attachments/20131116/95aa83f6/attachment.c>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: reuseport.socket
Type: application/octet-stream
Size: 98 bytes
Desc: not available
URL: <http://lists.freedesktop.org/archives/systemd-devel/attachments/20131116/95aa83f6/attachment.obj>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: reuseport at .service
Type: application/octet-stream
Size: 30 bytes
Desc: not available
URL: <http://lists.freedesktop.org/archives/systemd-devel/attachments/20131116/95aa83f6/attachment-0001.obj>
More information about the systemd-devel
mailing list