[systemd-devel] [PATCH] [RFC] Add sync timer to journal server

Oleksii Shevchuk alxchk at gmail.com
Sat Mar 16 08:59:34 PDT 2013


Sync the journal with fdatasync() after 10s of inactivity (by default), or
after 10 messages (by default). The intervals are configured via the
SyncIntervalSec and SyncIntervalMsg options in journald.conf.
A manual sync can be triggered by sending SIGALRM to journald.
---
 src/journal/journal-file.c       |  13 ++++-
 src/journal/journal-file.h       |   1 +
 src/journal/journald-gperf.gperf |   2 +
 src/journal/journald-server.c    | 116 +++++++++++++++++++++++++++++++++++++--
 src/journal/journald-server.h    |   7 +++
 src/journal/journald.conf        |   2 +
 6 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
index 13fc8ed..42a410a 100644
--- a/src/journal/journal-file.c
+++ b/src/journal/journal-file.c
@@ -68,6 +68,16 @@
 /* How many entries to keep in the entry array chain cache at max */
 #define CHAIN_CACHE_MAX 20
 
+/* Flush f's data to disk; 0 on success or nothing to sync, -errno on failure */
+int journal_file_sync(JournalFile *f) {
+        /* NULL, read-only or closed files have nothing to flush */
+        if (!f || !f->writable || f->fd < 0)
+                return 0;
+        if (fdatasync(f->fd) < 0)
+                return -errno;
+        return 0;
+}
+
 void journal_file_close(JournalFile *f) {
         assert(f);
 
@@ -81,8 +91,7 @@ void journal_file_close(JournalFile *f) {
         if (f->mmap && f->fd >= 0)
                 mmap_cache_close_fd(f->mmap, f->fd);
 
-        if (f->writable && f->fd >= 0)
-                fdatasync(f->fd);
+        journal_file_sync(f);
 
         if (f->header) {
                 /* Mark the file offline. Don't override the archived state if it already is set */
diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h
index cdbc8e4..717d64c 100644
--- a/src/journal/journal-file.h
+++ b/src/journal/journal-file.h
@@ -106,6 +106,7 @@ int journal_file_open(
                 JournalFile *template,
                 JournalFile **ret);
 
+int journal_file_sync(JournalFile *f);
 void journal_file_close(JournalFile *j);
 
 int journal_file_open_reliably(
diff --git a/src/journal/journald-gperf.gperf b/src/journal/journald-gperf.gperf
index 1baef14..fe4958f 100644
--- a/src/journal/journald-gperf.gperf
+++ b/src/journal/journald-gperf.gperf
@@ -18,6 +18,8 @@ struct ConfigPerfItem;
 Journal.Storage,            config_parse_storage,   0, offsetof(Server, storage)
 Journal.Compress,           config_parse_bool,      0, offsetof(Server, compress)
 Journal.Seal,               config_parse_bool,      0, offsetof(Server, seal)
+Journal.SyncIntervalSec,    config_parse_usec,      0, offsetof(Server, sync_interval_usec)
+Journal.SyncIntervalMsg,    config_parse_unsigned,  0, offsetof(Server, sync_interval_msg)
 Journal.RateLimitInterval,  config_parse_usec,      0, offsetof(Server, rate_limit_interval)
 Journal.RateLimitBurst,     config_parse_unsigned,  0, offsetof(Server, rate_limit_burst)
 Journal.SystemMaxUse,       config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_use)
diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c
index ac565c7..fada229 100644
--- a/src/journal/journald-server.c
+++ b/src/journal/journald-server.c
@@ -67,6 +67,8 @@
 
 #define USER_JOURNALS_MAX 1024
 
+#define DEFAULT_SYNC_INTERVAL_USEC (10*USEC_PER_SEC)
+#define DEFAULT_SYNC_INTERVAL_MSG 10
 #define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
 #define DEFAULT_RATE_LIMIT_BURST 200
 
@@ -344,6 +346,23 @@ void server_rotate(Server *s) {
         }
 }
 
+/* Sync runtime and user journals, logging failures. NOTE(review): system journal is skipped - confirm */
+void server_sync(Server *s) {
+        JournalFile *f;
+        void *k;
+        Iterator i;
+        int r;
+        /* journal_file_sync() dereferences its argument, so skip a NULL runtime journal */
+        r = s->runtime_journal ? journal_file_sync(s->runtime_journal) : 0;
+        if (r < 0)
+                log_error("Failed to sync journal: %s", strerror(-r));
+
+        HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
+                r = journal_file_sync(f);
+                if (r < 0)
+                        log_error("Failed to sync user journal: %s", strerror(-r));
+        }
+}
+
 void server_vacuum(Server *s) {
         char *p;
         char ids[33];
@@ -475,8 +494,10 @@ static void write_to_journal(Server *s, uid_t uid, struct iovec *iovec, unsigned
         }
 
         r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
-        if (r >= 0)
+        if (r >= 0) {
+                server_shedule_sync(s);
                 return;
+        }
 
         if (vacuumed || !shall_try_append_again(f, r)) {
                 log_error("Failed to write entry, ignoring: %s", strerror(-r));
@@ -991,8 +1012,6 @@ int process_event(Server *s, struct epoll_event *ev) {
                         return -errno;
                 }
 
-                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
-
                 if (sfsi.ssi_signo == SIGUSR1) {
                         touch("/run/systemd/journal/flushed");
                         server_flush_to_var(s);
@@ -1005,6 +1024,13 @@ int process_event(Server *s, struct epoll_event *ev) {
                         return 1;
                 }
 
+                if (sfsi.ssi_signo == SIGALRM) {
+                        server_sync(s);
+                        return 1;
+                }
+
+                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
+
                 return 0;
 
         } else if (ev->data.fd == s->dev_kmsg_fd) {
@@ -1194,7 +1220,7 @@ static int open_signalfd(Server *s) {
         assert(s);
 
         assert_se(sigemptyset(&mask) == 0);
-        sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, -1);
+        sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, SIGUSR2, SIGALRM, -1);
         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
 
         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
@@ -1286,6 +1312,77 @@ static int server_parse_config_file(Server *s) {
         return r;
 }
 
+/* Create the CLOCK_MONOTONIC timer that delivers SIGALRM on expiry; 0 or -errno */
+static int server_create_sync_timer(Server *s) {
+        struct sigevent sev = {
+                .sigev_notify = SIGEV_SIGNAL,
+                .sigev_signo  = SIGALRM,
+        };
+
+        assert(s);
+
+        /* the new timer id is stored in s->sync_timer_id */
+        if (timer_create(CLOCK_MONOTONIC, &sev, &s->sync_timer_id) < 0)
+                return -errno;
+
+        /* start counting written messages from zero */
+        s->sync_msg_counter = 0;
+
+        return 0;
+}
+
+/*
+ * Delete the POSIX timer stored in s->sync_timer_id.
+ * Returns 0 on success, -errno if timer_delete() fails.
+ */
+static int server_destroy_sync_timer(Server *s) {
+        assert(s);
+
+        if (timer_delete(s->sync_timer_id) < 0)
+                return -errno;
+        return 0;
+}
+
+
+/*
+ * Called after a successful journal append. Syncs immediately once
+ * sync_interval_msg entries were written since the last counted sync,
+ * otherwise (re)arms the timer to expire after sync_interval_usec.
+ * Returns 0 on success, -errno if timer_settime() fails.
+ */
+int server_shedule_sync(Server *s) {
+        /* an all-zero it_value disarms the timer */
+        static const struct itimerspec sync_timer_disable = { .it_value.tv_sec = 0 };
+        struct itimerspec sync_timer_enable = { .it_interval.tv_sec = 0 };
+
+        assert(s);
+        assert(s->sync_timer_id);
+
+        if (++s->sync_msg_counter >= s->sync_interval_msg) {
+                server_sync(s);
+                s->sync_msg_counter = 0;
+
+                if (timer_settime(s->sync_timer_id, 0, &sync_timer_disable, NULL) < 0)
+                        return -errno;
+
+                return 0;
+        }
+
+        /* split microseconds into whole seconds plus the nanosecond remainder */
+        sync_timer_enable.it_value.tv_sec = s->sync_interval_usec / USEC_PER_SEC;
+        sync_timer_enable.it_value.tv_nsec = (s->sync_interval_usec % USEC_PER_SEC) * NSEC_PER_USEC;
+
+        /*
+         * Setting the timer again restarts the countdown. On failure the
+         * timer is kept: server_done() still deletes it, and deleting it
+         * here could clobber errno before it is returned.
+         */
+        if (timer_settime(s->sync_timer_id, 0, &sync_timer_enable, NULL) < 0)
+                return -errno;
+
+        return 0;
+}
+
 int server_init(Server *s) {
         int n, r, fd;
 
@@ -1296,6 +1393,9 @@ int server_init(Server *s) {
         s->compress = true;
         s->seal = true;
 
+        s->sync_interval_usec = DEFAULT_SYNC_INTERVAL_USEC;
+        s->sync_interval_msg = DEFAULT_SYNC_INTERVAL_MSG;
+
         s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL;
         s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST;
 
@@ -1306,6 +1406,10 @@ int server_init(Server *s) {
         s->max_level_kmsg = LOG_NOTICE;
         s->max_level_console = LOG_INFO;
 
+        r = server_create_sync_timer(s);
+        if (r < 0)
+                return r;
+
         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
 
@@ -1318,6 +1422,8 @@ int server_init(Server *s) {
                 s->rate_limit_interval = s->rate_limit_burst = 0;
         }
 
+        log_debug("Setting sync interval to %" PRIu64, s->sync_interval_usec);
+
         mkdir_p("/run/systemd/journal", 0755);
 
         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
@@ -1481,4 +1587,6 @@ void server_done(Server *s) {
 
         if (s->udev)
                 udev_unref(s->udev);
+
+        server_destroy_sync_timer(s);
 }
diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h
index 9f50a29..597fd53 100644
--- a/src/journal/journald-server.h
+++ b/src/journal/journald-server.h
@@ -71,6 +71,8 @@ typedef struct Server {
         size_t buffer_size;
 
         JournalRateLimit *rate_limit;
+        usec_t sync_interval_usec;
+        unsigned sync_interval_msg;
         usec_t rate_limit_interval;
         unsigned rate_limit_burst;
 
@@ -119,6 +121,9 @@ typedef struct Server {
         uint64_t *kernel_seqnum;
 
         struct udev *udev;
+
+        timer_t sync_timer_id;
+        unsigned sync_msg_counter;
 } Server;
 
 #define N_IOVEC_META_FIELDS 17
@@ -145,8 +150,10 @@ void server_fix_perms(Server *s, JournalFile *f, uid_t uid);
 bool shall_try_append_again(JournalFile *f, int r);
 int server_init(Server *s);
 void server_done(Server *s);
+void server_sync(Server *s);
 void server_vacuum(Server *s);
 void server_rotate(Server *s);
+int server_shedule_sync(Server *s);
 int server_flush_to_var(Server *s);
 int process_event(Server *s, struct epoll_event *ev);
 void server_maybe_append_tags(Server *s);
diff --git a/src/journal/journald.conf b/src/journal/journald.conf
index 948318b..2ba2b64 100644
--- a/src/journal/journald.conf
+++ b/src/journal/journald.conf
@@ -12,6 +12,8 @@
 #Compress=yes
 #Seal=yes
 #SplitMode=login
+#SyncIntervalSec=10s
+#SyncIntervalMsg=10
 #RateLimitInterval=10s
 #RateLimitBurst=200
 #SystemMaxUse=
-- 
1.8.1.2



More information about the systemd-devel mailing list