[systemd-devel] [PATCH 3/3 RFC] log: try to repeat sendmsg a few times

Zbigniew Jędrzejewski-Szmek zbyszek at in.waw.pl
Thu Oct 18 07:12:13 PDT 2012


Queue size for unix datagrams is set globally and can be quite small
(e.g. 10). This means that if journald is not keeping up, it is very
easy to fill up the queue and lose messages. To alleviate this
problem, messages are queued on the sender side, and every new log
call tries to push the whole queue out with sendmmsg() (a single
attempt, without sleeping). If log_close() is called, sending is
retried up to 100 times with a 10 ms sleep in between, to give journald
a chance to process the backlog and to flush the remaining messages.
---
 Makefile.am               |   7 ++
 src/shared/log.c          | 261 ++++++++++++++++++++++++++++++++++------------
 src/test/test-log-multi.c |  58 +++++++++++
 src/test/test-log.c       |   4 +-
 4 files changed, 264 insertions(+), 66 deletions(-)
 create mode 100644 src/test/test-log-multi.c

diff --git a/Makefile.am b/Makefile.am
index 4c65076..b8fc7a1 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1168,6 +1168,7 @@ noinst_PROGRAMS += \
 	test-watchdog \
 	test-unit-name \
 	test-log \
+	test-log-multi \
 	test-unit-file \
 	test-date
 
@@ -1239,6 +1240,12 @@ test_log_SOURCES = \
 test_log_LDADD = \
 	libsystemd-core.la
 
+test_log_multi_SOURCES = \
+	src/test/test-log-multi.c
+
+test_log_multi_LDADD = \
+	libsystemd-core.la
+
 test_date_SOURCES = \
 	src/test/test-date.c
 
diff --git a/src/shared/log.c b/src/shared/log.c
index 6357868..c40d8f0 100644
--- a/src/shared/log.c
+++ b/src/shared/log.c
@@ -37,6 +37,13 @@
 
 #define SNDBUF_SIZE (8*1024*1024)
 
+typedef struct mmsg_info { /* ring buffer of messages waiting for sendmmsg() */
+        struct mmsghdr *vec;
+        int size; /* capacity of vec, in mmsghdrs */
+        int offset; /* index of the oldest queued mmsghdr (ring buffer head) */
+        int used; /* number of queued, not yet sent mmsghdrs */
+} mmsg_info;
+
 static LogTarget log_target = LOG_TARGET_CONSOLE;
 static int log_max_level = LOG_INFO;
 static int log_facility = LOG_DAEMON;
@@ -45,6 +52,7 @@ static int console_fd = STDERR_FILENO;
 static int syslog_fd = -1;
 static int kmsg_fd = -1;
 static int journal_fd = -1;
+static mmsg_info journal_mmsg_info; /* messages waiting to be sent to journal_fd */
 
 static bool syslog_is_stream = false;
 
@@ -55,6 +63,122 @@ static bool show_location = false;
  * use here. */
 static char *log_abort_msg = NULL;
 
+/* Upper bound on the number of messages queued for the journal socket. */
+#define MSG_QUEUE_LEN 1024
+
+/* Reserve the next free slot at the tail of the ring buffer in @info,
+ * doubling vec when it is full. Returns a zeroed msghdr for the caller
+ * to fill in, or NULL if growing would exceed @max_size entries or
+ * realloc() failed. */
+static struct msghdr *msg_new(mmsg_info *info, int max_size) {
+        int pos;
+        struct msghdr *mh;
+
+        assert(max_size >= 1);
+        assert(info);
+
+        if (info->used == info->size) {
+                struct mmsghdr *vec;
+                int size, wrapped;
+
+                size = MAX(info->size*2, 1);
+                if (size > max_size)
+                        return NULL;
+
+                vec = realloc(info->vec, sizeof(struct mmsghdr) * size);
+                if (vec == NULL)
+                        return NULL;
+
+                /* The queue is full, so it consists of [offset, size)
+                 * followed by [0, offset). Move the wrapped part to just
+                 * past the old end so the queue is contiguous again. */
+                wrapped = (info->offset + info->used) - info->size;
+                if (wrapped > 0)
+                        memcpy(vec + info->size, vec,
+                               sizeof(struct mmsghdr) * wrapped);
+
+                info->vec = vec;
+                info->size = size;
+        }
+
+        assert(info->used < info->size);
+
+        pos = (info->offset + info->used++) % info->size;
+        zero(info->vec[pos].msg_hdr);
+        mh = &info->vec[pos].msg_hdr;
+
+        return mh;
+}
+
+/* Statically allocated "\n" shared by all messages as the field
+ * separator; free_iovec() must never free() it. */
+static const char * const newline = "\n";
+
+/* Free an iovec array built by write_to_journal() or log_struct_internal():
+ * every entry except the shared newline separator is a heap-allocated
+ * string (entry 0 is the asprintf()'d header), so free those, then the
+ * array itself. */
+static void free_iovec(struct iovec *iovec, int iovlen) {
+        int i;
+
+        for (i = 0; i < iovlen; i++)
+                if (iovec[i].iov_base != newline)
+                        free(iovec[i].iov_base);
+        free(iovec);
+}
+
+/* Push the messages queued in @info out to @fd with sendmmsg(), oldest
+ * first. On EAGAIN/EWOULDBLOCK, give up after @tries such failures,
+ * sleeping @sleep_usec before each retry; unsent messages stay queued.
+ * On any other error the oldest queued message is dropped and -errno is
+ * returned. Otherwise returns the number of messages sent by this call,
+ * which may be 0. */
+static int msg_send(int fd, mmsg_info *info, int tries, useconds_t sleep_usec) {
+        int sent = 0;
+
+        assert(info);
+        assert(info->used <= info->size);
+        assert(info->offset < info->size || info->offset == 0);
+
+        while (info->used > 0) {
+                struct mmsghdr *mmh = info->vec + info->offset;
+                /* sendmmsg() needs a contiguous array: send up to the end of
+                 * vec now, the wrapped part on the next iteration. */
+                int vlen = MIN(info->size - info->offset, info->used);
+                int r, j;
+
+                r = sendmmsg(fd, mmh, vlen, MSG_NOSIGNAL);
+                if (r < 0) {
+                        /* Save errno now: free() below is allowed to modify it. */
+                        int err = errno;
+
+                        if (err == EAGAIN || err == EWOULDBLOCK) {
+                                if (--tries > 0) {
+                                        usleep(sleep_usec);
+                                        continue;
+                                }
+                                break;
+                        }
+
+                        /* Drop the oldest message so a persistent error
+                         * cannot wedge the queue. */
+                        info->used--;
+                        info->offset = (info->offset + 1) % info->size;
+                        free_iovec(mmh->msg_hdr.msg_iov,
+                                   mmh->msg_hdr.msg_iovlen);
+                        return -err;
+                }
+
+                sent += r;
+                info->used -= r;
+                info->offset = (info->offset + r) % info->size;
+                for (j = 0; j < r; j++, mmh++)
+                        free_iovec(mmh->msg_hdr.msg_iov, mmh->msg_hdr.msg_iovlen);
+        }
+
+        return sent;
+}
+
 void log_close_console(void) {
 
         if (console_fd < 0)
@@ -174,10 +273,26 @@ fail:
 }
 
 void log_close_journal(void) {
+        int i;
 
         if (journal_fd < 0)
                 return;
 
+        /* Last chance to flush the queue: up to 100 EAGAIN failures,
+         * 10 ms apart, while journald catches up. */
+        msg_send(journal_fd, &journal_mmsg_info, 100, 10 * USEC_PER_MSEC);
+
+        /* Free whatever is still queued. The queue is a ring buffer, so the
+         * index has to wrap instead of running past the end of vec. */
+        for (i = 0; i < journal_mmsg_info.used; i++) {
+                struct mmsghdr *mmh = journal_mmsg_info.vec +
+                        (journal_mmsg_info.offset + i) % journal_mmsg_info.size;
+
+                free_iovec(mmh->msg_hdr.msg_iov, mmh->msg_hdr.msg_iovlen);
+        }
+        free(journal_mmsg_info.vec);
+        zero(journal_mmsg_info);
+
         close_nointr_nofail(journal_fd);
         journal_fd = -1;
 }
@@ -199,7 +308,9 @@ static int log_open_journal(void) {
         sa.un.sun_family = AF_UNIX;
         strncpy(sa.un.sun_path, "/run/systemd/journal/socket", sizeof(sa.un.sun_path));
 
-        if (connect(journal_fd, &sa.sa, offsetof(struct sockaddr_un, sun_path) + strlen(sa.un.sun_path)) < 0) {
+        r = connect(journal_fd, &sa.sa,
+                    offsetof(struct sockaddr_un, sun_path) + strlen(sa.un.sun_path));
+        if (r < 0) {
                 r = -errno;
                 goto fail;
         }
@@ -441,43 +552,62 @@ static int write_to_journal(
         const char *func,
         const char *buffer) {
 
-        char header[LINE_MAX];
-        struct iovec iovec[3];
-        struct msghdr mh;
+        int r;
+        char *header = NULL, *buffercopy = NULL;
+        const int iovec_size = 3;
+        struct iovec *iovec = NULL;
+        struct msghdr *mh;
 
         if (journal_fd < 0)
                 return 0;
 
-        snprintf(header, sizeof(header),
-                 "PRIORITY=%i\n"
-                 "SYSLOG_FACILITY=%i\n"
-                 "CODE_FILE=%s\n"
-                 "CODE_LINE=%i\n"
-                 "CODE_FUNCTION=%s\n"
-                 "SYSLOG_IDENTIFIER=%s\n"
-                 "MESSAGE=",
-                 LOG_PRI(level),
-                 LOG_FAC(level),
-                 file,
-                 line,
-                 func,
-                 program_invocation_short_name);
-
-        char_array_0(header);
+        /* The message may stay queued after this function returns, so the
+         * strings it points to (except the static newline) live on the heap. */
+        r = asprintf(&header,
+                     "PRIORITY=%i\n"
+                     "SYSLOG_FACILITY=%i\n"
+                     "CODE_FILE=%s\n"
+                     "CODE_LINE=%i\n"
+                     "CODE_FUNCTION=%s\n"
+                     "SYSLOG_IDENTIFIER=%s\n"
+                     "MESSAGE=",
+                     LOG_PRI(level),
+                     LOG_FAC(level),
+                     file,
+                     line,
+                     func,
+                     program_invocation_short_name);
+        if (r < 0)
+                header = NULL; /* undefined after a failed asprintf() */
+
+        iovec = calloc(iovec_size, sizeof(struct iovec));
+        buffercopy = strdup(buffer);
+
+        if (!header || !iovec || !buffercopy) {
+                free(header);
+                free(iovec);
+                free(buffercopy);
+                return -ENOMEM;
+        }
 
-        zero(iovec);
         IOVEC_SET_STRING(iovec[0], header);
-        IOVEC_SET_STRING(iovec[1], buffer);
-        IOVEC_SET_STRING(iovec[2], "\n");
+        IOVEC_SET_STRING(iovec[1], buffercopy);
+        IOVEC_SET_STRING(iovec[2], newline);
 
-        zero(mh);
-        mh.msg_iov = iovec;
-        mh.msg_iovlen = ELEMENTSOF(iovec);
+        mh = msg_new(&journal_mmsg_info, MSG_QUEUE_LEN);
+        if (!mh) {
+                /* Queue at its limit or realloc() failed: drop the message. */
+                free_iovec(iovec, iovec_size);
+                return -ENOMEM;
+        }
 
-        if (sendmsg(journal_fd, &mh, MSG_NOSIGNAL) < 0)
-                return -errno;
+        /* The queue now owns iovec and the strings it points to. */
+        mh->msg_iov = iovec;
+        mh->msg_iovlen = iovec_size;
 
-        return 1;
+        /* A single attempt without sleeping: returns how many queued
+         * messages were sent (0 if they all stay queued) or a negative errno. */
+        return msg_send(journal_fd, &journal_mmsg_info, 1, 0);
 }
 
 static int log_dispatch(
@@ -677,34 +800,46 @@ int log_struct_internal(
              log_target == LOG_TARGET_JOURNAL) &&
             journal_fd >= 0) {
 
-                char header[LINE_MAX];
-                struct iovec iovec[17];
-                unsigned n = 0, i;
-                struct msghdr mh;
-                const char nl = '\n';
+                char *header = NULL;
+                struct iovec *iovec = NULL;
+                unsigned n = 0;
+                struct msghdr *mh;
+                const unsigned iovec_size = 17;
 
                 /* If the journal is available do structured logging */
 
-                snprintf(header, sizeof(header),
-                        "PRIORITY=%i\n"
-                        "SYSLOG_FACILITY=%i\n"
-                        "CODE_FILE=%s\n"
-                        "CODE_LINE=%i\n"
-                        "CODE_FUNCTION=%s\n"
-                        "SYSLOG_IDENTIFIER=%s\n",
-                        LOG_PRI(level),
-                        LOG_FAC(level),
-                        file,
-                        line,
-                        func,
-                        program_invocation_short_name);
-                char_array_0(header);
-
-                zero(iovec);
+                /* va_start() first, so that the error paths below can jump
+                 * to "finish", which calls va_end(). */
+                va_start(ap, format);
+
+                /* The message may stay queued after we return, so the
+                 * header and the iovec array are heap-allocated. */
+                r = asprintf(&header,
+                             "PRIORITY=%i\n"
+                             "SYSLOG_FACILITY=%i\n"
+                             "CODE_FILE=%s\n"
+                             "CODE_LINE=%i\n"
+                             "CODE_FUNCTION=%s\n"
+                             "SYSLOG_IDENTIFIER=%s\n",
+                             LOG_PRI(level),
+                             LOG_FAC(level),
+                             file,
+                             line,
+                             func,
+                             program_invocation_short_name);
+                if (r < 0)
+                        header = NULL; /* undefined after a failed asprintf() */
+
+                iovec = calloc(iovec_size, sizeof(struct iovec));
+                if (!header || !iovec) {
+                        free(header);
+                        r = -ENOMEM;
+                        goto finish; /* n == 0, so finish frees just the array */
+                }
+
                 IOVEC_SET_STRING(iovec[n++], header);
 
-                va_start(ap, format);
-                while (format && n + 1 < ELEMENTSOF(iovec)) {
+                while (format && n + 1 < iovec_size) {
                         char *buf;
                         va_list aq;
 
@@ -725,27 +855,30 @@ int log_struct_internal(
                         VA_FORMAT_ADVANCE(format, ap);
 
                         IOVEC_SET_STRING(iovec[n++], buf);
-
-                        iovec[n].iov_base = (char*) &nl;
-                        iovec[n].iov_len = 1;
-                        n++;
+                        IOVEC_SET_STRING(iovec[n++], newline);
 
                         format = va_arg(ap, char *);
                 }
 
-                zero(mh);
-                mh.msg_iov = iovec;
-                mh.msg_iovlen = n;
+                mh = msg_new(&journal_mmsg_info, MSG_QUEUE_LEN);
+                if (!mh) {
+                        /* Queue at its limit or realloc() failed. */
+                        r = -ENOMEM;
+                        goto finish; /* finish frees iovec */
+                }
+
+                /* The queue now owns iovec and the strings it points to. */
+                mh->msg_iov = iovec;
+                mh->msg_iovlen = n;
+                iovec = NULL;
 
-                if (sendmsg(journal_fd, &mh, MSG_NOSIGNAL) < 0)
-                        r = -errno;
-                else
-                        r = 1;
+                r = msg_send(journal_fd, &journal_mmsg_info, 1, 0);
 
         finish:
                 va_end(ap);
-                for (i = 1; i < n; i += 2)
-                        free(iovec[i].iov_base);
+                /* Only still set if the message never made it into the queue. */
+                if (iovec)
+                        free_iovec(iovec, n);
 
         } else {
                 char buf[LINE_MAX];
@@ -780,6 +908,14 @@ int log_struct_internal(
         }
 
         errno = saved_errno;
+        if (r < 0) {
+                char buf[LINE_MAX];
+
+                snprintf(buf, sizeof(buf), "dispatch failed %d/%s", r, strerror(-r));
+                log_dispatch(level, file, line, func, buf);
+                /* snprintf()/log_dispatch() may have changed errno again. */
+                errno = saved_errno;
+        }
         return r;
 }
 
diff --git a/src/test/test-log-multi.c b/src/test/test-log-multi.c
new file mode 100644
index 0000000..0f02afc
--- /dev/null
+++ b/src/test/test-log-multi.c
@@ -0,0 +1,61 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2012 Lennart Poettering
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <stddef.h>
+#include <unistd.h>
+
+#include "log.h"
+#include "util.h"
+
+#define filler_unit " filler filler filler filler filler filler filler filler filler"
+#define filler filler_unit filler_unit filler_unit filler_unit filler_unit filler_unit filler_unit filler_unit
+#define filler2 filler filler filler filler filler filler filler filler filler filler filler filler
+
+/* Log @limit (default 100, or argv[1]) large structured messages in a
+ * tight loop, to exercise the sender-side queue when journald lags. */
+int main(int argc, char* argv[]) {
+        unsigned i, limit = 100;
+        int r;
+
+        switch(argc) {
+        case 1: break;
+        case 2:
+                r = safe_atou(argv[1], &limit);
+                if (!r)
+                        break;
+                /* fall through: argv[1] is not a valid number */
+        default:
+                log_error("wrong number or bad options");
+                return 1;
+        }
+
+        log_set_target(LOG_TARGET_JOURNAL);
+        log_open();
+
+        for(i = 0; i < limit; i++)
+                log_struct(LOG_NOTICE,
+                           "MESSAGE=line %u", i,
+                           "FILLER=%s", filler2,
+                           NULL);
+
+        log_close();
+        return 0;
+}
diff --git a/src/test/test-log.c b/src/test/test-log.c
index 8dc3d53..fb73fc5 100644
--- a/src/test/test-log.c
+++ b/src/test/test-log.c
@@ -37,12 +37,12 @@ int main(int argc, char* argv[]) {
         log_set_target(LOG_TARGET_JOURNAL);
         log_open();
 
-        log_struct(LOG_INFO,
+        log_struct(LOG_NOTICE,
                    "MESSAGE=Foobar PID=%lu", (unsigned long) getpid(),
                    "SERVICE=foobar",
                    NULL);
 
-        log_struct(LOG_INFO,
+        log_struct(LOG_WARNING,
                    "MESSAGE=Foobar PID=%lu", (unsigned long) getpid(),
                    "FORMAT_STR_TEST=1=%i A=%c 2=%hi 3=%li 4=%lli 1=%p foo=%s 2.5=%g 3.5=%g 4.5=%Lg",
                    (int) 1, 'A', (short) 2, (long int) 3, (long long int) 4, (void*) 1, "foo", (float) 2.5f, (double) 3.5, (long double) 4.5,
-- 
1.7.12.rc1.173.g9b27acc



More information about the systemd-devel mailing list