[systemd-devel] [PATCH] [RFCv3] Add sync timer to journal server

Zbigniew Jędrzejewski-Szmek zbyszek at in.waw.pl
Mon Mar 18 16:37:28 PDT 2013


On Mon, Mar 18, 2013 at 10:17:25PM +0200, Oleksii Shevchuk wrote:
> Sync journal with fdatasync after 10s of inactivity (by default), or
> after 10 messages (by default). Intervals configured
> via SyncIntervalSec and SyncIntervalMsg options at journal.conf.
> Manual sync can be performed via sending SIGUSR1.
> ---
>  src/journal/journal-file.c       |  25 +++++++-
>  src/journal/journal-file.h       |   1 +
>  src/journal/journald-gperf.gperf |   2 +
>  src/journal/journald-server.c    | 126 +++++++++++++++++++++++++++++++++++++--
>  src/journal/journald-server.h    |   7 +++
>  src/journal/journald.conf        |   2 +
>  6 files changed, 157 insertions(+), 6 deletions(-)
> 
> diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
> index 13fc8ed..2637f88 100644
> --- a/src/journal/journal-file.c
> +++ b/src/journal/journal-file.c
> @@ -68,6 +68,22 @@
>  /* How many entries to keep in the entry array chain cache at max */
>  #define CHAIN_CACHE_MAX 20
>  
> +int journal_file_sync(JournalFile *f) {
> +        assert(f);
> +
> +        if (! (f->writable && f->fd >= 0))
> +                return -1;
Returning -1 here reads as -EPERM to callers that pass the result to strerror().
A proper negative errno value is needed instead.

> +        if (fdatasync(f->fd))
> +                return -errno;
> +
> +        f->header->state = STATE_OFFLINE;
> +
> +        fdatasync(f->fd);
Apparently the only error that can occur here is EIO.
Are you sure that we want to ignore it?

> +        return 0;
> +}
> +
>  void journal_file_close(JournalFile *f) {
>          assert(f);
>  
> @@ -81,8 +97,7 @@ void journal_file_close(JournalFile *f) {
>          if (f->mmap && f->fd >= 0)
>                  mmap_cache_close_fd(f->mmap, f->fd);
>  
> -        if (f->writable && f->fd >= 0)
> -                fdatasync(f->fd);
> +        journal_file_sync(f);
>  
>          if (f->header) {
>                  /* Mark the file offline. Don't override the archived state if it already is set */
> @@ -457,6 +472,9 @@ int journal_file_append_object(JournalFile *f, int type, uint64_t size, Object *
>          assert(offset);
>          assert(ret);
>  
> +        if (f->header->state == STATE_OFFLINE)
> +                f->header->state = STATE_ONLINE;
> +
>          p = le64toh(f->header->tail_object_offset);
>          if (p == 0)
>                  p = le64toh(f->header->header_size);
> @@ -1270,6 +1288,9 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st
>          if (!f->writable)
>                  return -EPERM;
>  
> +        if (f->header->state == STATE_OFFLINE)
> +                f->header->state = STATE_ONLINE;
> +
>          if (!ts) {
>                  dual_timestamp_get(&_ts);
>                  ts = &_ts;
> diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h
> index cdbc8e4..717d64c 100644
> --- a/src/journal/journal-file.h
> +++ b/src/journal/journal-file.h
> @@ -106,6 +106,7 @@ int journal_file_open(
>                  JournalFile *template,
>                  JournalFile **ret);
>  
> +int journal_file_sync(JournalFile *f);
>  void journal_file_close(JournalFile *j);
>  
>  int journal_file_open_reliably(
> diff --git a/src/journal/journald-gperf.gperf b/src/journal/journald-gperf.gperf
> index 1baef14..fe4958f 100644
> --- a/src/journal/journald-gperf.gperf
> +++ b/src/journal/journald-gperf.gperf
> @@ -18,6 +18,8 @@ struct ConfigPerfItem;
>  Journal.Storage,            config_parse_storage,   0, offsetof(Server, storage)
>  Journal.Compress,           config_parse_bool,      0, offsetof(Server, compress)
>  Journal.Seal,               config_parse_bool,      0, offsetof(Server, seal)
> +Journal.SyncIntervalSec,    config_parse_usec,      0, offsetof(Server, sync_interval_usec)
> +Journal.SyncIntervalMsg,    config_parse_unsigned,  0, offsetof(Server, sync_interval_msg)
>  Journal.RateLimitInterval,  config_parse_usec,      0, offsetof(Server, rate_limit_interval)
>  Journal.RateLimitBurst,     config_parse_unsigned,  0, offsetof(Server, rate_limit_burst)
>  Journal.SystemMaxUse,       config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_use)
> diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c
> index ac565c7..050cdee 100644
> --- a/src/journal/journald-server.c
> +++ b/src/journal/journald-server.c
> @@ -24,6 +24,7 @@
>  #include <linux/sockios.h>
>  #include <sys/statvfs.h>
>  #include <sys/mman.h>
> +#include <sys/timerfd.h>
>  
>  #include <libudev.h>
>  #include <systemd/sd-journal.h>
> @@ -67,6 +68,8 @@
>  
>  #define USER_JOURNALS_MAX 1024
>  
> +#define DEFAULT_SYNC_INTERVAL_USEC (10*USEC_PER_SEC)
> +#define DEFAULT_SYNC_INTERVAL_MSG 10
>  #define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
>  #define DEFAULT_RATE_LIMIT_BURST 200
>  
> @@ -344,6 +347,33 @@ void server_rotate(Server *s) {
>          }
>  }
>  
> +void server_sync(Server *s) {
> +        JournalFile *f;
> +        void *k;
> +        Iterator i;
> +        int r;
> +
> +        if (s->runtime_journal) {
> +                r = journal_file_sync(s->runtime_journal);
> +                if (r < 0)
> +                        log_error("Failed to sync runtime journal: %s", strerror(-r));
> +        }
> +
> +        if (s->system_journal) {
> +                r = journal_file_sync(s->system_journal);
> +                if (r < 0)
> +                        log_error("Failed to sync system journal: %s", strerror(-r));
> +        }
> +
> +        HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
> +                r = journal_file_sync(f);
> +                if (r < 0)
> +                        log_error("Failed to sync user journal: %s", strerror(-r));
> +        }
> +
> +        s->sync_msg_counter = 0;
> +}
> +
>  void server_vacuum(Server *s) {
>          char *p;
>          char ids[33];
> @@ -475,8 +505,10 @@ static void write_to_journal(Server *s, uid_t uid, struct iovec *iovec, unsigned
>          }
>  
>          r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
> -        if (r >= 0)
> +        if (r >= 0) {
> +                server_schedule_sync(s);
>                  return;
> +        }
>  
>          if (vacuumed || !shall_try_append_again(f, r)) {
>                  log_error("Failed to write entry, ignoring: %s", strerror(-r));
> @@ -991,11 +1023,10 @@ int process_event(Server *s, struct epoll_event *ev) {
>                          return -errno;
>                  }
>  
> -                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
> -
>                  if (sfsi.ssi_signo == SIGUSR1) {
>                          touch("/run/systemd/journal/flushed");
>                          server_flush_to_var(s);
> +                        server_sync(s);
>                          return 1;
>                  }
>  
> @@ -1005,8 +1036,23 @@ int process_event(Server *s, struct epoll_event *ev) {
>                          return 1;
>                  }
>  
> +                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
> +
>                  return 0;
>  
> +        } else if (ev->data.fd == s->sync_timer_fd) {
> +                int r;
> +                uint64_t t;
> +
> +                log_debug("Got sync request from epoll.");
> +
> +                r = read(ev->data.fd, (void *)&t, sizeof(t));
> +                if (r < 0)
> +                        return 0;
> +
> +                server_sync(s);
> +                return 1;
> +
>          } else if (ev->data.fd == s->dev_kmsg_fd) {
>                  int r;
>  
> @@ -1286,21 +1332,84 @@ static int server_parse_config_file(Server *s) {
>          return r;
>  }
>  
> +static int server_open_sync_timer(Server *s) {
> +        struct epoll_event ev;
> +
> +        assert(s);
> +
> +        s->sync_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
> +        if (s->sync_timer_fd < 0)
> +                return -errno;
> +
> +        zero(ev);
> +        ev.events = EPOLLIN;
> +        ev.data.fd = s->sync_timer_fd;
> +        if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->sync_timer_fd, &ev) < 0) {
> +                log_error("Failed to add timer fd to epoll object: %m");
> +                return -errno;
> +        }
> +
> +        return 0;
> +}
> +
> +int server_schedule_sync(Server *s) {
> +        int r;
> +
> +        static struct itimerspec sync_timer_disable = {
> +                .it_value.tv_sec = 0,
> +                .it_value.tv_nsec = 0,
> +                .it_interval.tv_sec = 0,
> +                .it_interval.tv_nsec = 0,
> +        };
> +
> +        struct itimerspec sync_timer_enable = {
> +                .it_value.tv_sec = s->sync_interval_usec / USEC_PER_SEC,
> +                .it_value.tv_nsec = s->sync_interval_usec % MSEC_PER_SEC,
> +                .it_interval.tv_sec = 0,
> +                .it_interval.tv_nsec = 0,
> +        };
> +
> +        assert(s);
This assert comes too late to be useful: s has already been dereferenced in the initializer above.

> +
> +        if (s->sync_timer_fd < 0)
> +                return -EINVAL;
> +
> +        if (++ s->sync_msg_counter > s->sync_interval_msg) {
> +                server_sync(s);
> +                s->sync_msg_counter = 0;
> +                r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_disable, NULL);
> +                if (r < 0)
> +                        return -errno;
> +
> +                return 0;
> +        }
> +
> +        r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_enable, NULL);
> +        if (r < 0)
> +                return -errno;
> +
> +        return 0;
> +}
> +
>  int server_init(Server *s) {
>          int n, r, fd;
>  
>          assert(s);
>  
>          zero(*s);
> -        s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
> +        s->sync_timer_fd = s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
>          s->compress = true;
>          s->seal = true;
>  
> +        s->sync_interval_usec = DEFAULT_SYNC_INTERVAL_USEC;
> +        s->sync_interval_msg = DEFAULT_SYNC_INTERVAL_MSG;
> +
>          s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL;
>          s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST;
>  
>          s->forward_to_syslog = true;
>  
> +        s->sync_msg_counter = 0;
>          s->max_level_store = LOG_DEBUG;
>          s->max_level_syslog = LOG_DEBUG;
>          s->max_level_kmsg = LOG_NOTICE;
> @@ -1318,6 +1427,8 @@ int server_init(Server *s) {
>                  s->rate_limit_interval = s->rate_limit_burst = 0;
>          }
>  
> +        log_debug("Setting sync interval to %" PRIu64, s->sync_interval_usec);
> +
>          mkdir_p("/run/systemd/journal", 0755);
>  
>          s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
> @@ -1395,6 +1506,10 @@ int server_init(Server *s) {
>          if (r < 0)
>                  return r;
>  
> +        r = server_open_sync_timer(s);
> +        if (r < 0)
> +                return r;
> +
>          r = open_signalfd(s);
>          if (r < 0)
>                  return r;
> @@ -1467,6 +1582,9 @@ void server_done(Server *s) {
>          if (s->dev_kmsg_fd >= 0)
>                  close_nointr_nofail(s->dev_kmsg_fd);
>  
> +        if (s->sync_timer_fd >= 0)
> +                close_nointr_nofail(s->sync_timer_fd);
> +
>          if (s->rate_limit)
>                  journal_rate_limit_free(s->rate_limit);
>  
> diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h
> index 9f50a29..5ae68d5 100644
> --- a/src/journal/journald-server.h
> +++ b/src/journal/journald-server.h
> @@ -71,6 +71,8 @@ typedef struct Server {
>          size_t buffer_size;
>  
>          JournalRateLimit *rate_limit;
> +        usec_t sync_interval_usec;
> +        unsigned sync_interval_msg;
>          usec_t rate_limit_interval;
>          unsigned rate_limit_burst;
>  
> @@ -119,6 +121,9 @@ typedef struct Server {
>          uint64_t *kernel_seqnum;
>  
>          struct udev *udev;
> +
> +        int sync_timer_fd;
> +        unsigned sync_msg_counter;
>  } Server;
>  
>  #define N_IOVEC_META_FIELDS 17
> @@ -145,8 +150,10 @@ void server_fix_perms(Server *s, JournalFile *f, uid_t uid);
>  bool shall_try_append_again(JournalFile *f, int r);
>  int server_init(Server *s);
>  void server_done(Server *s);
> +void server_sync(Server *s);
>  void server_vacuum(Server *s);
>  void server_rotate(Server *s);
> +int server_schedule_sync(Server *s);
>  int server_flush_to_var(Server *s);
>  int process_event(Server *s, struct epoll_event *ev);
>  void server_maybe_append_tags(Server *s);
> diff --git a/src/journal/journald.conf b/src/journal/journald.conf
> index 948318b..6de9ed8 100644
> --- a/src/journal/journald.conf
> +++ b/src/journal/journald.conf
> @@ -12,6 +12,8 @@
>  #Compress=yes
>  #Seal=yes
>  #SplitMode=login
> +#SyncIntervalSec=10s
> +#SyncIntervalMsg=10
What is the effect of calling fdatasync every 10 messages under high load? Wouldn't
this decrease throughput significantly?

Zbyszek


More information about the systemd-devel mailing list