[systemd-devel] [PATCH] journal: Introduce journal-network

Susant Sahani susant at redhat.com
Fri Mar 13 10:25:42 PDT 2015


   This tiny daemon enables to pull journal entries and push to a UDP
multicast address in syslog RFC 5424 format. journal-syslog-network runs with own
user systemd-journal-push. It starts running after the network is up.
---
 Makefile-man.am                                |   8 +
 Makefile.am                                    |  40 ++
 man/systemd-journal-network.service.xml        |  84 +++++
 man/systemd-journal-network.xml                | 115 ++++++
 src/journal-remote/journal-network-conf.c      |  61 ++++
 src/journal-remote/journal-network-conf.h      |  32 ++
 src/journal-remote/journal-network-gperf.gperf |  18 +
 src/journal-remote/journal-network-manager.c   | 481 +++++++++++++++++++++++++
 src/journal-remote/journal-network-manager.h   |  70 ++++
 src/journal-remote/journal-network-proto.c     | 218 +++++++++++
 src/journal-remote/journal-network.c           | 218 +++++++++++
 src/journal-remote/journal-network.conf.in     |   2 +
 units/systemd-journal-network.service.in       |  19 +
 13 files changed, 1366 insertions(+)
 create mode 100644 man/systemd-journal-network.service.xml
 create mode 100644 man/systemd-journal-network.xml
 create mode 100644 src/journal-remote/journal-network-conf.c
 create mode 100644 src/journal-remote/journal-network-conf.h
 create mode 100644 src/journal-remote/journal-network-gperf.gperf
 create mode 100644 src/journal-remote/journal-network-manager.c
 create mode 100644 src/journal-remote/journal-network-manager.h
 create mode 100644 src/journal-remote/journal-network-proto.c
 create mode 100644 src/journal-remote/journal-network.c
 create mode 100644 src/journal-remote/journal-network.conf.in
 create mode 100644 units/systemd-journal-network.service.in

diff --git a/Makefile-man.am b/Makefile-man.am
index 7a9612e..efd0cbc 100644
--- a/Makefile-man.am
+++ b/Makefile-man.am
@@ -1357,6 +1357,14 @@ man/systemd-journal-gatewayd.socket.html: man/systemd-journal-gatewayd.service.h
 
 endif
 
+MANPAGES += \
+        man/systemd-journal-network.service.8 \
+        man/systemd-journal-network.8
+MANPAGES_ALIAS += \
+        man/systemd-journal-network.8
+man/systemd-journal-network.8: man/systemd-journal-network.service.8
+man/systemd-journal-network.html: man/systemd-journal-network.service.html
+
 if HAVE_MYHOSTNAME
 MANPAGES += \
 	man/nss-myhostname.8
diff --git a/Makefile.am b/Makefile.am
index 856accb..ad1dff5 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -4336,6 +4336,46 @@ EXTRA_DIST += \
 	src/journal-remote/journal-upload.conf.in
 endif
 
+rootlibexec_PROGRAMS += \
+	systemd-journal-network
+
+systemd_journal_network_SOURCES = \
+	src/journal-remote/journal-network-manager.h \
+	src/journal-remote/journal-network-manager.c \
+	src/journal-remote/journal-network-conf.h \
+	src/journal-remote/journal-network-conf.c \
+	src/journal-remote/journal-network-proto.c \
+	src/journal-remote/journal-network.c
+
+nodist_systemd_journal_network_SOURCES = \
+	src/journal-remote/journal-network-gperf.c
+
+EXTRA_DIST += \
+        src/journal-remote/journal-network-gperf.gperf
+
+CLEANFILES += \
+        src/journal-remote/journal-network-gperf.c
+
+systemd_journal_network_LDADD = \
+	libsystemd-internal.la \
+	libsystemd-journal-internal.la \
+	libsystemd-shared.la
+
+nodist_systemunit_DATA += \
+	units/systemd-journal-network.service
+
+EXTRA_DIST += \
+	units/systemd-journal-network.service.in
+
+nodist_pkgsysconf_DATA += \
+	src/journal-remote/journal-network.conf
+
+EXTRA_DIST += \
+	src/journal-remote/journal-network.conf.in
+
+CLEANFILES += \
+	src/journal-remote/journal-network.conf
+
 # using _CFLAGS = in the conditional below would suppress AM_CFLAGS
 journalctl_CFLAGS = \
 	$(AM_CFLAGS)
diff --git a/man/systemd-journal-network.service.xml b/man/systemd-journal-network.service.xml
new file mode 100644
index 0000000..47a5b3e
--- /dev/null
+++ b/man/systemd-journal-network.service.xml
@@ -0,0 +1,84 @@
+<?xml version='1.0'?> <!--*- Mode: nxml; nxml-child-indent: 2; indent-tabs-mode: nil -*-->
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.2//EN"
+"http://www.oasis-open.org/docbook/xml/4.2/docbookx.dtd">
+
+<!--
+This file is part of systemd.
+
+Copyright 2015 Susant Sahani
+
+systemd is free software; you can redistribute it and/or modify it
+under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+systemd is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License
+along with systemd; If not, see <http://www.gnu.org/licenses/>.
+-->
+
+<refentry id="systemd-journal-network.service" xmlns:xi="http://www.w3.org/2001/XInclude">
+
+  <refentryinfo>
+    <title>systemd-journal-network.service</title>
+    <productname>systemd</productname>
+
+    <authorgroup>
+      <author>
+        <contrib>Developer</contrib>
+        <firstname>Susant</firstname>
+        <surname>Sahani</surname>
+        <email>ssahani at gmail.com</email>
+      </author>
+    </authorgroup>
+  </refentryinfo>
+
+  <refmeta>
+    <refentrytitle>systemd-journal-network.service</refentrytitle>
+    <manvolnum>8</manvolnum>
+  </refmeta>
+
+  <refnamediv>
+    <refname>systemd-journal-network.service</refname>
+    <refname>systemd-journal-network</refname>
+    <refpurpose>Syslog Client for journal events</refpurpose>
+  </refnamediv>
+
+  <refsynopsisdiv>
+    <para><filename>systemd-journal-network.service</filename></para>
+    <cmdsynopsis>
+      <command>/usr/lib/systemd/systemd-journal-network</command>
+      <arg choice="opt" rep="repeat">OPTIONS</arg>
+    </cmdsynopsis>
+  </refsynopsisdiv>
+
+  <refsect1>
+    <title>Description</title>
+
+    <para><command>systemd-journal-network</command> serves journal
+    events over the network. It multicasts journal event to Syslog RFC 5424 format.
+    </para>
+
+    <para>The program is started by
+    <citerefentry><refentrytitle>systemd</refentrytitle><manvolnum>1</manvolnum></citerefentry>
+   . Use
+    <command>systemctl start systemd-journal-network.service</command> to start
+    the service, and <command>systemctl enable systemd-journal-network.service</command>
+    to have it started on boot.</para>
+  </refsect1>
+
+  <refsect1>
+    <title>See Also</title>
+    <para>
+      <citerefentry><refentrytitle>systemd</refentrytitle><manvolnum>1</manvolnum></citerefentry>,
+      <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>,
+      <citerefentry><refentrytitle>systemd-journald.service</refentrytitle><manvolnum>8</manvolnum></citerefentry>,
+      <citerefentry><refentrytitle>systemd.journal-fields</refentrytitle><manvolnum>7</manvolnum></citerefentry>,
+    </para>
+  </refsect1>
+
+</refentry>
diff --git a/man/systemd-journal-network.xml b/man/systemd-journal-network.xml
new file mode 100644
index 0000000..473a146
--- /dev/null
+++ b/man/systemd-journal-network.xml
@@ -0,0 +1,115 @@
+<?xml version='1.0'?> <!--*- Mode: nxml; nxml-child-indent: 2; indent-tabs-mode: nil -*-->
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.2//EN"
+"http://www.oasis-open.org/docbook/xml/4.2/docbookx.dtd">
+
+<!--
+This file is part of systemd.
+
+Copyright 2015 Susant Sahani
+
+systemd is free software; you can redistribute it and/or modify it
+under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
+(at your option) any later version.
+
+systemd is distributed in the hope that it will be useful, but
+WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+Lesser General Public License for more details.
+
+You should have received a copy of the GNU Lesser General Public License
+along with systemd; If not, see <http://www.gnu.org/licenses/>.
+-->
+
+<refentry id="systemd-journal-network" xmlns:xi="http://www.w3.org/2001/XInclude">
+  <refentryinfo>
+    <title>systemd-journal-network</title>
+    <productname>systemd</productname>
+
+    <authorgroup>
+      <author>
+        <contrib>Developer</contrib>
+        <firstname>Susant</firstname>
+        <surname>Sahani</surname>
+        <email>ssahani at gmail.com</email>
+      </author>
+    </authorgroup>
+  </refentryinfo>
+
+  <refmeta>
+    <refentrytitle>systemd-journal-network</refentrytitle>
+    <manvolnum>8</manvolnum>
+  </refmeta>
+
+  <refnamediv>
+    <refname>systemd-journal-network</refname>
+    <refpurpose>Send journal messages over the network in RFC 5424 format</refpurpose>
+  </refnamediv>
+
+  <refsynopsisdiv>
+    <cmdsynopsis>
+      <command>systemd-journal-network</command>
+      <arg choice="opt" rep="repeat">OPTIONS</arg>
+      <arg choice="opt" rep="norepeat">--save-state=<replaceable>state file</replaceable></arg>
+      <arg choice="opt" rep="norepeat">--cursor=<replaceable>journal cursor</replaceable></arg>
+    </cmdsynopsis>
+  </refsynopsisdiv>
+
+  <refsect1>
+    <title>Description</title>
+
+    <para>
+      <command>systemd-journal-network</command> will send journal
+      entries to the UDP Multicast address . Unless
+      limited by one of the options specified below, all journal
+      entries accessible to the user the program is running as will be
+      sent, and then the program will wait and send new entries
+      as they become available.
+    </para>
+  </refsect1>
+
+  <refsect1>
+    <title>Options</title>
+
+    <variablelist>
+      <varlistentry>
+        <term><option>--cursor=</option></term>
+
+        <listitem><para>Push entries from the location in the
+        journal specified by the passed cursor. This has the same
+        meaning as <option>--cursor</option> option for
+        <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>.</para></listitem>
+      </varlistentry>
+
+      <varlistentry>
+        <term><option>--save-state</option><optional>=<replaceable>PATH</replaceable></optional></term>
+
+        <listitem><para>Push entries from the location in the
+        journal <emphasis>after</emphasis> the location specified by
+        the cursor saved in file at <replaceable>PATH</replaceable>
+        (<filename>/var/lib/systemd/journal-network/state</filename> by default).
+        After an entry is successfully uploaded, update this file
+        with the cursor of that entry.
+        </para></listitem>
+      </varlistentry>
+
+      <xi:include href="standard-options.xml" xpointer="help" />
+      <xi:include href="standard-options.xml" xpointer="version" />
+    </variablelist>
+  </refsect1>
+
+  <refsect1>
+    <title>Exit status</title>
+
+    <para>On success, 0 is returned; otherwise, a non-zero
+    failure code is returned.</para>
+  </refsect1>
+
+  <refsect1>
+    <title>See Also</title>
+    <para>
+      <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>,
+      <citerefentry><refentrytitle>systemd-journal-network.service</refentrytitle><manvolnum>8</manvolnum></citerefentry>,
+    </para>
+  </refsect1>
+</refentry>
diff --git a/src/journal-remote/journal-network-conf.c b/src/journal-remote/journal-network-conf.c
new file mode 100644
index 0000000..798ecbe
--- /dev/null
+++ b/src/journal-remote/journal-network-conf.c
@@ -0,0 +1,61 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include "in-addr-util.h"
+#include "journal-network-conf.h"
+
+int config_parse_network_address(const char *unit,
+                                 const char *filename,
+                                 unsigned line,
+                                 const char *section,
+                                 unsigned section_line,
+                                 const char *lvalue,
+                                 int ltype,
+                                 const char *rvalue,
+                                 void *data,
+                                 void *userdata) {
+        Manager *m = userdata;
+        int r;
+
+        assert(filename);
+        assert(lvalue);
+        assert(rvalue);
+        assert(data);
+
+        r = socket_address_parse(&m->address, rvalue);
+        if (r < 0) {
+                log_syntax(unit, LOG_ERR, filename, line, -r,
+                           "Failed to parse address value, ignoring: %s", rvalue);
+                return 0;
+        }
+
+        return 0;
+}
+
+int manager_parse_config_file(Manager *m) {
+        assert(m);
+
+        return config_parse_many("/etc/systemd/journal-network.conf",
+                                 CONF_DIRS_NULSTR("systemd/journal-network.conf"),
+                                 "Network\0",
+                                 config_item_perf_lookup, journal_network_gperf_lookup,
+                                 false, m);
+}
diff --git a/src/journal-remote/journal-network-conf.h b/src/journal-remote/journal-network-conf.h
new file mode 100644
index 0000000..6dea5b6
--- /dev/null
+++ b/src/journal-remote/journal-network-conf.h
@@ -0,0 +1,32 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#pragma once
+
+#include "in-addr-util.h"
+#include "conf-parser.h"
+#include "journal-network-manager.h"
+
+const struct ConfigPerfItem* journal_network_gperf_lookup(const char *key, unsigned length);
+int config_parse_network_address(const char *unit, const char *filename, unsigned line, const char *section,
+                                 unsigned section_line, const char *lvalue, int ltype, const char *rvalue,
+                                 void *data, void *userdata);
+int manager_parse_config_file(Manager *m);
diff --git a/src/journal-remote/journal-network-gperf.gperf b/src/journal-remote/journal-network-gperf.gperf
new file mode 100644
index 0000000..9801be1
--- /dev/null
+++ b/src/journal-remote/journal-network-gperf.gperf
@@ -0,0 +1,18 @@
+%{
+#include <stddef.h>
+#include "conf-parser.h"
+#include "journal-network-conf.h"
+#include "journal-network-manager.h"
+%}
+struct ConfigPerfItem;
+%null_strings
+%language=ANSI-C
+%define slot-name section_and_lvalue
+%define hash-function-name journal_network_gperf_hash
+%define lookup-function-name journal_network_gperf_lookup
+%readonly-tables
+%omit-struct-type
+%struct-type
+%includes
+%%
+Network.Address,  config_parse_network_address, 0, 0
diff --git a/src/journal-remote/journal-network-manager.c b/src/journal-remote/journal-network-manager.c
new file mode 100644
index 0000000..f089980
--- /dev/null
+++ b/src/journal-remote/journal-network-manager.c
@@ -0,0 +1,481 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include "util.h"
+#include "socket-util.h"
+#include "conf-parser.h"
+#include "sd-daemon.h"
+#include "network-util.h"
+#include "capability.h"
+#include "mkdir.h"
+#include "fileio.h"
+#include "journal-internal.h"
+#include "journal-network-manager.h"
+
+#define JOURNAL_PUSH_POLL_TIMEOUT (10 * USEC_PER_SEC)
+
+/* Default severity LOG_NOTICE */
+#define JOURNAL_DEFAULT_SEVERITY "5"
+
+/* Default facility LOG_USER (1<<3) */
+#define JOURNAL_DEFAULT_FACILITY "8"
+
+
+static int parse_field(const void *data, size_t length, const char *field, char **target, size_t *target_size) {
+        size_t fl, nl;
+        void *buf;
+
+        assert(data);
+        assert(field);
+        assert(target);
+        assert(target_size);
+
+        fl = strlen(field);
+        if (length < fl)
+                return 0;
+
+        if (memcmp(data, field, fl))
+                return 0;
+
+        nl = length - fl;
+        buf = malloc(nl+1);
+        if (!buf)
+                return -ENOMEM;
+
+        memcpy(buf, (const char*) data + fl, nl);
+        ((char*)buf)[nl] = 0;
+
+        free(*target);
+        *target = buf;
+        *target_size = nl;
+
+        return 1;
+}
+
+static int manager_read_journal_input(Manager *m) {
+        _cleanup_free_ char *facility = NULL, *identifier = NULL,
+                *priority = NULL, *message = NULL, *pid = NULL,
+                *hostname = NULL;
+        struct timeval tv;
+        usec_t realtime;
+        const void *data;
+        size_t length;
+        size_t n = 0;
+        int r;
+
+        assert(m);
+        assert(m->journal);
+
+        JOURNAL_FOREACH_DATA_RETVAL(m->journal, data, length, r) {
+
+                r = parse_field(data, length, "PRIORITY=", &priority, &n);
+                if (r < 0)
+                        return r;
+                else if (r > 0)
+                        continue;
+
+                r = parse_field(data, length, "SYSLOG_FACILITY=", &facility, &n);
+                if (r < 0)
+                        return r;
+                else if (r > 0)
+                        continue;
+
+                r = parse_field(data, length, "_HOSTNAME=", &hostname, &n);
+                if (r < 0)
+                        return r;
+                else if (r > 0)
+                        continue;
+
+                r = parse_field(data, length, "SYSLOG_IDENTIFIER=", &identifier, &n);
+                if (r < 0)
+                        return r;
+                else if (r > 0)
+                        continue;
+
+                r = parse_field(data, length, "_PID=", &pid, &n);
+                if (r < 0)
+                        return r;
+                else if (r > 0)
+                        continue;
+
+                r = parse_field(data, length, "MESSAGE=", &message, &n);
+                if (r < 0)
+                        return r;
+        }
+
+        r = sd_journal_get_realtime_usec(m->journal, &realtime);
+        if (r < 0)
+                log_warning_errno(r, "Failed to rerieve realtime from journal: %m");
+        else {
+                tv.tv_sec = realtime / USEC_PER_SEC;
+                tv.tv_usec = realtime % USEC_PER_SEC;
+        }
+
+        /* Set to default facility and priority if missing */
+        if (!facility) {
+                facility = strdup(JOURNAL_DEFAULT_FACILITY);
+                if (!facility)
+                        return -ENOMEM;
+        }
+
+        if (!priority) {
+                priority = strdup(JOURNAL_DEFAULT_FACILITY);
+                if (!priority)
+                        return -ENOMEM;
+        }
+
+        return manager_push_to_network(m, priority, facility, identifier,
+                                       message, hostname, pid, r >= 0 ? &tv : NULL);
+}
+
+static int update_cursor_state(Manager *m) {
+        _cleanup_fclose_ FILE *f = NULL;
+        int r;
+
+        assert(m);
+
+        if (!m->state_file || !m->last_cursor)
+                return 0;
+
+        f = fopen(m->state_file, "we");
+        if (!f)
+                goto finish;
+
+        fprintf(f,
+                "# This is private data. Do not parse.\n"
+                "LAST_CURSOR=%s\n",
+                m->last_cursor);
+
+        fflush(f);
+
+        if (ferror(f))
+                r = -errno;
+
+ finish:
+        if (r < 0)
+                log_error_errno(r, "Failed to save state %s: %m", m->state_file);
+
+        return r;
+}
+
+static int load_cursor_state(Manager *m) {
+        int r;
+
+        assert(m);
+
+        if (!m->state_file)
+                return 0;
+
+        r = parse_env_file(m->state_file, NEWLINE, "LAST_CURSOR", &m->last_cursor, NULL);
+        if (r < 0 && r != -ENOENT)
+                return r;
+
+        log_debug("Last cursor was %s.", m->last_cursor ? m->last_cursor : "Not available");
+
+        return 0;
+}
+
+static int process_journal_input(Manager *m) {
+        int r;
+
+        assert(m);
+        assert(m->journal);
+
+        while (true) {
+                r = sd_journal_next(m->journal);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to get next entry: %m");
+                        return r;
+                }
+
+                if (r == 0)
+                        break;
+
+                r = manager_read_journal_input(m);
+                if (r == -ETIMEDOUT) {
+                        /* Cant send via socket . Seek one entry back */
+                        r = sd_journal_previous(m->journal);
+                        if (r < 0)
+                                log_error_errno(r, "Failed to iterate through journal: %m");
+
+                        return 0;
+                }
+        }
+
+        r = sd_journal_get_cursor(m->journal, &m->current_cursor);
+        if (r < 0)
+                return log_error_errno(r, "Failed to get cursor: %m");
+
+        free(m->last_cursor);
+        m->last_cursor = m->current_cursor;
+        m->current_cursor = NULL;
+
+        return update_cursor_state(m);
+}
+
+static int manager_journal_event_handler(sd_event_source *event, int fd, uint32_t revents, void *userp) {
+        Manager *m = userp;
+        int r;
+
+        if (revents & EPOLLHUP) {
+                log_debug("Received HUP");
+                return 0;
+        }
+
+        if (!(revents & EPOLLIN)) {
+                log_warning("Unexpected poll event %"PRIu32".", revents);
+                return -EINVAL;
+        }
+
+        if (m->event_journal_input) {
+
+                r = sd_journal_process(m->journal);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to process journal: %m");
+                        manager_disconnect(m);
+                        return r;
+                }
+
+                if (r == SD_JOURNAL_NOP)
+                        return 0;
+        }
+
+        return process_journal_input(m);
+}
+
+static void close_journal_input(Manager *m) {
+        assert(m);
+
+        if (m->journal) {
+                log_debug("Closing journal input.");
+
+                sd_journal_close(m->journal);
+                m->journal = NULL;
+        }
+
+        m->timeout = 0;
+}
+
+static int manager_signal_event_handler(sd_event_source *event, const struct signalfd_siginfo *si, void *userdata) {
+        Manager *m = userdata;
+
+        assert(m);
+
+        log_received_signal(LOG_INFO, si);
+
+        manager_disconnect(m);
+
+        sd_event_exit(m->event, 0);
+
+        return 0;
+}
+
+static int manager_journal_monitor_listen(Manager *m) {
+        int r, events;
+
+        assert(m);
+
+        r = sd_journal_open(&m->journal, SD_JOURNAL_LOCAL_ONLY);
+        if (r < 0) {
+                log_error_errno(r, "Failed to open journal: %m");
+                return r;
+        }
+
+        sd_journal_set_data_threshold(m->journal, 0);
+
+        m->journal_watch_fd  = sd_journal_get_fd(m->journal);
+        if (m->journal_watch_fd  < 0)
+                return log_error_errno(m->journal_watch_fd, "sd_journal_get_fd failed: %m");
+
+        events = sd_journal_get_events(m->journal);
+
+        r = sd_journal_reliable_fd(m->journal);
+        assert(r >= 0);
+        if (r > 0)
+                m->timeout = -1;
+        else
+                m->timeout = JOURNAL_PUSH_POLL_TIMEOUT;
+
+        r = sd_event_add_io(m->event, &m->event_journal_input ,
+                            m->journal_watch_fd , events, manager_journal_event_handler, m);
+        if (r < 0)
+                return log_error_errno(r, "Failed to register input event: %m");
+
+        /* ignore failure */
+        if (!m->last_cursor)
+                (void) load_cursor_state(m);
+
+        if (m->last_cursor) {
+                r = sd_journal_seek_cursor(m->journal, m->last_cursor);
+                if (r < 0)
+                        return log_error_errno(r, "Failed to seek to cursor %s: %m",
+                                               m->last_cursor);
+        }
+
+        return 0;
+}
+
+int manager_connect(Manager *m) {
+        int r;
+
+        assert(m);
+
+        manager_disconnect(m);
+
+        r = manager_open_network_socket(m);
+        if (r < 0)
+                return r;
+
+        r = manager_journal_monitor_listen(m);
+        if (r < 0)
+                return r;
+
+        return 0;
+}
+
+void manager_disconnect(Manager *m) {
+        assert(m);
+
+        close_journal_input(m);
+
+        manager_close_network_socket(m);
+
+        m->event_journal_input = sd_event_source_unref(m->event_journal_input);
+
+        sd_notifyf(false, "STATUS=Idle.");
+}
+
+static int manager_network_event_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+        Manager *m = userdata;
+        bool connected, online;
+        int r;
+
+        assert(m);
+
+        sd_network_monitor_flush(m->network_monitor);
+
+        /* check if the machine is online */
+        online = network_is_online();
+
+        /* check if the socket is currently open*/
+        connected = m->socket >= 0;
+
+        if (connected && !online) {
+                log_info("No network connectivity, watching for changes.");
+                manager_disconnect(m);
+
+        } else if (!connected && online) {
+                log_info("Network configuration changed, trying to establish connection.");
+
+                r = manager_connect(m);
+                if (r < 0)
+                        return r;
+        }
+
+        return 0;
+}
+
+static int manager_network_monitor_listen(Manager *m) {
+        int r, fd, events;
+
+        assert(m);
+
+        r = sd_network_monitor_new(&m->network_monitor, NULL);
+        if (r < 0)
+                return r;
+
+        fd = sd_network_monitor_get_fd(m->network_monitor);
+        if (fd < 0)
+                return fd;
+
+        events = sd_network_monitor_get_events(m->network_monitor);
+        if (events < 0)
+                return events;
+
+        r = sd_event_add_io(m->event, &m->network_event_source, fd, events, manager_network_event_handler, m);
+        if (r < 0)
+                return r;
+
+        return 0;
+}
+
+void manager_free(Manager *m) {
+        if (!m)
+                return;
+
+        manager_disconnect(m);
+
+        free(m->last_cursor);
+        free(m->current_cursor);
+
+        free(m->state_file);
+
+        sd_event_source_unref(m->network_event_source);
+        sd_network_monitor_unref(m->network_monitor);
+
+        sd_event_source_unref(m->sigterm_event);
+        sd_event_source_unref(m->sigint_event);
+
+        sd_event_unref(m->event);
+
+        free(m);
+}
+
+int manager_new(Manager **ret, const char *state_file, const char *cursor) {
+        _cleanup_(manager_freep) Manager *m = NULL;
+        int r;
+
+        assert(ret);
+
+        m = new0(Manager, 1);
+        if (!m)
+                return -ENOMEM;
+
+        m->socket = m->journal_watch_fd = -1;
+
+        m->state_file = strdup(state_file);
+        if (!m->state_file)
+                return -ENOMEM;
+
+        if (cursor) {
+                m->last_cursor = strdup(cursor);
+                if (!m->last_cursor)
+                        return -ENOMEM;
+        }
+
+        r = sd_event_default(&m->event);
+        if (r < 0)
+                return log_error_errno(r, "sd_event_default failed: %m");
+
+        sd_event_add_signal(m->event, NULL, SIGTERM, manager_signal_event_handler,  m);
+        sd_event_add_signal(m->event, NULL, SIGINT, manager_signal_event_handler, m);
+
+        sd_event_set_watchdog(m->event, true);
+
+        r = manager_network_monitor_listen(m);
+        if (r < 0)
+                return r;
+
+        *ret = m;
+        m = NULL;
+
+        return 0;
+}
diff --git a/src/journal-remote/journal-network-manager.h b/src/journal-remote/journal-network-manager.h
new file mode 100644
index 0000000..b947957
--- /dev/null
+++ b/src/journal-remote/journal-network-manager.h
@@ -0,0 +1,70 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#pragma once
+
+#include "sd-event.h"
+#include "sd-network.h"
+#include "socket-util.h"
+#include "sd-journal.h"
+
+typedef struct Manager Manager;
+
+struct Manager {
+        sd_event *event;
+        sd_event_source *event_journal_input;
+        uint64_t timeout;
+
+        sd_event_source *sigint_event, *sigterm_event;
+
+        /* network */
+        sd_event_source *network_event_source;
+        sd_network_monitor *network_monitor;
+
+        int socket;
+
+        /* Multicast UDP address */
+        SocketAddress address;
+
+        /* journal  */
+        int journal_watch_fd;
+        sd_journal *journal;
+
+        char *state_file;
+
+        char *last_cursor, *current_cursor;
+};
+
+int manager_new(Manager **ret, const char *state_file, const char *cursor);
+void manager_free(Manager *m);
+
+DEFINE_TRIVIAL_CLEANUP_FUNC(Manager*, manager_free);
+
+int manager_connect(Manager *m);
+void manager_disconnect(Manager *m);
+
+void manager_close_network_socket(Manager *m);
+int manager_open_network_socket(Manager *m);
+
+int manager_push_to_network(Manager *m, const char *priority, const char *facility,
+                            const char *identifier, const char *message,
+                            const char *hostname, const char *pid,
+                            const struct timeval *tv);
diff --git a/src/journal-remote/journal-network-proto.c b/src/journal-remote/journal-network-proto.c
new file mode 100644
index 0000000..ca9df8a
--- /dev/null
+++ b/src/journal-remote/journal-network-proto.c
@@ -0,0 +1,218 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <unistd.h>
+#include <stddef.h>
+#include <poll.h>
+
+#include "journal-network-manager.h"
+
+#define RFC_5424_NILVALUE "-"
+#define RFC_5424_PROTOCOL 1
+
+#define SEND_TIMEOUT_USEC (200 * USEC_PER_MSEC)
+
+static int sendmsg_loop(Manager *m, struct msghdr *mh) {
+        int r;
+
+        assert(m);
+        assert(mh);
+
+        for (;;) {
+                if (sendmsg(m->socket, mh, MSG_NOSIGNAL) >= 0)
+                        return 0;
+
+                if (errno == EINTR)
+                        continue;
+
+                if (errno != EAGAIN)
+                        return -errno;
+
+                r = fd_wait_for_event(m->socket, POLLOUT, SEND_TIMEOUT_USEC);
+                if (r < 0)
+                        return r;
+                if (r == 0)
+                        return -ETIMEDOUT;
+        }
+
+        return 0;
+}
+
+static int network_send(Manager *m, struct iovec *iovec, unsigned n_iovec) {
+        struct msghdr mh = { };
+
+        assert(m);
+        assert(iovec);
+        assert(n_iovec > 0);
+
+        mh.msg_iov = iovec;
+        mh.msg_iovlen = n_iovec;
+
+        if (m->address.sockaddr.sa.sa_family == AF_INET) {
+                mh.msg_name = &m->address.sockaddr.sa;
+                mh.msg_namelen = sizeof(m->address.sockaddr.sa);
+        } else if (m->address.sockaddr.sa.sa_family == AF_INET6) {
+                mh.msg_name = &m->address.sockaddr.in6;
+                mh.msg_namelen = sizeof(m->address.sockaddr.in6);
+        } else
+                return -EAFNOSUPPORT;
+
+        return sendmsg_loop(m, &mh);
+}
+
+/* rfc3339 timestamp format: yyyy-mm-ddthh:mm:ss[.frac]<+/->zz:zz */
+static void format_rfc3339_timestamp(const struct timeval *tv, char *header_time, size_t header_size) {
+        char gm_buf[sizeof("+0530") + 1];
+        struct tm tm;
+        time_t t;
+
+        assert(header_time);
+
+        t = tv ? tv->tv_sec : ((time_t) (now(CLOCK_REALTIME) / USEC_PER_SEC));
+        localtime_r(&t, &tm);
+
+        strftime(header_time, header_size, "%Y-%m-%dT%T", &tm);
+
+        /* add fractional part */
+        if (tv)
+                snprintf(header_time + strlen(header_time), header_size, ".%06ld", tv->tv_usec);
+
+        /* format the timezone according to RFC */
+        xstrftime(gm_buf, "%z", &tm);
+        snprintf(header_time + strlen(header_time), header_size, "%.3s:%.2s ", gm_buf, gm_buf + 3);
+}
+
+/* The Syslog Protocol RFC5424 format :
+ * <pri>version sp timestamp sp hostname sp app-name sp procid sp msgid sp [sd-id]s sp msg
+ */
+int manager_push_to_network(Manager *m,
+                            const char *priority,
+                            const char *facility,
+                            const char *identifier,
+                            const char *message,
+                            const char *hostname,
+                            const char *pid,
+                            const struct timeval *tv) {
+        char header_priority[sizeof("< >1 ") + 1];
+        char header_time[FORMAT_TIMESTAMP_MAX];
+        uint16_t makepri, pri, fac;
+        struct iovec iov[13];
+        int n = 0;
+
+        assert(m);
+        assert(message);
+        assert(priority);
+        assert(facility);
+
+        pri = (uint16_t) strtoul(priority, NULL, 0);
+        fac = (uint16_t) strtoul(facility, NULL, 0);
+        makepri = (fac << 3) + pri;
+
+        /* First: priority field Second: Version  '<pri>version' */
+        snprintf(header_priority, sizeof(header_priority), "<%i>%i ", makepri, RFC_5424_PROTOCOL);
+        IOVEC_SET_STRING(iov[n++], header_priority);
+
+        /* Third: timestamp */
+        format_rfc3339_timestamp(tv, header_time, sizeof(header_time));
+        IOVEC_SET_STRING(iov[n++], header_time);
+
+        /* Fourth: hostname */
+        if (hostname)
+                IOVEC_SET_STRING(iov[n++], hostname);
+        else
+                IOVEC_SET_STRING(iov[n++], RFC_5424_NILVALUE);
+
+        IOVEC_SET_STRING(iov[n++], " ");
+
+        /* Fifth: identifier */
+        if (identifier)
+                IOVEC_SET_STRING(iov[n++], identifier);
+        else
+                IOVEC_SET_STRING(iov[n++], RFC_5424_NILVALUE);
+
+        IOVEC_SET_STRING(iov[n++], " ");
+
+        /* Sixth: procid */
+        if (pid)
+                IOVEC_SET_STRING(iov[n++], pid);
+        else
+                IOVEC_SET_STRING(iov[n++], RFC_5424_NILVALUE);
+
+        IOVEC_SET_STRING(iov[n++], " ");
+
+        /* Seventh: msgid */
+        IOVEC_SET_STRING(iov[n++], RFC_5424_NILVALUE);
+        IOVEC_SET_STRING(iov[n++], " ");
+
+        /* Eighth: [structured-data] */
+        IOVEC_SET_STRING(iov[n++], RFC_5424_NILVALUE);
+        IOVEC_SET_STRING(iov[n++], " ");
+
+        /* Ninth: message */
+        IOVEC_SET_STRING(iov[n++], message);
+
+        return network_send(m, iov, n);
+}
+
+void manager_close_network_socket(Manager *m) {
+        assert(m);
+
+        m->socket = safe_close(m->socket);
+}
+
+int manager_open_network_socket(Manager *m) {
+        const int ttl = 255;
+        const int one = 1;
+        int r;
+
+        assert(m);
+
+        if (m->address.sockaddr.sa.sa_family != AF_INET && m->address.sockaddr.sa.sa_family != AF_INET6)
+                return EAFNOSUPPORT;
+
+        m->socket = socket(m->address.sockaddr.sa.sa_family, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
+        if (m->socket < 0)
+                return -errno;
+
+        r = setsockopt(m->socket, IPPROTO_IP, IP_PKTINFO, &one, sizeof(one));
+        if (r < 0) {
+                r = -errno;
+                goto fail;
+        }
+
+        r = setsockopt(m->socket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
+        if (r < 0) {
+                r = -errno;
+                goto fail;
+        }
+
+        r = setsockopt(m->socket, IPPROTO_IP, IP_MULTICAST_LOOP, &one, sizeof(one));
+        if (r < 0) {
+                r = -errno;
+                goto fail;
+        }
+
+        return m->socket;
+
+ fail:
+        m->socket = safe_close(m->socket);
+        return r;
+}
diff --git a/src/journal-remote/journal-network.c b/src/journal-remote/journal-network.c
new file mode 100644
index 0000000..e61c7b0
--- /dev/null
+++ b/src/journal-remote/journal-network.c
@@ -0,0 +1,218 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+  This file is part of systemd.
+
+  Copyright 2015 Susant Sahani
+
+  systemd is free software; you can redistribute it and/or modify it
+  under the terms of the GNU Lesser General Public License as published by
+  the Free Software Foundation; either version 2.1 of the License, or
+  (at your option) any later version.
+
+  systemd is distributed in the hope that it will be useful, but
+  WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public License
+  along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <stdio.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <getopt.h>
+
+#include "sd-daemon.h"
+#include "util.h"
+#include "build.h"
+#include "mkdir.h"
+#include "capability.h"
+#include "network-util.h"
+#include "journal-network-conf.h"
+#include "journal-network-manager.h"
+
+#define STATE_FILE "/var/lib/systemd/journal-network/state"
+
+static const char *arg_cursor = NULL;
+static const char *arg_save_state = STATE_FILE;
+
+static int setup_cursor_state_file(Manager *m, uid_t uid, gid_t gid) {
+        _cleanup_close_ int fd = -1;
+        int r;
+
+        assert(m);
+
+        r = mkdir_parents(m->state_file, 0755);
+        if (r < 0)
+                return log_error_errno(r, "Cannot create parent directory of state file %s: %m",
+                                       m->state_file);
+
+        fd = open(m->state_file, O_RDWR|O_CLOEXEC, 0644);
+        if (fd >= 0) {
+
+                /* Try to fix the access mode, so that we can still
+                   touch the file after dropping priviliges */
+                fchmod(fd, 0644);
+                fchown(fd, uid, gid);
+
+        } else
+                /* create stamp file with the compiled-in date */
+                return touch_file(m->state_file, true, USEC_INFINITY, uid, gid, 0644);
+
+        return 0;
+}
+
+static void help(void) {
+        printf("%s ..\n\n"
+               "Push journal events to a UDP multicast group in RFC 5424 syslog format.\n\n"
+               "  -h --help                 Show this help\n"
+               "     --version              Show package version\n"
+               "     --cursor=CURSOR        Start at the specified cursor\n"
+               "     --save-state[=FILE]    Save uploaded cursors (default \n"
+               "                            " STATE_FILE ")\n"
+               "  -h --help                 Show this help and exit\n"
+               "     --version              Print version string and exit\n"
+               , program_invocation_short_name);
+}
+
+static int parse_argv(int argc, char *argv[]) {
+        enum {
+                ARG_VERSION = 0x100,
+                ARG_CURSOR,
+                ARG_SAVE_STATE,
+        };
+
+        static const struct option options[] = {
+                { "help",         no_argument,       NULL, 'h'                },
+                { "version",      no_argument,       NULL, ARG_VERSION        },
+                { "cursor",       required_argument, NULL, ARG_CURSOR         },
+                { "save-state",   optional_argument, NULL, ARG_SAVE_STATE     },
+                {}
+        };
+
+        int c;
+
+        assert(argc >= 0);
+        assert(argv);
+
+        opterr = 0;
+
+        while ((c = getopt_long(argc, argv, "h", options, NULL)) >= 0)
+                switch(c) {
+                case 'h':
+                        help();
+                        return 0 /* done */;
+
+                case ARG_VERSION:
+                        puts(PACKAGE_STRING);
+                        puts(SYSTEMD_FEATURES);
+                        return 0 /* done */;
+                case ARG_CURSOR:
+                        if (arg_cursor) {
+                                log_error("cannot use more than one --cursor/--after-cursor");
+                                return -EINVAL;
+                        }
+
+                        arg_cursor = optarg;
+                        break;
+                case ARG_SAVE_STATE:
+                        arg_save_state = optarg ?: STATE_FILE;
+                        break;
+
+                case '?':
+                        log_error("Unknown option %s.", argv[optind-1]);
+                        return -EINVAL;
+
+                case ':':
+                        log_error("Missing argument to %s.", argv[optind-1]);
+                        return -EINVAL;
+
+                default:
+                        assert_not_reached("Unhandled option code.");
+                }
+
+
+        if (optind < argc) {
+                log_error("Input arguments make no sense with journal input.");
+                return -EINVAL;
+        }
+
+        return 1;
+}
+
+int main(int argc, char **argv) {
+        _cleanup_(manager_freep) Manager *m = NULL;
+        const char *user = "systemd-journal-network";
+        uid_t uid;
+        gid_t gid;
+        int r;
+
+        log_show_color(true);
+        log_set_target(LOG_TARGET_AUTO);
+        log_parse_environment();
+        log_open();
+
+        umask(0022);
+
+        r = get_user_creds(&user, &uid, &gid, NULL, NULL);
+        if (r < 0) {
+                log_error_errno(r, "Cannot resolve user name %s: %m", user);
+                goto finish;
+        }
+
+        r = parse_argv(argc, argv);
+        if (r <= 0)
+                goto finish;
+
+        r = manager_new(&m, arg_save_state, arg_cursor);
+        if (r < 0) {
+                log_error_errno(r, "Failed to allocate manager: %m");
+                goto finish;
+        }
+
+        r = manager_parse_config_file(m);
+        if (r < 0)
+                log_warning_errno(r, "Failed to parse configuration file: %m");
+
+        r = setup_cursor_state_file(m, uid, gid);
+        if (r < 0)
+                goto cleanup;
+
+        r = drop_privileges(uid, gid,
+                            (1ULL << CAP_NET_ADMIN) |
+                            (1ULL << CAP_NET_BIND_SERVICE) |
+                            (1ULL << CAP_NET_BROADCAST));
+        if (r < 0)
+                goto finish;
+
+        log_debug("%s running as pid "PID_FMT,
+                  program_invocation_short_name, getpid());
+
+        sd_notify(false,
+                  "READY=1\n"
+                  "STATUS=Processing input...");
+
+        if (network_is_online()) {
+                r = manager_connect(m);
+                if (r < 0)
+                        goto finish;
+        }
+
+        r = sd_event_loop(m->event);
+        if (r < 0) {
+                log_error_errno(r, "Failed to run event loop: %m");
+                goto finish;
+        }
+
+        sd_event_get_exit_code(m->event, &r);
+
+ cleanup:
+        sd_notify(false,
+                  "STOPPING=1\n"
+                  "STATUS=Shutting down...");
+
+ finish:
+        return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
+}
diff --git a/src/journal-remote/journal-network.conf.in b/src/journal-remote/journal-network.conf.in
new file mode 100644
index 0000000..b567a46
--- /dev/null
+++ b/src/journal-remote/journal-network.conf.in
@@ -0,0 +1,2 @@
+[Network]
+#Address=239.0.0.1:6000
diff --git a/units/systemd-journal-network.service.in b/units/systemd-journal-network.service.in
new file mode 100644
index 0000000..7196066
--- /dev/null
+++ b/units/systemd-journal-network.service.in
@@ -0,0 +1,19 @@
+#  This file is part of systemd.
+#
+#  systemd is free software; you can redistribute it and/or modify it
+#  under the terms of the GNU Lesser General Public License as published by
+#  the Free Software Foundation; either version 2.1 of the License, or
+#  (at your option) any later version.
+[Unit]
+Description=Journal Syslog Multicast Daemon
+After=network.target
+
+[Service]
+ExecStart=/usr/lib/systemd/systemd-journal-network
+User=systemd-journal-network
+PrivateTmp=yes
+PrivateDevices=yes
+WatchdogSec=20min
+
+[Install]
+WantedBy=multi-user.target
-- 
2.1.0



More information about the systemd-devel mailing list