[systemd-devel] [PATCH] Experimental socket process pool.

david at davidstrauss.net david at davidstrauss.net
Mon Oct 21 04:48:30 PDT 2013


From: David Strauss <david at davidstrauss.net>

---
 .gitignore                                     |   1 +
 Makefile.am                                    |  13 ++
 src/socket-process-pool/Makefile               |   1 +
 src/socket-process-pool/socket-process-poold.c | 247 +++++++++++++++++++++++++
 src/socket-proxy/socket-proxyd.c               |   3 +-
 5 files changed, 264 insertions(+), 1 deletion(-)
 create mode 120000 src/socket-process-pool/Makefile
 create mode 100644 src/socket-process-pool/socket-process-poold.c

diff --git a/.gitignore b/.gitignore
index 6253e0d..d3ab8bb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -67,6 +67,7 @@
 /systemd-reply-password
 /systemd-rfkill
 /systemd-run
+/systemd-socket-process-poold
 /systemd-socket-proxyd
 /systemd-shutdown
 /systemd-shutdownd
diff --git a/Makefile.am b/Makefile.am
index 379e878..41e7012 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -319,6 +319,7 @@ rootlibexec_PROGRAMS = \
 	systemd-ac-power \
 	systemd-sysctl \
 	systemd-sleep \
+	systemd-socket-process-poold \
 	systemd-socket-proxyd
 
 systemgenerator_PROGRAMS = \
@@ -3146,6 +3147,18 @@ EXTRA_DIST += \
 
 # ------------------------------------------------------------------------------
 
+systemd_socket_process_poold_SOURCES = \
+	src/socket-process-pool/socket-process-poold.c
+
+systemd_socket_process_poold_LDADD = \
+	libsystemd-shared.la \
+	libsystemd-logs.la \
+	libsystemd-journal-internal.la \
+	libsystemd-id128-internal.la \
+	libsystemd-daemon.la
+
+# ------------------------------------------------------------------------------
+
 systemd_socket_proxyd_SOURCES = \
 	src/socket-proxy/socket-proxyd.c
 
diff --git a/src/socket-process-pool/Makefile b/src/socket-process-pool/Makefile
new file mode 120000
index 0000000..d0b0e8e
--- /dev/null
+++ b/src/socket-process-pool/Makefile
@@ -0,0 +1 @@
+../Makefile
\ No newline at end of file
diff --git a/src/socket-process-pool/socket-process-poold.c b/src/socket-process-pool/socket-process-poold.c
new file mode 100644
index 0000000..57c1210
--- /dev/null
+++ b/src/socket-process-pool/socket-process-poold.c
@@ -0,0 +1,247 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2013 David Strauss
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+ ***/
+
+#include <errno.h>
+#include <getopt.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/wait.h>
+
+#include "sd-daemon.h"
+#include "log.h"
+#include "util.h"
+#include "env-util.h"
+#include "strv.h"
+
+struct pool {                    /* settings shared by every pool member */
+        int start_listen_fd;     /* first inherited fd; main() uses SD_LISTEN_FDS_START */
+        size_t num_listen_fds;   /* number of inherited fds, as returned by sd_listen_fds() */
+        bool ignore_env;         /* never referenced by the code in this file */
+        size_t num_cores;        /* taken from sysconf(_SC_NPROCESSORS_ONLN) in main() */
+        size_t procs_per_core;   /* main() sets this to 2 */
+        char *path;              /* executable passed to execve(); points into argv */
+        char **argp;             /* argument vector passed to execve() */
+        char **envp;             /* environment copy with LISTEN_PID removed */
+};
+
+/* Print the usage summary for this tool to stdout; always returns 0. */
+static int help(void) {
+        const char *self = program_invocation_short_name;
+
+        printf("%s [OPTIONS]... EXECUTABLE-PATH [ARGS-FOR-EXECUTABLE]...\n", self);
+        fputs("Distribute an inherited socket to a process pool.\n\n"
+              "  -h --help        Show this help\n"
+              "  --version        Print version and exit\n", stdout);
+
+        return 0;
+}
+
+static void version(void) { /* print "<package string> socket-process-poold" and a newline to stdout */
+        printf("%s\n", PACKAGE_STRING " socket-process-poold");
+}
+
+/* Parse the command line into *p.
+ * Returns 1 if the pool should be started, 0 if --help or --version was
+ * handled, and a negative errno-style value on failure. On success p->path
+ * points into argv and p->argp is a heap-allocated, NULL-terminated array
+ * whose entries also point into argv, so only the array itself is freed. */
+static int parse_argv(int argc, char *argv[], struct pool *p) {
+        enum {
+                ARG_VERSION = 0x100
+        };
+        static const struct option options[] = {
+                { "help",       no_argument, NULL, 'h'           },
+                { "version",    no_argument, NULL, ARG_VERSION   },
+                { NULL,         0,           NULL, 0             }
+        };
+
+        size_t n_args;
+        int c;
+
+        assert(argc >= 0);
+        assert(argv);
+
+        while ((c = getopt_long(argc, argv, "h", options, NULL)) >= 0) {
+                switch (c) {
+                case 'h':
+                        help();
+                        return 0;
+                case '?':
+                        return -EINVAL;
+                case ARG_VERSION:
+                        version();
+                        return 0;
+                default:
+                        log_error("Unknown option code %c", c);
+                        return -EINVAL;
+                }
+        }
+
+        /* If there are no positional arguments, we have no path. */
+        if (optind >= argc) {
+                log_error("An executable path must be provided.");
+                help();
+                return -EINVAL;
+        }
+
+        /* The first positional argument is the path. */
+        p->path = argv[optind];
+        /* All positional arguments are arguments to the subprocess. Reserve
+         * one extra slot so the terminating NULL stays inside the array. */
+        n_args = (size_t) (argc - optind);
+        p->argp = new(char*, n_args + 1);
+        if (!p->argp)
+                return log_oom();
+        memcpy(p->argp, argv + optind, n_args * sizeof(char*));
+        p->argp[n_args] = NULL;
+
+        return 1;
+}
+
+/* Fork one pool member and exec p->path in it. The parent gets 0 once the
+ * child exists, or -errno if fork() failed. The child never returns: it
+ * either becomes the new program or ends in _exit(), so a failed child
+ * cannot fall back into the caller's loop and start forking on its own. */
+static int fork_and_exec(struct pool *p) {
+        char listen_pid[sizeof("LISTEN_PID=") + 20]; /* NUL plus 20 digits, enough for 64 bits */
+        char **envp;
+        int *fds;
+        int r;
+        pid_t pid;
+
+        log_debug("About to fork off exec with path: %s", p->path);
+        pid = fork();
+        if (pid < 0)
+                return -errno;
+        if (pid > 0) { /* Nothing more to do if we're the parent. */
+                log_debug("Started child %lu.", (unsigned long) pid);
+                return 0;
+        }
+
+        log_debug("Now in child %lu pre-close.", (unsigned long) getpid());
+
+        /* Preserve only the socket fds. @TODO: Should we run log_forget_fds() here? */
+        fds = new(int, p->num_listen_fds);
+        if (!fds && p->num_listen_fds > 0)
+                goto oom;
+        for (size_t i = 0; i < p->num_listen_fds; ++i) {
+                fds[i] = p->start_listen_fd + (int) i;
+                fd_cloexec(fds[i], false);
+        }
+        if (close_all_fds(fds, p->num_listen_fds) < 0) {
+                log_error("Failed to close extra file descriptors.");
+                _exit(EXIT_FAILURE);
+        }
+        log_debug("Now in process %lu post-close.", (unsigned long) getpid());
+
+        /* Preserve the environment, except for setting LISTEN_PID. */
+        r = snprintf(listen_pid, sizeof(listen_pid), "LISTEN_PID=%lu", (unsigned long) getpid());
+        assert(r > 0 && (size_t) r < sizeof(listen_pid));
+        envp = strv_env_set(p->envp, listen_pid);
+        if (!envp)
+                goto oom;
+        execve(p->path, p->argp, envp);
+        /* We only reach here if execve() failed. */
+        log_error("Failed to execute %s: %m", p->path);
+        _exit(EXIT_FAILURE);
+oom:
+        log_oom();
+        _exit(EXIT_FAILURE);
+}
+
+/* Start num_cores * procs_per_core pool members via fork_and_exec().
+ * Returns 0 once all were forked; on the first negative result from
+ * fork_and_exec() it stops starting members and returns that value. */
+static int run_pool(struct pool *p) {
+        size_t to_start = p->num_cores * p->procs_per_core;
+
+        /* size_t is printed with %zu; %lu is only right where long matches it. */
+        log_debug("Starting %zu processes for the pool.", to_start);
+        for (size_t started = 0; started < to_start; ++started) {
+                int r;
+
+                log_debug("Starting process %zu of %zu.", started + 1, to_start);
+                r = fork_and_exec(p);
+                if (r < 0)
+                        return r;
+        }
+        log_debug("All pool members started.");
+        return 0;
+}
+
+/* Reap num_children children, stopping early if waitpid() fails for good. */
+static void wait_for_exit(size_t num_children) {
+        log_debug("Waiting for pool members to stop.");
+        while (num_children > 0) {
+                pid_t exited = waitpid(-1, NULL, 0); /* any child, terminations only */
+                if (exited < 0 && errno == EINTR)
+                        continue; /* interrupted by a signal: retry */
+                if (exited < 0)
+                        break; /* e.g. ECHILD: nothing left to wait for */
+                num_children--;
+                log_debug("Pool member pid=%lu has exited.", (unsigned long) exited);
+        }
+        log_debug("All pool members stopped.");
+}
+
+/* Parse options, collect the inherited fds, start the pool and wait for it. */
+int main(int argc, char *argv[], char *envp[]) {
+        struct pool p = {};
+        _cleanup_strv_free_ char **filtered_envp = NULL;
+        long cores;
+        int r;
+
+        log_parse_environment();
+        log_open();
+        p.procs_per_core = 2;
+        r = parse_argv(argc, argv, &p);
+        if (r <= 0)
+                goto finish;
+
+        /* sysconf() signals failure with -1; never let that wrap in a size_t. */
+        cores = sysconf(_SC_NPROCESSORS_ONLN);
+        p.num_cores = cores > 0 ? (size_t) cores : 1;
+        log_info("Running %zu processes for each of %zu logical CPUs.", p.procs_per_core, p.num_cores);
+
+        /* Import the environment, except for LISTEN_PID. */
+        filtered_envp = strv_env_merge(1, envp, NULL);
+        filtered_envp = strv_env_unset(filtered_envp, "LISTEN_PID");
+        p.envp = filtered_envp;
+
+        /* Determine the set of inherited fds. */
+        p.start_listen_fd = SD_LISTEN_FDS_START;
+        r = sd_listen_fds(1);
+        if (r < 0) {
+                log_error("Error %d while finding inheritable sockets: %s", r, strerror(-r));
+                goto finish;
+        }
+        p.num_listen_fds = r;
+        log_info("Starting the process pool with fd=%d through fd=%d.", p.start_listen_fd, p.start_listen_fd + r - 1);
+
+        r = run_pool(&p);
+        wait_for_exit(p.num_cores * p.procs_per_core);
+
+finish:
+        free(p.argp); /* only the array was allocated; its strings belong to argv */
+        return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
+}
diff --git a/src/socket-proxy/socket-proxyd.c b/src/socket-proxy/socket-proxyd.c
index d64b0d2..525f8bd 100644
--- a/src/socket-proxy/socket-proxyd.c
+++ b/src/socket-proxy/socket-proxyd.c
@@ -66,7 +66,8 @@ struct connection {
 static void free_connection(struct connection *c) {
         log_debug("Freeing fd=%d (conn %p).", c->fd, c);
         sd_event_source_unref(c->w);
-        close_nointr_nofail(c->fd);
+        if (c->fd > 0) /* NOTE(review): this also skips fd 0, which is a valid descriptor; confirm 0 means "unset" here */
+                close_nointr_nofail(c->fd);
         free(c);
 }
 
-- 
1.8.3.1



More information about the systemd-devel mailing list