[systemd-commits] Branch 'journal' - 3 commits - src/journal src/util.c

Lennart Poettering lennart at kemper.freedesktop.org
Mon Dec 19 17:38:45 PST 2011


 src/journal/journal-def.h  |    3 
 src/journal/journal-file.c |   67 ++++++----
 src/journal/journal-file.h |   13 +
 src/journal/journalctl.c   |   79 ++++++++---
 src/journal/journald.c     |   81 +++++++++++-
 src/journal/sd-journal.c   |  301 ++++++++++++++++++++++++++++++++++++++++++---
 src/journal/sd-journal.h   |   12 -
 src/util.c                 |    2 
 8 files changed, 486 insertions(+), 72 deletions(-)

New commits:
commit bc85bfee87e11317fbcd1160c9003860dc6edde9
Author: Lennart Poettering <lennart at poettering.net>
Date:   Tue Dec 20 02:38:36 2011 +0100

    journal: fix space reservation limit enforcement

diff --git a/src/journal/journal-def.h b/src/journal/journal-def.h
index 1a63ca1..5f026ee 100644
--- a/src/journal/journal-def.h
+++ b/src/journal/journal-def.h
@@ -135,9 +135,6 @@ _packed_ struct Header {
         sd_id128_t seqnum_id;
         uint64_t arena_offset;
         uint64_t arena_size;
-        uint64_t arena_max_size;  /* obsolete */
-        uint64_t arena_min_size;  /* obsolete */
-        uint64_t arena_keep_free; /* obsolete */
         uint64_t data_hash_table_offset;     /* for looking up data objects */
         uint64_t data_hash_table_size;
         uint64_t field_hash_table_offset;     /* for looking up field objects */
diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
index 8a864cb..8f9b61b 100644
--- a/src/journal/journal-file.c
+++ b/src/journal/journal-file.c
@@ -31,12 +31,6 @@
 #include "journal-file.h"
 #include "lookup3.h"
 
-#define DEFAULT_ARENA_MAX_SIZE (16ULL*1024ULL*1024ULL*1024ULL)
-#define DEFAULT_ARENA_MIN_SIZE (256ULL*1024ULL)
-#define DEFAULT_ARENA_KEEP_FREE (1ULL*1024ULL*1024ULL)
-
-#define DEFAULT_MAX_USE (16ULL*1024ULL*1024ULL*16ULL)
-
 #define DEFAULT_DATA_HASH_TABLE_SIZE (2047ULL*16ULL)
 #define DEFAULT_FIELD_HASH_TABLE_SIZE (2047ULL*16ULL)
 
@@ -76,9 +70,6 @@ static int journal_file_init_header(JournalFile *f, JournalFile *template) {
         zero(h);
         memcpy(h.signature, signature, 8);
         h.arena_offset = htole64(ALIGN64(sizeof(h)));
-        h.arena_max_size = htole64(DEFAULT_ARENA_MAX_SIZE);
-        h.arena_min_size = htole64(DEFAULT_ARENA_MIN_SIZE);
-        h.arena_keep_free = htole64(DEFAULT_ARENA_KEEP_FREE);
 
         r = sd_id128_randomize(&h.file_id);
         if (r < 0)
@@ -161,16 +152,10 @@ static int journal_file_verify_header(JournalFile *f) {
 }
 
 static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size) {
-        uint64_t asize;
         uint64_t old_size, new_size;
 
         assert(f);
 
-        if (offset < le64toh(f->header->arena_offset))
-                return -EINVAL;
-
-        new_size = PAGE_ALIGN(offset + size);
-
         /* We assume that this file is not sparse, and we know that
          * for sure, since we always call posix_fallocate()
          * ourselves */
@@ -179,12 +164,19 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
                 le64toh(f->header->arena_offset) +
                 le64toh(f->header->arena_size);
 
-        if (old_size >= new_size)
+        new_size = PAGE_ALIGN(offset + size);
+        if (new_size < le64toh(f->header->arena_offset))
+                new_size = le64toh(f->header->arena_offset);
+
+        if (new_size <= old_size)
                 return 0;
 
-        asize = new_size - le64toh(f->header->arena_offset);
+        if (f->metrics.max_size > 0 &&
+            new_size > f->metrics.max_size)
+                return -E2BIG;
 
-        if (asize > le64toh(f->header->arena_min_size)) {
+        if (new_size > f->metrics.min_size &&
+            f->metrics.keep_free > 0) {
                 struct statvfs svfs;
 
                 if (fstatvfs(f->fd, &svfs) >= 0) {
@@ -192,8 +184,8 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
 
                         available = svfs.f_bfree * svfs.f_bsize;
 
-                        if (available >= f->header->arena_keep_free)
-                                available -= f->header->arena_keep_free;
+                        if (available >= f->metrics.keep_free)
+                                available -= f->metrics.keep_free;
                         else
                                 available = 0;
 
@@ -202,16 +194,16 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
                 }
         }
 
-        if (asize > le64toh(f->header->arena_max_size))
-                return -E2BIG;
-
+        /* Note that the glibc fallocate() fallback is very
+           inefficient, hence we try to minimize the allocation area
+           as much as we can. */
         if (posix_fallocate(f->fd, old_size, new_size - old_size) < 0)
                 return -errno;
 
         if (fstat(f->fd, &f->last_stat) < 0)
                 return -errno;
 
-        f->header->arena_size = htole64(asize);
+        f->header->arena_size = new_size - htole64(f->header->arena_offset);
 
         return 0;
 }
@@ -576,6 +568,9 @@ int journal_file_find_data_object_with_hash(
 
         osize = offsetof(Object, data.payload) + size;
 
+        if (f->header->data_hash_table_size == 0)
+                return -EBADMSG;
+
         h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem));
         p = le64toh(f->data_hash_table[h].head_hash_offset);
 
@@ -816,7 +811,7 @@ static int journal_file_link_entry(JournalFile *f, Object *o, uint64_t offset) {
         if (r < 0)
                 return r;
 
-        log_error("%s %lu", f->path, (unsigned long) f->header->n_entries);
+        log_error("=> %s seqnr=%lu n_entries=%lu", f->path, (unsigned long) o->entry.seqnum, (unsigned long) f->header->n_entries);
 
         if (f->header->head_entry_realtime == 0)
                 f->header->head_entry_realtime = o->entry.realtime;
@@ -887,6 +882,8 @@ static void journal_file_post_change(JournalFile *f) {
          * trigger IN_MODIFY by truncating the journal file to its
          * current size which triggers IN_MODIFY. */
 
+        __sync_synchronize();
+
         if (ftruncate(f->fd, f->last_stat.st_size) < 0)
                 log_error("Failed to to truncate file to its own size: %m");
 }
@@ -1626,6 +1623,10 @@ int journal_file_open(
         f->writable = (flags & O_ACCMODE) != O_RDONLY;
         f->prot = prot_from_flags(flags);
 
+        f->metrics.max_size = DEFAULT_MAX_SIZE;
+        f->metrics.min_size = DEFAULT_MIN_SIZE;
+        f->metrics.keep_free = DEFAULT_KEEP_FREE;
+
         f->path = strdup(fname);
         if (!f->path) {
                 r = -ENOMEM;
diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h
index 664f917..20712b5 100644
--- a/src/journal/journal-file.h
+++ b/src/journal/journal-file.h
@@ -28,6 +28,11 @@
 #include "util.h"
 #include "sd-id128.h"
 
+#define DEFAULT_MAX_SIZE (1024ULL*128ULL)
+#define DEFAULT_MIN_SIZE (256ULL*1024ULL)
+#define DEFAULT_KEEP_FREE (1ULL*1024ULL*1024ULL)
+#define DEFAULT_MAX_USE (16ULL*1024ULL*1024ULL*16ULL)
+
 typedef struct Window {
         void *ptr;
         uint64_t offset;
@@ -45,6 +50,12 @@ enum {
         _WINDOW_MAX
 };
 
+typedef struct JournalMetrics {
+        uint64_t max_size;
+        uint64_t min_size;
+        uint64_t keep_free;
+} JournalMetrics;
+
 typedef struct JournalFile {
         int fd;
         char *path;
@@ -62,6 +73,8 @@ typedef struct JournalFile {
         Window windows[_WINDOW_MAX];
 
         uint64_t current_offset;
+
+        JournalMetrics metrics;
 } JournalFile;
 
 typedef enum direction {
diff --git a/src/journal/journald.c b/src/journal/journald.c
index c457d27..37f8f16 100644
--- a/src/journal/journald.c
+++ b/src/journal/journald.c
@@ -54,6 +54,9 @@ typedef struct Server {
 
         char *buffer;
         size_t buffer_size;
+
+        JournalMetrics metrics;
+        uint64_t max_use;
 } Server;
 
 static void fix_perms(JournalFile *f, uid_t uid) {
@@ -153,6 +156,66 @@ static JournalFile* find_journal(Server *s, uid_t uid) {
         return f;
 }
 
+static void server_vacuum(Server *s) {
+        Iterator i;
+        void *k;
+        char *p;
+        char ids[33];
+        sd_id128_t machine;
+        int r;
+        JournalFile *f;
+
+        log_info("Rotating...");
+
+        if (s->runtime_journal) {
+                r = journal_file_rotate(&s->runtime_journal);
+                if (r < 0)
+                        log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
+        }
+
+        if (s->system_journal) {
+                r = journal_file_rotate(&s->system_journal);
+                if (r < 0)
+                        log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
+        }
+
+        HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
+                r = journal_file_rotate(&f);
+                if (r < 0)
+                        log_error("Failed to rotate %s: %s", f->path, strerror(-r));
+                else
+                        hashmap_replace(s->user_journals, k, f);
+        }
+
+        log_info("Vacuuming...");
+
+        r = sd_id128_get_machine(&machine);
+        if (r < 0) {
+                log_error("Failed to get machine ID: %s", strerror(-r));
+                return;
+        }
+
+        if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
+                log_error("Out of memory.");
+                return;
+        }
+
+        r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
+        if (r < 0 && r != -ENOENT)
+                log_error("Failed to vacuum %s: %s", p, strerror(-r));
+        free(p);
+
+        if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
+                log_error("Out of memory.");
+                return;
+        }
+
+        r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
+        if (r < 0 && r != -ENOENT)
+                log_error("Failed to vacuum %s: %s", p, strerror(-r));
+        free(p);
+}
+
 static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigned m, struct ucred *ucred, struct timeval *tv) {
         char *pid = NULL, *uid = NULL, *gid = NULL,
                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
@@ -166,6 +229,7 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne
         char *t;
         uid_t loginuid = 0, realuid = 0;
         JournalFile *f;
+        bool vacuumed = false;
 
         assert(s);
         assert(iovec || n == 0);
@@ -262,12 +326,23 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne
 
         assert(n <= m);
 
+retry:
         f = find_journal(s, realuid == 0 ? 0 : loginuid);
         if (!f)
                 log_warning("Dropping message, as we can't find a place to store the data.");
         else {
                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
 
+                if (r == -E2BIG && !vacuumed) {
+                        log_info("Allocation limit reached.");
+
+                        server_vacuum(s);
+                        vacuumed = true;
+
+                        log_info("Retrying write.");
+                        goto retry;
+                }
+
                 if (r < 0)
                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
         }
@@ -715,6 +790,10 @@ static int server_init(Server *s) {
 
         zero(*s);
         s->syslog_fd = s->native_fd = s->signal_fd = -1;
+        s->metrics.max_size = DEFAULT_MAX_SIZE;
+        s->metrics.min_size = DEFAULT_MIN_SIZE;
+        s->metrics.keep_free = DEFAULT_KEEP_FREE;
+        s->max_use = DEFAULT_MAX_USE;
 
         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
         if (s->epoll_fd < 0) {
diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h
index 33e4b78..ee9813f 100644
--- a/src/journal/sd-journal.h
+++ b/src/journal/sd-journal.h
@@ -38,7 +38,6 @@
  *   - accelerate looking for "all hostnames" and suchlike.
  *   - throttling
  *   - cryptographic hash
- *   - fix space reservation logic
  *   - compression
  */
 

commit 466ccd92e2f9ad712332012e1b3643a34b006a45
Author: Lennart Poettering <lennart at poettering.net>
Date:   Tue Dec 20 00:38:14 2011 +0100

    journal: fix matches

diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
index 7626743..8a864cb 100644
--- a/src/journal/journal-file.c
+++ b/src/journal/journal-file.c
@@ -1434,7 +1434,7 @@ int journal_file_next_entry_for_data(
         assert(p > 0 || !o);
 
         r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d);
-        if (r <= 0)
+        if (r < 0)
                 return r;
 
         n = le64toh(d->data.n_entries);
diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c
index bd510be..9dff724 100644
--- a/src/journal/sd-journal.c
+++ b/src/journal/sd-journal.c
@@ -525,6 +525,7 @@ static int next_with_matches(sd_journal *j, JournalFile *f, direction_t directio
                 uint64_t np, n;
                 bool found, term_result = false;
                 Match *m, *term_match = NULL;
+                Object *npo = NULL;
 
                 n = journal_file_entry_n_items(c);
 
@@ -535,6 +536,7 @@ static int next_with_matches(sd_journal *j, JournalFile *f, direction_t directio
                 np = 0;
                 LIST_FOREACH(matches, m, j->matches) {
                         uint64_t q, k;
+                        Object *qo = NULL;
 
                         /* Let's check if this is the beginning of a
                          * new term, i.e. has a different field prefix
@@ -567,22 +569,26 @@ static int next_with_matches(sd_journal *j, JournalFile *f, direction_t directio
                          * where we'd have to try next, in case the other
                          * matches are not OK */
 
-                        r = journal_file_next_entry_for_data(f, c, cp, le64toh(c->entry.items[k].object_offset), direction, NULL, &q);
+                        r = journal_file_next_entry_for_data(f, c, cp, le64toh(c->entry.items[k].object_offset), direction, &qo, &q);
                         if (r > 0) {
 
                                 if (direction == DIRECTION_DOWN) {
-                                        if (q > np)
+                                        if (q > np) {
                                                 np = q;
+                                                npo = qo;
+                                        }
                                 } else {
-                                        if (np == 0 || q < np)
+                                        if (np == 0 || q < np) {
                                                 np = q;
+                                                npo = qo;
+                                        }
                                 }
                         }
                 }
 
                 /* Check the last term */
-                if (term_match && term_result)
-                        found = true;
+                if (term_match && !term_result)
+                        found = false;
 
                 /* Did this entry match against all matches? */
                 if (found) {
@@ -600,6 +606,7 @@ static int next_with_matches(sd_journal *j, JournalFile *f, direction_t directio
                 /* Hmm, ok, this entry only matched partially, so
                  * let's try another one */
                 cp = np;
+                c = npo;
         }
 }
 
@@ -612,12 +619,12 @@ static int next_beyond_location(sd_journal *j, JournalFile *f, direction_t direc
         assert(f);
 
         if (f->current_offset > 0) {
-                r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &c);
+                cp = f->current_offset;
+
+                r = journal_file_move_to_object(f, OBJECT_ENTRY, cp, &c);
                 if (r < 0)
                         return r;
 
-                cp = f->current_offset;
-
                 r = next_with_matches(j, f, direction, &c, &cp);
                 if (r <= 0)
                         return r;

commit 50f20cfdb0f127e415ab38c024d9ca7a3602f74b
Author: Lennart Poettering <lennart at poettering.net>
Date:   Mon Dec 19 22:35:46 2011 +0100

    journal: implement inotify-based live logging logic

diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
index 427631d..7626743 100644
--- a/src/journal/journal-file.c
+++ b/src/journal/journal-file.c
@@ -879,6 +879,18 @@ static int journal_file_append_entry_internal(
         return 0;
 }
 
+static void journal_file_post_change(JournalFile *f) {
+        assert(f);
+
+        /* inotify() does not receive IN_MODIFY events from file
+         * accesses done via mmap(). After each access we hence
+         * trigger IN_MODIFY by truncating the journal file to its
+         * current size which triggers IN_MODIFY. */
+
+        if (ftruncate(f->fd, f->last_stat.st_size) < 0)
+                log_error("Failed to to truncate file to its own size: %m");
+}
+
 int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const struct iovec iovec[], unsigned n_iovec, uint64_t *seqnum, Object **ret, uint64_t *offset) {
         unsigned i;
         EntryItem *items;
@@ -923,6 +935,8 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st
 
         r = journal_file_append_entry_internal(f, ts, xor_hash, items, n_iovec, seqnum, ret, offset);
 
+        journal_file_post_change(f);
+
 finish:
         free(items);
 
diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c
index 9220efd..c947730 100644
--- a/src/journal/journalctl.c
+++ b/src/journal/journalctl.c
@@ -26,12 +26,15 @@
 #include <stdio.h>
 #include <unistd.h>
 #include <stdlib.h>
+#include <sys/poll.h>
 
 #include "sd-journal.h"
 #include "log.h"
 
+static bool arg_follow = true;
+
 int main(int argc, char *argv[]) {
-        int r, i;
+        int r, i, fd;
         sd_journal *j = NULL;
 
         log_set_max_level(LOG_DEBUG);
@@ -54,32 +57,68 @@ int main(int argc, char *argv[]) {
                 }
         }
 
-        SD_JOURNAL_FOREACH(j) {
+        fd = sd_journal_get_fd(j);
+        if (fd < 0) {
+                log_error("Failed to get wakeup fd: %s", strerror(-fd));
+                goto finish;
+        }
 
-                const void *data;
-                size_t length;
-                char *cursor;
-                uint64_t realtime = 0, monotonic = 0;
+        r = sd_journal_seek_head(j);
+        if (r < 0) {
+                log_error("Failed to seek to head: %s", strerror(-r));
+                goto finish;
+        }
 
-                r = sd_journal_get_cursor(j, &cursor);
-                if (r < 0) {
-                        log_error("Failed to get cursor: %s", strerror(-r));
-                        goto finish;
+        for (;;) {
+                struct pollfd pollfd;
+
+                while (sd_journal_next(j) > 0) {
+                        const void *data;
+                        size_t length;
+                        char *cursor;
+                        uint64_t realtime = 0, monotonic = 0;
+
+                        r = sd_journal_get_cursor(j, &cursor);
+                        if (r < 0) {
+                                log_error("Failed to get cursor: %s", strerror(-r));
+                                goto finish;
+                        }
+
+                        printf("entry: %s\n", cursor);
+                        free(cursor);
+
+                        sd_journal_get_realtime_usec(j, &realtime);
+                        sd_journal_get_monotonic_usec(j, &monotonic, NULL);
+                        printf("realtime: %llu\n"
+                               "monotonic: %llu\n",
+                               (unsigned long long) realtime,
+                               (unsigned long long) monotonic);
+
+                        SD_JOURNAL_FOREACH_DATA(j, data, length)
+                                printf("\t%.*s\n", (int) length, (const char*) data);
                 }
 
-                printf("entry: %s\n", cursor);
-                free(cursor);
+                if (!arg_follow)
+                        break;
+
+                zero(pollfd);
+                pollfd.fd = fd;
+                pollfd.events = POLLIN;
 
-                sd_journal_get_realtime_usec(j, &realtime);
-                sd_journal_get_monotonic_usec(j, &monotonic, NULL);
-                printf("realtime: %llu\n"
-                       "monotonic: %llu\n",
-                       (unsigned long long) realtime,
-                       (unsigned long long) monotonic);
+                if (poll(&pollfd, 1, -1) < 0) {
+                        if (errno == EINTR)
+                                break;
 
-                SD_JOURNAL_FOREACH_DATA(j, data, length)
-                        printf("\t%.*s\n", (int) length, (const char*) data);
+                        log_error("poll(): %m");
+                        r = -errno;
+                        goto finish;
+                }
 
+                r = sd_journal_process(j);
+                if (r < 0) {
+                        log_error("Failed to process: %s", strerror(-r));
+                        goto finish;
+                }
         }
 
 finish:
diff --git a/src/journal/journald.c b/src/journal/journald.c
index 630ead0..c457d27 100644
--- a/src/journal/journald.c
+++ b/src/journal/journald.c
@@ -866,7 +866,7 @@ int main(int argc, char *argv[]) {
         sd_notify(false,
                   "READY=1\n"
                   "STATUS=Processing messages...");
-#
+
         for (;;) {
                 struct epoll_event event;
 
diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c
index bcfcbfb..bd510be 100644
--- a/src/journal/sd-journal.c
+++ b/src/journal/sd-journal.c
@@ -22,6 +22,8 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <stddef.h>
+#include <unistd.h>
+#include <sys/inotify.h>
 
 #include "sd-journal.h"
 #include "journal-def.h"
@@ -73,6 +75,10 @@ struct sd_journal {
         JournalFile *current_file;
         uint64_t current_field;
 
+        int inotify_fd;
+        Hashmap *inotify_wd_dirs;
+        Hashmap *inotify_wd_roots;
+
         LIST_HEAD(Match, matches);
         unsigned n_matches;
 };
@@ -934,11 +940,6 @@ static int add_file(sd_journal *j, const char *prefix, const char *dir, const ch
         assert(prefix);
         assert(filename);
 
-        if (hashmap_size(j->files) >= JOURNAL_FILES_MAX) {
-                log_debug("Too many open journal files, ignoring.");
-                return 0;
-        }
-
         if (dir)
                 fn = join(prefix, "/", dir, "/", filename, NULL);
         else
@@ -947,6 +948,17 @@ static int add_file(sd_journal *j, const char *prefix, const char *dir, const ch
         if (!fn)
                 return -ENOMEM;
 
+        if (hashmap_get(j->files, fn)) {
+                free(fn);
+                return 0;
+        }
+
+        if (hashmap_size(j->files) >= JOURNAL_FILES_MAX) {
+                log_debug("Too many open journal files, not adding %s, ignoring.", fn);
+                free(fn);
+                return 0;
+        }
+
         r = journal_file_open(fn, O_RDONLY, 0, NULL, &f);
         free(fn);
 
@@ -965,6 +977,37 @@ static int add_file(sd_journal *j, const char *prefix, const char *dir, const ch
                 return r;
         }
 
+        log_debug("File %s got added.", f->path);
+
+        return 0;
+}
+
+static int remove_file(sd_journal *j, const char *prefix, const char *dir, const char *filename) {
+        char *fn;
+        JournalFile *f;
+
+        assert(j);
+        assert(prefix);
+        assert(filename);
+
+        if (dir)
+                fn = join(prefix, "/", dir, "/", filename, NULL);
+        else
+                fn = join(prefix, "/", filename, NULL);
+
+        if (!fn)
+                return -ENOMEM;
+
+        f = hashmap_get(j->files, fn);
+        free(fn);
+
+        if (!f)
+                return 0;
+
+        hashmap_remove(j->files, f->path);
+        journal_file_close(f);
+
+        log_debug("File %s got removed.", f->path);
         return 0;
 }
 
@@ -972,6 +1015,7 @@ static int add_directory(sd_journal *j, const char *prefix, const char *dir) {
         char *fn;
         int r;
         DIR *d;
+        int wd;
 
         assert(j);
         assert(prefix);
@@ -982,15 +1026,28 @@ static int add_directory(sd_journal *j, const char *prefix, const char *dir) {
                 return -ENOMEM;
 
         d = opendir(fn);
-        free(fn);
 
         if (!d) {
+                free(fn);
                 if (errno == ENOENT)
                         return 0;
 
                 return -errno;
         }
 
+        wd = inotify_add_watch(j->inotify_fd, fn,
+                               IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE|
+                               IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT|
+                               IN_DONT_FOLLOW|IN_ONLYDIR);
+        if (wd > 0) {
+                if (hashmap_put(j->inotify_wd_dirs, INT_TO_PTR(wd), fn) < 0)
+                        inotify_rm_watch(j->inotify_fd, wd);
+                else
+                        fn = NULL;
+        }
+
+        free(fn);
+
         for (;;) {
                 struct dirent buf, *de;
 
@@ -1008,9 +1065,65 @@ static int add_directory(sd_journal *j, const char *prefix, const char *dir) {
 
         closedir(d);
 
+        log_debug("Directory %s/%s got added.", prefix, dir);
+
         return 0;
 }
 
+static void remove_directory_wd(sd_journal *j, int wd) {
+        char *p;
+
+        assert(j);
+        assert(wd > 0);
+
+        if (j->inotify_fd >= 0)
+                inotify_rm_watch(j->inotify_fd, wd);
+
+        p = hashmap_remove(j->inotify_wd_dirs, INT_TO_PTR(wd));
+
+        if (p) {
+                log_debug("Directory %s got removed.", p);
+                free(p);
+        }
+}
+
+static void add_root_wd(sd_journal *j, const char *p) {
+        int wd;
+        char *k;
+
+        assert(j);
+        assert(p);
+
+        wd = inotify_add_watch(j->inotify_fd, p,
+                               IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE|
+                               IN_DONT_FOLLOW|IN_ONLYDIR);
+        if (wd <= 0)
+                return;
+
+        k = strdup(p);
+        if (!k || hashmap_put(j->inotify_wd_roots, INT_TO_PTR(wd), k) < 0) {
+                inotify_rm_watch(j->inotify_fd, wd);
+                free(k);
+        }
+}
+
+static void remove_root_wd(sd_journal *j, int wd) {
+        char *p;
+
+        assert(j);
+        assert(wd > 0);
+
+        if (j->inotify_fd >= 0)
+                inotify_rm_watch(j->inotify_fd, wd);
+
+        p = hashmap_remove(j->inotify_wd_roots, INT_TO_PTR(wd));
+
+        if (p) {
+                log_debug("Root %s got removed.", p);
+                free(p);
+        }
+}
+
 int sd_journal_open(sd_journal **ret) {
         sd_journal *j;
         const char *p;
@@ -1025,12 +1138,26 @@ int sd_journal_open(sd_journal **ret) {
         if (!j)
                 return -ENOMEM;
 
+        j->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);
+        if (j->inotify_fd < 0) {
+                r = -errno;
+                goto fail;
+        }
+
         j->files = hashmap_new(string_hash_func, string_compare_func);
         if (!j->files) {
                 r = -ENOMEM;
                 goto fail;
         }
 
+        j->inotify_wd_dirs = hashmap_new(trivial_hash_func, trivial_compare_func);
+        j->inotify_wd_roots = hashmap_new(trivial_hash_func, trivial_compare_func);
+
+        if (!j->inotify_wd_dirs || !j->inotify_wd_roots) {
+                r = -ENOMEM;
+                goto fail;
+        }
+
         /* We ignore most errors here, since the idea is to only open
          * what's actually accessible, and ignore the rest. */
 
@@ -1044,6 +1171,8 @@ int sd_journal_open(sd_journal **ret) {
                         continue;
                 }
 
+                add_root_wd(j, p);
+
                 for (;;) {
                         struct dirent buf, *de;
                         sd_id128_t id;
@@ -1081,6 +1210,24 @@ fail:
 void sd_journal_close(sd_journal *j) {
         assert(j);
 
+        if (j->inotify_wd_dirs) {
+                void *k;
+
+                while ((k = hashmap_first_key(j->inotify_wd_dirs)))
+                        remove_directory_wd(j, PTR_TO_INT(k));
+
+                hashmap_free(j->inotify_wd_dirs);
+        }
+
+        if (j->inotify_wd_roots) {
+                void *k;
+
+                while ((k = hashmap_first_key(j->inotify_wd_roots)))
+                        remove_root_wd(j, PTR_TO_INT(k));
+
+                hashmap_free(j->inotify_wd_roots);
+        }
+
         if (j->files) {
                 JournalFile *f;
 
@@ -1092,6 +1239,9 @@ void sd_journal_close(sd_journal *j) {
 
         sd_journal_flush_matches(j);
 
+        if (j->inotify_fd >= 0)
+                close_nointr_nofail(j->inotify_fd);
+
         free(j);
 }
 
@@ -1275,3 +1425,119 @@ void sd_journal_restart_data(sd_journal *j) {
 
         j->current_field = 0;
 }
+
+int sd_journal_get_fd(sd_journal *j) {
+        assert(j);
+
+        return j->inotify_fd;
+}
+
+static void process_inotify_event(sd_journal *j, struct inotify_event *e) {
+        char *p;
+        int r;
+
+        assert(j);
+        assert(e);
+
+        /* Is this a subdirectory we watch? */
+        p = hashmap_get(j->inotify_wd_dirs, INT_TO_PTR(e->wd));
+        if (p) {
+
+                if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) {
+
+                        /* Event for a journal file */
+
+                        if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) {
+                                r = add_file(j, p, NULL, e->name);
+                                if (r < 0)
+                                        log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r));
+                        } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) {
+
+                                r = remove_file(j, p, NULL, e->name);
+                                if (r < 0)
+                                        log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r));
+                        }
+
+                } else if (e->len == 0) {
+
+                        /* Event for the directory itself */
+
+                        if (e->mask & (IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT))
+                                remove_directory_wd(j, e->wd);
+                }
+
+                return;
+        }
+
+        /* Must be the root directory then? */
+        p = hashmap_get(j->inotify_wd_roots, INT_TO_PTR(e->wd));
+        if (p) {
+                sd_id128_t id;
+
+                if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) {
+
+                        /* Event for a journal file */
+
+                        if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) {
+                                r = add_file(j, p, NULL, e->name);
+                                if (r < 0)
+                                        log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r));
+                        } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) {
+
+                                r = remove_file(j, p, NULL, e->name);
+                                if (r < 0)
+                                        log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r));
+                        }
+
+                } else if ((e->mask & IN_ISDIR) && e->len > 0 && sd_id128_from_string(e->name, &id) >= 0) {
+
+                        /* Event for subdirectory */
+
+                        if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) {
+
+                                r = add_directory(j, p, e->name);
+                                if (r < 0)
+                                        log_debug("Failed to add directory %s/%s: %s", p, e->name, strerror(-r));
+                        }
+                }
+
+                return;
+        }
+
+        if (e->mask & IN_IGNORED)
+                return;
+
+        log_warning("Unknown inotify event.");
+}
+
+int sd_journal_process(sd_journal *j) {
+        uint8_t buffer[sizeof(struct inotify_event) + FILENAME_MAX];
+
+        assert(j);
+
+        for (;;) {
+                struct inotify_event *e;
+                ssize_t l;
+
+                l = read(j->inotify_fd, buffer, sizeof(buffer));
+                if (l < 0) {
+                        if (errno == EINTR || errno == EAGAIN)
+                                return 0;
+
+                        return -errno;
+                }
+
+                e = (struct inotify_event*) buffer;
+                while (l > 0) {
+                        size_t step;
+
+                        process_inotify_event(j, e);
+
+                        step = sizeof(struct inotify_event) + e->len;
+                        assert(step <= (size_t) l);
+
+                        e = (struct inotify_event*) ((uint8_t*) e + step);
+                        l -= step;
+                }
+        }
+}
diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h
index 05a929d..33e4b78 100644
--- a/src/journal/sd-journal.h
+++ b/src/journal/sd-journal.h
@@ -32,7 +32,6 @@
 /* TODO:
  *
  *   - check LE/BE conversion for 8bit, 16bit, 32bit values
- *   - implement inotify usage on client
  *   - implement audit gateway
  *   - implement stdout gateway
  *   - extend hash tables table as we go
@@ -40,7 +39,7 @@
  *   - throttling
  *   - cryptographic hash
  *   - fix space reservation logic
- *   - comm, argv can be manipulated, should it be _COMM=, _CMDLINE= or COMM=, CMDLINE=?
+ *   - compression
  */
 
 /* Write to daemon */
@@ -92,16 +91,16 @@ enum {
         SD_JOURNAL_INVALIDATE_REMOVE
 };
 
-int sd_journal_get_fd(sd_journal *j);                              /* missing */
-int sd_journal_process(sd_journal *j);                             /* missing */
+int sd_journal_get_fd(sd_journal *j);
+int sd_journal_process(sd_journal *j);
 
 #define SD_JOURNAL_FOREACH(j)                                           \
         if (sd_journal_seek_head(j) >= 0)                               \
-                while (sd_journal_next(j) > 0)                          \
+                while (sd_journal_next(j) > 0)
 
 #define SD_JOURNAL_FOREACH_BACKWARDS(j)                                 \
         if (sd_journal_seek_tail(j) >= 0)                               \
-                while (sd_journal_previous(j) > 0)                      \
+                while (sd_journal_previous(j) > 0)
 
 #define SD_JOURNAL_FOREACH_DATA(j, data, l)                             \
         for (sd_journal_restart_data(j); sd_journal_enumerate_data((j), &(data), &(l)) > 0; )
diff --git a/src/util.c b/src/util.c
index e5b5e53..37942de 100644
--- a/src/util.c
+++ b/src/util.c
@@ -2674,7 +2674,7 @@ int acquire_terminal(const char *name, bool fail, bool force, bool ignore_tiocst
                         ssize_t l;
                         struct inotify_event *e;
 
-                        if ((l = read(notify, &inotify_buffer, sizeof(inotify_buffer))) < 0) {
+                        if ((l = read(notify, inotify_buffer, sizeof(inotify_buffer))) < 0) {
 
                                 if (errno == EINTR)
                                         continue;



More information about the systemd-commits mailing list