[systemd-devel] [PATCH] [RFCv3] Add sync timer to journal server
Zbigniew Jędrzejewski-Szmek
zbyszek at in.waw.pl
Mon Mar 18 16:37:28 PDT 2013
On Mon, Mar 18, 2013 at 10:17:25PM +0200, Oleksii Shevchuk wrote:
> Sync journal with fdatasync after 10s of inactivity (by default), or
> after 10 messages (by default). Intervals configured
> via SyncIntervalSec and SyncIntervalMsg options at journal.conf.
> Manual sync can be performed via sending SIGUSR1.
> ---
> src/journal/journal-file.c | 25 +++++++-
> src/journal/journal-file.h | 1 +
> src/journal/journald-gperf.gperf | 2 +
> src/journal/journald-server.c | 126 +++++++++++++++++++++++++++++++++++++--
> src/journal/journald-server.h | 7 +++
> src/journal/journald.conf | 2 +
> 6 files changed, 157 insertions(+), 6 deletions(-)
>
> diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
> index 13fc8ed..2637f88 100644
> --- a/src/journal/journal-file.c
> +++ b/src/journal/journal-file.c
> @@ -68,6 +68,22 @@
> /* How many entries to keep in the entry array chain cache at max */
> #define CHAIN_CACHE_MAX 20
>
> +int journal_file_sync(JournalFile *f) {
> + assert(f);
> +
> + if (! (f->writable && f->fd >= 0))
> + return -1;
-1 means EPERM. Something different is needed.
> + if (fdatasync(f->fd))
> + return -errno;
> +
> + f->header->state = STATE_OFFLINE;
> +
> + fdatasync(f->fd);
Apparently the only error code which could happen here is EIO.
Are you sure that we want to ignore it?
> + return 0;
> +}
> +
> void journal_file_close(JournalFile *f) {
> assert(f);
>
> @@ -81,8 +97,7 @@ void journal_file_close(JournalFile *f) {
> if (f->mmap && f->fd >= 0)
> mmap_cache_close_fd(f->mmap, f->fd);
>
> - if (f->writable && f->fd >= 0)
> - fdatasync(f->fd);
> + journal_file_sync(f);
>
> if (f->header) {
> /* Mark the file offline. Don't override the archived state if it already is set */
> @@ -457,6 +472,9 @@ int journal_file_append_object(JournalFile *f, int type, uint64_t size, Object *
> assert(offset);
> assert(ret);
>
> + if (f->header->state == STATE_OFFLINE)
> + f->header->state = STATE_ONLINE;
> +
> p = le64toh(f->header->tail_object_offset);
> if (p == 0)
> p = le64toh(f->header->header_size);
> @@ -1270,6 +1288,9 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st
> if (!f->writable)
> return -EPERM;
>
> + if (f->header->state == STATE_OFFLINE)
> + f->header->state = STATE_ONLINE;
> +
> if (!ts) {
> dual_timestamp_get(&_ts);
> ts = &_ts;
> diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h
> index cdbc8e4..717d64c 100644
> --- a/src/journal/journal-file.h
> +++ b/src/journal/journal-file.h
> @@ -106,6 +106,7 @@ int journal_file_open(
> JournalFile *template,
> JournalFile **ret);
>
> +int journal_file_sync(JournalFile *f);
> void journal_file_close(JournalFile *j);
>
> int journal_file_open_reliably(
> diff --git a/src/journal/journald-gperf.gperf b/src/journal/journald-gperf.gperf
> index 1baef14..fe4958f 100644
> --- a/src/journal/journald-gperf.gperf
> +++ b/src/journal/journald-gperf.gperf
> @@ -18,6 +18,8 @@ struct ConfigPerfItem;
> Journal.Storage, config_parse_storage, 0, offsetof(Server, storage)
> Journal.Compress, config_parse_bool, 0, offsetof(Server, compress)
> Journal.Seal, config_parse_bool, 0, offsetof(Server, seal)
> +Journal.SyncIntervalSec, config_parse_usec, 0, offsetof(Server, sync_interval_usec)
> +Journal.SyncIntervalMsg, config_parse_unsigned, 0, offsetof(Server, sync_interval_msg)
> Journal.RateLimitInterval, config_parse_usec, 0, offsetof(Server, rate_limit_interval)
> Journal.RateLimitBurst, config_parse_unsigned, 0, offsetof(Server, rate_limit_burst)
> Journal.SystemMaxUse, config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_use)
> diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c
> index ac565c7..050cdee 100644
> --- a/src/journal/journald-server.c
> +++ b/src/journal/journald-server.c
> @@ -24,6 +24,7 @@
> #include <linux/sockios.h>
> #include <sys/statvfs.h>
> #include <sys/mman.h>
> +#include <sys/timerfd.h>
>
> #include <libudev.h>
> #include <systemd/sd-journal.h>
> @@ -67,6 +68,8 @@
>
> #define USER_JOURNALS_MAX 1024
>
> +#define DEFAULT_SYNC_INTERVAL_USEC (10*USEC_PER_SEC)
> +#define DEFAULT_SYNC_INTERVAL_MSG 10
> #define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
> #define DEFAULT_RATE_LIMIT_BURST 200
>
> @@ -344,6 +347,33 @@ void server_rotate(Server *s) {
> }
> }
>
> +void server_sync(Server *s) {
> + JournalFile *f;
> + void *k;
> + Iterator i;
> + int r;
> +
> + if (s->runtime_journal) {
> + r = journal_file_sync(s->runtime_journal);
> + if (r < 0)
> + log_error("Failed to sync runtime journal: %s", strerror(-r));
> + }
> +
> + if (s->system_journal) {
> + r = journal_file_sync(s->system_journal);
> + if (r < 0)
> + log_error("Failed to sync system journal: %s", strerror(-r));
> + }
> +
> + HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
> + r = journal_file_sync(f);
> + if (r < 0)
> + log_error("Failed to sync user journal: %s", strerror(-r));
> + }
> +
> + s->sync_msg_counter = 0;
> +}
> +
> void server_vacuum(Server *s) {
> char *p;
> char ids[33];
> @@ -475,8 +505,10 @@ static void write_to_journal(Server *s, uid_t uid, struct iovec *iovec, unsigned
> }
>
> r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
> - if (r >= 0)
> + if (r >= 0) {
> + server_schedule_sync(s);
> return;
> + }
>
> if (vacuumed || !shall_try_append_again(f, r)) {
> log_error("Failed to write entry, ignoring: %s", strerror(-r));
> @@ -991,11 +1023,10 @@ int process_event(Server *s, struct epoll_event *ev) {
> return -errno;
> }
>
> - log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
> -
> if (sfsi.ssi_signo == SIGUSR1) {
> touch("/run/systemd/journal/flushed");
> server_flush_to_var(s);
> + server_sync(s);
> return 1;
> }
>
> @@ -1005,8 +1036,23 @@ int process_event(Server *s, struct epoll_event *ev) {
> return 1;
> }
>
> + log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
> +
> return 0;
>
> + } else if (ev->data.fd == s->sync_timer_fd) {
> + int r;
> + uint64_t t;
> +
> + log_debug("Got sync request from epoll.");
> +
> + r = read(ev->data.fd, (void *)&t, sizeof(t));
> + if (r < 0)
> + return 0;
> +
> + server_sync(s);
> + return 1;
> +
> } else if (ev->data.fd == s->dev_kmsg_fd) {
> int r;
>
> @@ -1286,21 +1332,84 @@ static int server_parse_config_file(Server *s) {
> return r;
> }
>
> +static int server_open_sync_timer(Server *s) {
> + struct epoll_event ev;
> +
> + assert(s);
> +
> + s->sync_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
> + if (s->sync_timer_fd < 0)
> + return -errno;
> +
> + zero(ev);
> + ev.events = EPOLLIN;
> + ev.data.fd = s->sync_timer_fd;
> + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->sync_timer_fd, &ev) < 0) {
> + log_error("Failed to add timer fd to epoll object: %m");
> + return -errno;
> + }
> +
> + return 0;
> +}
> +
> +int server_schedule_sync(Server *s) {
> + int r;
> +
> + static struct itimerspec sync_timer_disable = {
> + .it_value.tv_sec = 0,
> + .it_value.tv_nsec = 0,
> + .it_interval.tv_sec = 0,
> + .it_interval.tv_nsec = 0,
> + };
> +
> + struct itimerspec sync_timer_enable = {
> + .it_value.tv_sec = s->sync_interval_usec / USEC_PER_SEC,
> + .it_value.tv_nsec = s->sync_interval_usec % MSEC_PER_SEC,
> + .it_interval.tv_sec = 0,
> + .it_interval.tv_nsec = 0,
> + };
> +
> + assert(s);
This assert doesn't make sense after s has been dereferenced above.
> +
> + if (s->sync_timer_fd < 0)
> + return -EINVAL;
> +
> + if (++ s->sync_msg_counter > s->sync_interval_msg) {
> + server_sync(s);
> + s->sync_msg_counter = 0;
> + r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_disable, NULL);
> + if (r < 0)
> + return -errno;
> +
> + return 0;
> + }
> +
> + r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_enable, NULL);
> + if (r < 0)
> + return -errno;
> +
> + return 0;
> +}
> +
> int server_init(Server *s) {
> int n, r, fd;
>
> assert(s);
>
> zero(*s);
> - s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
> + s->sync_timer_fd = s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
> s->compress = true;
> s->seal = true;
>
> + s->sync_interval_usec = DEFAULT_SYNC_INTERVAL_USEC;
> + s->sync_interval_msg = DEFAULT_SYNC_INTERVAL_MSG;
> +
> s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL;
> s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST;
>
> s->forward_to_syslog = true;
>
> + s->sync_msg_counter = 0;
> s->max_level_store = LOG_DEBUG;
> s->max_level_syslog = LOG_DEBUG;
> s->max_level_kmsg = LOG_NOTICE;
> @@ -1318,6 +1427,8 @@ int server_init(Server *s) {
> s->rate_limit_interval = s->rate_limit_burst = 0;
> }
>
> + log_debug("Setting sync interval to %" PRIu64, s->sync_interval_usec);
> +
> mkdir_p("/run/systemd/journal", 0755);
>
> s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
> @@ -1395,6 +1506,10 @@ int server_init(Server *s) {
> if (r < 0)
> return r;
>
> + r = server_open_sync_timer(s);
> + if (r < 0)
> + return r;
> +
> r = open_signalfd(s);
> if (r < 0)
> return r;
> @@ -1467,6 +1582,9 @@ void server_done(Server *s) {
> if (s->dev_kmsg_fd >= 0)
> close_nointr_nofail(s->dev_kmsg_fd);
>
> + if (s->sync_timer_fd >= 0)
> + close_nointr_nofail(s->sync_timer_fd);
> +
> if (s->rate_limit)
> journal_rate_limit_free(s->rate_limit);
>
> diff --git a/src/journal/journald-server.h b/src/journal/journald-server.h
> index 9f50a29..5ae68d5 100644
> --- a/src/journal/journald-server.h
> +++ b/src/journal/journald-server.h
> @@ -71,6 +71,8 @@ typedef struct Server {
> size_t buffer_size;
>
> JournalRateLimit *rate_limit;
> + usec_t sync_interval_usec;
> + unsigned sync_interval_msg;
> usec_t rate_limit_interval;
> unsigned rate_limit_burst;
>
> @@ -119,6 +121,9 @@ typedef struct Server {
> uint64_t *kernel_seqnum;
>
> struct udev *udev;
> +
> + int sync_timer_fd;
> + unsigned sync_msg_counter;
> } Server;
>
> #define N_IOVEC_META_FIELDS 17
> @@ -145,8 +150,10 @@ void server_fix_perms(Server *s, JournalFile *f, uid_t uid);
> bool shall_try_append_again(JournalFile *f, int r);
> int server_init(Server *s);
> void server_done(Server *s);
> +void server_sync(Server *s);
> void server_vacuum(Server *s);
> void server_rotate(Server *s);
> +int server_schedule_sync(Server *s);
> int server_flush_to_var(Server *s);
> int process_event(Server *s, struct epoll_event *ev);
> void server_maybe_append_tags(Server *s);
> diff --git a/src/journal/journald.conf b/src/journal/journald.conf
> index 948318b..6de9ed8 100644
> --- a/src/journal/journald.conf
> +++ b/src/journal/journald.conf
> @@ -12,6 +12,8 @@
> #Compress=yes
> #Seal=yes
> #SplitMode=login
> +#SyncIntervalSec=10s
> +#SyncIntervalMsg=10
What is the effect of fdatasync every 10 msg during high load? Wouldn't
this decrease the throughput significantly?
Zbyszek
More information about the systemd-devel
mailing list