[systemd-devel] [PATCH] core: support Distribute=n to distribute to n workers

Shawn Landden shawn at churchofgit.com
Thu Dec 19 12:21:26 PST 2013


ping?

On Fri, Dec 13, 2013 at 8:23 PM, Shawn Landden <shawn at churchofgit.com> wrote:
> If Distribute=n, turns SO_REUSEPORT on, and spawns
> n workers to handle incoming requests.
>
> SO_REUSEPORT sockets on the same port must all be created
> by the same uid, therefore using the option allows
> other root programs (or programs of the same user
> if running in --user mode) to "hijack" this port, even
> after systemd reserves it.
>
> We spawn workers at a rate approximately reverse
> exponentially proportional to the number of incoming connections.
> Faster based on the time for new workers to start accept()ing
> and their load, or slower if systemd is under load.
> ---
>  TODO                                  |  3 +-
>  man/systemd.socket.xml                | 15 +++++++-
>  src/core/dbus-socket.c                |  4 +--
>  src/core/load-fragment-gperf.gperf.m4 |  3 +-
>  src/core/service.c                    |  4 +--
>  src/core/socket.c                     | 68 ++++++++++++++++++++++-------------
>  src/core/socket.h                     |  5 ++-
>  src/shared/conf-parser.c              | 32 +++++++++++++++++
>  src/shared/conf-parser.h              |  1 +
>  9 files changed, 101 insertions(+), 34 deletions(-)
>
> diff --git a/TODO b/TODO
> index 0b43888..2abe1b4 100644
> --- a/TODO
> +++ b/TODO
> @@ -73,7 +73,7 @@ Features:
>
>  * rfkill,backlight: we probably should run the load tools inside of the udev rules so that the state is properly initialized by the time other software sees it
>
> -* Add a new Distribute=$NUMBER key to socket units that makes use of SO_REUSEPORT to distribute network traffic on $NUMBER instances
> +* tmpfiles: when applying ownership to /run/log/journal, also do this for the journal fails contained in it
>
>  * move config_parse_path_strv() out of conf-parser.c
>
> @@ -181,7 +181,6 @@ Features:
>  * teach ConditionKernelCommandLine= globs or regexes (in order to match foobar={no,0,off})
>
>  * Support SO_REUSEPORT with socket activation:
> -  - Let systemd maintain a pool of servers.
>    - Use for seamless upgrades, by running the new server before stopping the
>      old.
>
> diff --git a/man/systemd.socket.xml b/man/systemd.socket.xml
> index 7c10c58..6799020 100644
> --- a/man/systemd.socket.xml
> +++ b/man/systemd.socket.xml
> @@ -404,7 +404,8 @@
>                                  designed for usage with
>                                  <citerefentry><refentrytitle>inetd</refentrytitle><manvolnum>8</manvolnum></citerefentry>
>                                  to work unmodified with systemd socket
> -                                activation.</para></listitem>
> +                                activation. Incompatible with
> +                                <varname>Distribute=</varname></para></listitem>
>                          </varlistentry>
>
>                          <varlistentry>
> @@ -519,6 +520,18 @@
>                          </varlistentry>
>
>                          <varlistentry>
> +                                <term><varname>Distribute=</varname></term>
> +                                <listitem><para>Takes an integer
> +                                value. Systemd will spawn up to
> +                                given number of instances of service each
> +                                listening to the same socket. Default is 0.
> +                                Setting this requires corresponding service to
> +                                be an instantiated service (name ends with <literal>@.service</literal>).
> +                                Useful with <varname>ReusePort=</varname> above.
> +                                Incompatible with <varname>Accept=true</varname>.</para></listitem>
> +                        </varlistentry>
> +
> +                        <varlistentry>
>                                  <term><varname>SmackLabel=</varname></term>
>                                  <term><varname>SmackLabelIPIn=</varname></term>
>                                  <term><varname>SmackLabelIPOut=</varname></term>
> diff --git a/src/core/dbus-socket.c b/src/core/dbus-socket.c
> index 74217df..036c9af 100644
> --- a/src/core/dbus-socket.c
> +++ b/src/core/dbus-socket.c
> @@ -40,7 +40,6 @@ static int property_get_listen(
>                  void *userdata,
>                  sd_bus_error *error) {
>
> -
>          Socket *s = SOCKET(userdata);
>          SocketPort *p;
>          int r;
> @@ -115,7 +114,8 @@ const sd_bus_vtable bus_socket_vtable[] = {
>          SD_BUS_PROPERTY("MessageQueueMaxMessages", "x", bus_property_get_long, offsetof(Socket, mq_maxmsg), 0),
>          SD_BUS_PROPERTY("MessageQueueMessageSize", "x", bus_property_get_long, offsetof(Socket, mq_msgsize), 0),
>          SD_BUS_PROPERTY("Result", "s", property_get_result, offsetof(Socket, result), SD_BUS_VTABLE_PROPERTY_EMITS_CHANGE),
> -        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_bool, offsetof(Socket, reuse_port), 0),
> +        SD_BUS_PROPERTY("ReusePort", "b",  bus_property_get_tristate, offsetof(Socket, reuse_port), 0),
> +        SD_BUS_PROPERTY("Distribute", "u",  bus_property_get_unsigned, offsetof(Socket, distribute), 0),
>          SD_BUS_PROPERTY("SmackLabel", "s", NULL, offsetof(Socket, smack), 0),
>          SD_BUS_PROPERTY("SmackLabelIPIn", "s", NULL, offsetof(Socket, smack_ip_in), 0),
>          SD_BUS_PROPERTY("SmackLabelIPOut", "s", NULL, offsetof(Socket, smack_ip_out), 0),
> diff --git a/src/core/load-fragment-gperf.gperf.m4 b/src/core/load-fragment-gperf.gperf.m4
> index a5033b2..70f15bd 100644
> --- a/src/core/load-fragment-gperf.gperf.m4
> +++ b/src/core/load-fragment-gperf.gperf.m4
> @@ -212,7 +212,8 @@ Socket.Broadcast,                config_parse_bool,                  0,
>  Socket.PassCredentials,          config_parse_bool,                  0,                             offsetof(Socket, pass_cred)
>  Socket.PassSecurity,             config_parse_bool,                  0,                             offsetof(Socket, pass_sec)
>  Socket.TCPCongestion,            config_parse_string,                0,                             offsetof(Socket, tcp_congestion)
> -Socket.ReusePort,                config_parse_bool,                  0,                             offsetof(Socket, reuse_port)
> +Socket.ReusePort,                config_parse_tristate,              0,                             offsetof(Socket, reuse_port)
> +Socket.Distribute,               config_parse_unsigned,              0,                             offsetof(Socket, distribute)
>  Socket.MessageQueueMaxMessages,  config_parse_long,                  0,                             offsetof(Socket, mq_maxmsg)
>  Socket.MessageQueueMessageSize,  config_parse_long,                  0,                             offsetof(Socket, mq_msgsize)
>  Socket.Service,                  config_parse_socket_service,        0,                             0
> diff --git a/src/core/service.c b/src/core/service.c
> index 3b3f956..3047e10 100644
> --- a/src/core/service.c
> +++ b/src/core/service.c
> @@ -3668,7 +3668,6 @@ static void service_bus_name_owner_change(
>  int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>
>          assert(s);
> -        assert(fd >= 0);
>
>          /* This is called by the socket code when instantiating a new
>           * service for a stream socket and the socket needs to be
> @@ -3683,7 +3682,8 @@ int service_set_socket_fd(Service *s, int fd, Socket *sock) {
>          if (s->state != SERVICE_DEAD)
>                  return -EAGAIN;
>
> -        s->socket_fd = fd;
> +        if (fd > 0)
> +                s->socket_fd = fd;
>
>          unit_ref_set(&s->accept_socket, UNIT(sock));
>
> diff --git a/src/core/socket.c b/src/core/socket.c
> index aaaa8d6..be2b681 100644
> --- a/src/core/socket.c
> +++ b/src/core/socket.c
> @@ -179,34 +179,30 @@ static int socket_arm_timer(Socket *s) {
>  }
>
>  static int socket_instantiate_service(Socket *s) {
> -        char *prefix, *name;
> +        _cleanup_free_ char *prefix = NULL, *name = NULL;
>          int r;
>          Unit *u;
>
>          assert(s);
>
>          /* This fills in s->service if it isn't filled in yet. For
> -         * Accept=yes sockets we create the next connection service
> -         * here. For Accept=no this is mostly a NOP since the service
> +         * Accept=yes and Distribute=n sockets we create the next connection
> +         * service here. Otherwise is mostly a NOP since the service
>           * is figured out at load time anyway. */
>
> -        if (UNIT_DEREF(s->service))
> +        if (UNIT_DEREF(s->service) && s->distribute == 0)
>                  return 0;
>
> -        assert(s->accept);
> +        assert(s->accept || s->distribute > 0);
>
>          if (!(prefix = unit_name_to_prefix(UNIT(s)->id)))
>                  return -ENOMEM;
>
>          r = asprintf(&name, "%s@%u.service", prefix, s->n_accepted);
> -        free(prefix);
> -
>          if (r < 0)
>                  return -ENOMEM;
>
>          r = manager_load_unit(UNIT(s)->manager, name, NULL, NULL, &u);
> -        free(name);
> -
>          if (r < 0)
>                  return r;
>
> @@ -228,7 +224,7 @@ static bool have_non_accept_socket(Socket *s) {
>
>          assert(s);
>
> -        if (!s->accept)
> +        if (!(s->accept || s->distribute > 0))
>                  return true;
>
>          LIST_FOREACH(port, p, s->ports) {
> @@ -408,6 +404,9 @@ static int socket_load(Unit *u) {
>          if (r < 0)
>                  return r;
>
> +        if (s->reuse_port < 0)
> +                s->reuse_port = s->distribute > 0;
> +
>          if (u->load_state == UNIT_LOADED) {
>                  /* This is a new unit? Then let's add in some extras */
>                  r = socket_add_extras(s);
> @@ -483,15 +482,23 @@ static void socket_dump(Unit *u, FILE *f, const char *prefix) {
>                          "%sBindToDevice: %s\n",
>                          prefix, s->bind_to_device);
>
> -        if (s->accept)
> +        if (s->accept || s->distribute > 0)
>                  fprintf(f,
>                          "%sAccepted: %u\n"
> -                        "%sNConnections: %u\n"
> -                        "%sMaxConnections: %u\n",
> +                        "%sNConnections: %u\n",
>                          prefix, s->n_accepted,
> -                        prefix, s->n_connections,
> +                        prefix, s->n_connections);
> +
> +        if (s->accept)
> +                fprintf(f,
> +                        "%sMaxConnections: %u\n",
>                          prefix, s->max_connections);
>
> +        if (s->distribute > 0)
> +                fprintf(f,
> +                        "%sDistribute: %u\n",
> +                        prefix, s->distribute);
> +
>          if (s->priority >= 0)
>                  fprintf(f,
>                          "%sPriority: %i\n",
> @@ -604,9 +611,14 @@ static int instance_from_socket(int fd, unsigned nr, char **instance) {
>                  struct sockaddr_storage storage;
>          } local, remote;
>
> -        assert(fd >= 0);
>          assert(instance);
>
> +        if (fd < 0) {
> +                if (asprintf(instance, "%u", nr) < 0)
> +                        return -ENOMEM;
> +                return 0;
> +        }
> +
>          l = sizeof(local);
>          if (getsockname(fd, &local.sa, &l) < 0)
>                  return -errno;
> @@ -798,11 +810,9 @@ static void socket_apply_socket_options(Socket *s, int fd) {
>                  if (setsockopt(fd, SOL_TCP, TCP_CONGESTION, s->tcp_congestion, strlen(s->tcp_congestion)+1) < 0)
>                          log_warning_unit(UNIT(s)->id, "TCP_CONGESTION failed: %m");
>
> -        if (s->reuse_port) {
> -                int b = s->reuse_port;
> -                if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &b, sizeof(b)) < 0)
> +        if (s->reuse_port)
> +                if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &s->reuse_port, sizeof(s->reuse_port)) < 0)
>                          log_warning_unit(UNIT(s)->id, "SO_REUSEPORT failed: %m");
> -        }
>
>          if (s->smack_ip_in)
>                  if (smack_label_ip_in_fd(fd, s->smack_ip_in) < 0)
> @@ -1112,7 +1122,7 @@ static int socket_watch_fds(Socket *s) {
>                  if (p->event_source)
>                          r = sd_event_source_set_enabled(p->event_source, SD_EVENT_ON);
>                  else
> -                        r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN, socket_dispatch_io, p, &p->event_source);
> +                        r = sd_event_add_io(UNIT(s)->manager->event, p->fd, EPOLLIN | (s->distribute > 0 ? EPOLLET : 0), socket_dispatch_io, p, &p->event_source);
>
>                  if (r < 0) {
>                          log_warning_unit(UNIT(s)->id, "Failed to watch listening fds: %s", strerror(-r));
> @@ -1485,7 +1495,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  return;
>          }
>
> -        if (cfd < 0) {
> +        if (cfd < 0 && s->distribute == 0) {
>                  Iterator i;
>                  Unit *other;
>                  bool pending = false;
> @@ -1509,7 +1519,7 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  _cleanup_free_ char *prefix = NULL, *instance = NULL, *name = NULL;
>                  Service *service;
>
> -                if (s->n_connections >= s->max_connections) {
> +                if (s->n_connections >= s->max_connections && s->distribute == 0) {
>                          log_warning_unit(UNIT(s)->id, "%s: Too many incoming connections (%u)", UNIT(s)->id, s->n_connections);
>                          close_nointr_nofail(cfd);
>                          return;
> @@ -1565,6 +1575,9 @@ static void socket_enter_running(Socket *s, int cfd) {
>                  if (r < 0)
>                          goto fail;
>
> +                if (s->distribute > 0 && s->n_connections >= s->distribute)
> +                        socket_set_state(s, SOCKET_RUNNING);
> +
>                  /* Notify clients about changed counters */
>                  unit_add_to_dbus_queue(UNIT(s));
>          }
> @@ -2286,13 +2299,17 @@ void socket_connection_unref(Socket *s) {
>
>          /* The service is dead. Yay!
>           *
> -         * This is strictly for one-instance-per-connection
> -         * services. */
> +         * This is for one-instance-per-connection
> +         * and Distribute= services */
>
>          assert(s->n_connections > 0);
>          s->n_connections--;
>
> +
>          log_debug_unit(UNIT(s)->id, "%s: One connection closed, %u left.", UNIT(s)->id, s->n_connections);
> +
> +        if (s->n_connections < s->distribute && s->state == SOCKET_RUNNING)
> +                socket_enter_listening(s);
>  }
>
>  static void socket_trigger_notify(Unit *u, Unit *other) {
> @@ -2306,7 +2323,8 @@ static void socket_trigger_notify(Unit *u, Unit *other) {
>             already down or accepting connections */
>          if ((s->state !=  SOCKET_RUNNING &&
>              s->state != SOCKET_LISTENING) ||
> -            s->accept)
> +            s->accept ||
> +            s->distribute > 0)
>                  return;
>
>          if (other->load_state != UNIT_LOADED ||
> diff --git a/src/core/socket.h b/src/core/socket.h
> index 076a183..138ac34 100644
> --- a/src/core/socket.h
> +++ b/src/core/socket.h
> @@ -95,6 +95,8 @@ struct Socket {
>          LIST_HEAD(SocketPort, ports);
>
>          unsigned n_accepted;
> +        /* when Accept=true this is the number of active connections
> +         * when Distribute=n this is the number of active workers */
>          unsigned n_connections;
>          unsigned max_connections;
>
> @@ -147,7 +149,8 @@ struct Socket {
>          size_t pipe_size;
>          char *bind_to_device;
>          char *tcp_congestion;
> -        bool reuse_port;
> +        int reuse_port;
> +        unsigned distribute;
>          long mq_maxmsg;
>          long mq_msgsize;
>
> diff --git a/src/shared/conf-parser.c b/src/shared/conf-parser.c
> index 1e3cee5..e9654b3 100644
> --- a/src/shared/conf-parser.c
> +++ b/src/shared/conf-parser.c
> @@ -498,6 +498,38 @@ int config_parse_bytes_off(const char* unit,
>          return 0;
>  }
>
> +int config_parse_tristate(const char* unit,
> +                          const char *filename,
> +                          unsigned line,
> +                          const char *section,
> +                          unsigned section_line,
> +                          const char *lvalue,
> +                          int ltype,
> +                          const char *rvalue,
> +                          void *data,
> +                          void *userdata) {
> +
> +        int k;
> +        int *b = data;
> +
> +        assert(filename);
> +        assert(lvalue);
> +        assert(rvalue);
> +        assert(data);
> +
> +        /* Tristates are like booleans, but can also take the 'default' value, i.e. "-1" */
> +
> +        k = parse_boolean(rvalue);
> +        if (k < 0) {
> +                log_syntax(unit, LOG_ERR, filename, line, -k,
> +                           "Failed to parse boolean value, ignoring: %s", rvalue);
> +                return 0;
> +        }
> +
> +        *b = !!k;
> +        return 0;
> +}
> +
>  int config_parse_bool(const char* unit,
>                        const char *filename,
>                        unsigned line,
> diff --git a/src/shared/conf-parser.h b/src/shared/conf-parser.h
> index 2d5aa31..f1f9b27 100644
> --- a/src/shared/conf-parser.h
> +++ b/src/shared/conf-parser.h
> @@ -100,6 +100,7 @@ int config_parse_double(const char *unit, const char *filename, unsigned line, c
>  int config_parse_bytes_size(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
>  int config_parse_bytes_off(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
>  int config_parse_bool(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> +int config_parse_tristate(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
>  int config_parse_string(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
>  int config_parse_path(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
>  int config_parse_strv(const char *unit, const char *filename, unsigned line, const char *section, unsigned section_line, const char *lvalue, int ltype, const char *rvalue, void *data, void *userdata);
> --
> 1.8.5.1
>
> _______________________________________________
> systemd-devel mailing list
> systemd-devel at lists.freedesktop.org
> http://lists.freedesktop.org/mailman/listinfo/systemd-devel


More information about the systemd-devel mailing list